以分布式模式启动Kafka连接时请求超时消息 [英] Request timed out Message while starting Kafka connect in distributed mode
问题描述
我已经下载了 confluent 5.4 版.
而且我的连接器在独立模式下运行良好.
wget/usr/local http://packages.confluent.io/archive/5.4/confluent-5.4.0-2.11.tar.gz -P ~/Downloads/tar -zxvf ~/Downloads/confluent-5.4.0-2.11.tar.gz -C ~/Downloads/须藤 mv ~/Downloads/confluent-5.4.0/usr/local/
然后我试图在分布式模式下运行 Kafak connect 所以我修改了我的 connect-distributed.properties 如下
bootstrap.servers=b-***.eu-west-1.amazonaws.com:9092,b-***.eu-west-1.amazonaws.com:9092,b-***.eu-west-1.amazonaws.com:9092group.id=连接集群key.converter=org.apache.kafka.connect.storage.StringConvertervalue.converter=org.apache.kafka.connect.storage.StringConverteroffset.storage.topic=连接偏移量offset.storage.replication.factor=1config.storage.topic=connect-configsconfig.storage.replication.factor=1status.storage.topic=连接状态status.storage.replication.factor=1offset.flush.interval.ms=10000plugin.path=/usr/local/confluent/share/java
然后我像下面一样启动我的连接器
/usr/local/confluent/bin/connect-distributed/usr/local/confluent/etc/kafka/connect-distributed.properties
好像启动成功了 [2020-02-02 05:22:33,860] INFO 加入群得到任务:
Assignment{error=0, leader='connect-1-c99d50a9-faf0-4b15-8a3d-3add55b7e206', leaderUrl='http://10.97.49.217:8083/', offset=-1,connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1216)[2020-02-02 05:22:33,861] 信息使用配置偏移 -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:850) 启动连接器和任务[2020-02-02 05:22:33,861] INFO 完成启动连接器和任务 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)
最后我使用 curl 命令来创建我的接收器连接器.
curl -X POST -H "Content-Type: application/json" --data '{"name":"elastic-search-sink-audit","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"2","topics":"fsp-AUDIT_EVENT_DEMO","key.ignore":"true","connection.url":"https://**.amazonaws.com","type.name":"kafka-connect","name":"elastic-search-sink-audit","errors.tolerance":"all","errors.deadletterqueue.topic.name":"fsp-dlq-audit-event"}}' http://localhost:8083/connectors |知乎
和EC2 IP地址
curl -X POST -H "Content-Type: application/json" --data '{"name":"elastic-search-sink-audit-distributed","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"2","topics":"audit_event_distributed","key.ignore":"true","connection.url":"https://***.us-east-1.es.amazonaws.com","type.name":"kafka-connect","name":"elastic-search-sink-audit-distributed","errors.tolerance":"all","errors.deadletterqueue.topic.name":"dlq_distributed"}}' http://10.67.39.217:8083/connectors |知乎
1 分钟后运行此命令后,我得到以下响应
% Total % Received % Xferd 平均速度时间时间时间电流下载上传总花费的剩余速度100 498 0 48 100 450 0 4 0:01:52 0:01:30 0:00:22 10{错误代码":500,"message": "请求超时"}
然后我也继续得到这个,但这只是警告,因为我的主题中有足够的副本
NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:598)^C[2020-02-03 08:05:54,890] 警告 [Producer clientId=producer-3] 在 topic-partition connect-configs-0 上得到相关 ID 为 7185 的错误产生响应,正在重试(剩余 2147476495 次尝试).错误:NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:598)^C[2020-02-03 08:05:54,991] 警告 [Producer clientId=producer-3] 在 topic-partition connect-configs-0 上得到了相关 ID 为 7186 的错误产生响应,正在重试(剩余 2147476494 次尝试).错误:NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:598)
当我描述我的主题时
主题:fsp-AUDIT_EVENT_DEMO PartitionCount:1 ReplicationFactor:3 配置:主题:fsp-AUDIT_EVENT_DEMO 分区:0 领导者:1 副本:1,6,2 Isr:1,6,2
所以我的主题有足够的副本.
更新
描述
的结果Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact主题:connect-configs 分区:0 Leader:6 Replicas:6 Isr:6主题:连接状态 PartitionCount:5 ReplicationFactor:1 Configs:cleanup.policy=compact主题:connect-status 分区:0 Leader:6 Replicas:6 Isr:6主题:connect-status 分区:1 Leader:1 Replicas:1 Isr:1主题:connect-status 分区:2 Leader:2 Replicas:2 Isr:2主题:connect-status 分区:3 Leader:4 Replicas:4 Isr:4主题:connect-status 分区:4 Leader:5 Replicas:5 Isr:5主题:connect-offsets PartitionCount:25 ReplicationFactor:1 Configs:cleanup.policy=compact
请帮我解决这个问题.
这三个主题都是由 kafka connect connect-offsets connect-configs connect-status 创建的
他们确实是
offset.storage.topic=connect-offsetsoffset.storage.replication.factor=1config.storage.topic=connect-configsconfig.storage.replication.factor=1status.storage.topic=连接状态status.storage.replication.factor=1
<块引用>
你建议我改变它吗?
不是名称,而是复制因子,是的.1个副本意味着如果任何一个broker宕机了,那么topic基本下线
<块引用>还有你在哪里寻找经纪人ID?
来自主题描述的第 3-5 列
I have downloaded confluent version 5.4 .
And i have connector running good in stand alone mode .
wget /usr/local http://packages.confluent.io/archive/5.4/confluent-5.4.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.4.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.4.0 /usr/local/
and then i am trying to run Kafak connect on distributed mode so i modified my connect-distributed.properties like below
bootstrap.servers=b-***.eu-west-1.amazonaws.com:9092,b-***.eu-west-1.amazonaws.com:9092,b-***.eu-west-1.amazonaws.com:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java
and then i start my connector like below
/usr/local/confluent/bin/connect-distributed /usr/local/confluent/etc/kafka/connect-distributed.properties
it looks like it started successfully [2020-02-02 05:22:33,860] INFO Joined group and got assignment:
Assignment{error=0, leader='connect-1-c99d50a9-faf0-4b15-8a3d-3add55b7e206', leaderUrl='http://10.97.49.217:8083/', offset=-1, connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1216)
[2020-02-02 05:22:33,861] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:850)
[2020-02-02 05:22:33,861] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)
Finally i am using curl command to create my sink connector .
curl -X POST -H "Content-Type: application/json" --data '{"name":"elastic-search-sink-audit","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"2","topics":"fsp-AUDIT_EVENT_DEMO","key.ignore":"true","connection.url":"https://**.amazonaws.com","type.name":"kafka-connect","name":"elastic-search-sink-audit","errors.tolerance":"all","errors.deadletterqueue.topic.name":"fsp-dlq-audit-event"}}' http://localhost:8083/connectors | jq
and with EC2 IP address
curl -X POST -H "Content-Type: application/json" --data '{"name":"elastic-search-sink-audit-distributed","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"2","topics":"audit_event_distributed","key.ignore":"true","connection.url":"https://***.us-east-1.es.amazonaws.com","type.name":"kafka-connect","name":"elastic-search-sink-audit-distributed","errors.tolerance":"all","errors.deadletterqueue.topic.name":"dlq_distributed"}}' http://10.67.39.217:8083/connectors | jq
After running this command after 1 minute i get below response
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 498 0 48 100 450 0 4 0:01:52 0:01:30 0:00:22 10
{
"error_code": 500,
"message": "Request timed out"
}
And then in i keep on getting this as well but this is just warning as i have enough replicas in my topic
NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:598)
^C[2020-02-03 08:05:54,890] WARN [Producer clientId=producer-3] Got error produce response with correlation id 7185 on topic-partition connect-configs-0, retrying (2147476495 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:598)
^C[2020-02-03 08:05:54,991] WARN [Producer clientId=producer-3] Got error produce response with correlation id 7186 on topic-partition connect-configs-0, retrying (2147476494 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:598)
When i described my topic
Topic:fsp-AUDIT_EVENT_DEMO PartitionCount:1 ReplicationFactor:3 Configs:
Topic: fsp-AUDIT_EVENT_DEMO Partition: 0 Leader: 1 Replicas: 1,6,2 Isr: 1,6,2
So i have enough replica for my topic .
UPDATE
Describe result of
Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: connect-configs Partition: 0 Leader: 6 Replicas: 6 Isr: 6
Topic:connect-status PartitionCount:5 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: connect-status Partition: 0 Leader: 6 Replicas: 6 Isr: 6
Topic: connect-status Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: connect-status Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: connect-status Partition: 3 Leader: 4 Replicas: 4 Isr: 4
Topic: connect-status Partition: 4 Leader: 5 Replicas: 5 Isr: 5
Topic:connect-offsets PartitionCount:25 ReplicationFactor:1 Configs:cleanup.policy=compact
Please help me resolving this .
all three topic is created by kafka connect connect-offsets connect-configs connect-status
Indeed they are
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
Do you suggest I change that ?
Not the names, but the replication factors, yes. 1 replica means if any one broker is down, then the topic is basically offline
Also where are you looking for broker id ?
From columns 3-5 of the topic description
这篇关于以分布式模式启动Kafka连接时请求超时消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!