Kafka Connect: No tasks created for a connector


Problem description

We are running Kafka Connect (Confluent Platform 5.4, i.e. Kafka 2.4) in distributed mode, using the Debezium (MongoDB) and Confluent S3 connectors. When a new connector is added via the REST API, the connector is created in the RUNNING state, but no tasks are created for it.
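
The empty task list can be confirmed through the Connect REST API; a quick check, using the same <connect_host> placeholder as the curl command further below:

curl http://<connect_host>:10083/connectors/<connector-name>/status

A connector in this state reports "state": "RUNNING" for the connector itself but an empty "tasks": [] array.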

Pausing and resuming the connector does not help. When we stop all the workers and then start them again, the tasks are created and everything runs as it should.
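
For reference, pausing and resuming were done with the standard Connect REST endpoints (a sketch, with <connector-name> standing in for the names used below):

curl -X PUT http://<connect_host>:10083/connectors/<connector-name>/pause
curl -X PUT http://<connect_host>:10083/connectors/<connector-name>/resume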

The issue is not caused by the connector plugins, because we see the same behaviour for both the Debezium and S3 connectors. Also, in the debug logs I can see that Debezium correctly returns a task configuration from the Connector.taskConfigs() method.
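
The task configurations returned by the plugin never materialize in the cluster, which can be seen from the tasks endpoint (same placeholders as above):

curl http://<connect_host>:10083/connectors/<connector-name>/tasks

When the problem occurs this returns an empty array [], even though Connector.taskConfigs() returned a non-empty list inside the worker.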

Can somebody tell me what to do so we can add connectors without restarting the workers? Thanks.

Configuration details

The cluster has 3 nodes with the following connect-distributed.properties:

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5

config.storage.topic=connect-configs-qa
config.storage.replication.factor=3

status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083

plugin.path=/opt/kafka-connect/plugins,/usr/share/java/

security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>

max.request.size=20000000
max.partition.fetch.bytes=20000000
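
Because new task configurations are propagated to the workers through config.storage.topic, it is worth checking that the internal topics exist and look sane; Connect expects the config topic in particular to be a single-partition, compacted topic. A minimal check with the stock Kafka CLI (client-ssl.properties is an assumed client config file carrying the same SSL truststore settings as above):

kafka-topics --bootstrap-server kafka-broker-001:9092 --describe --topic connect-configs-qa --command-config client-ssl.properties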

Connector configuration

Debezium example:

{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
    "mongodb.name": "qa-debezium-comp",
    "mongodb.ssl.enabled": true,
    "collection.whitelist": "converter[.]task",
    "tombstones.on.delete": true
  }
}

S3 example:

{
  "name": "qa-s3-sink-task|1",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "qa-debezium-comp.converter.task",
    "topics.dir": "data/env/qa",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "<bucket-name>",
    "flush.size": "15000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "transforms": "ExtractDocument",
    "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
  }
}

The connectors are created using curl:

curl -X POST -H "Content-Type: application/json" --data @<json_file> http://<connect_host>:10083/connectors

Recommended answer

I had the same problem, so I changed the name of the connector and created a new one. It worked, but I don't know the root cause of the problem, because we have no information in the kafka-connect logs.
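
A minimal sketch of that workaround, reusing the curl style from the question: delete the connector and re-POST it under a new name. It may be worth noting that both example names above contain a | character, so a rename that drops it (e.g. qa-mongodb-comp-converter-task-1, a hypothetical name) is a conservative choice:

curl -X DELETE http://<connect_host>:10083/connectors/qa-mongodb-comp-converter-task%7C1
curl -X POST -H "Content-Type: application/json" --data @<json_file> http://<connect_host>:10083/connectors

%7C is the URL-encoded form of |, since a literal pipe is not a valid character in a URL path.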
