kafka-connect: Getting error in distributed configuration for connector sink cassandra
Problem description
I get a task error in a distributed configuration for a Cassandra sink connector. I ran the following command:
curl -s localhost:8083/connectors/cassandraSinkConnector2/status | jq
to get the status
{
"name": "cassandraSinkConnector2",
"connector": {
"state": "RUNNING",
"worker_id": "localhost:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "localhost:8083",
"trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)\n\tat org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurred\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)\n\t... 12 more\nCaused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat java.lang.Class.forName0(Native Method)\n\tat java.lang.Class.forName(Class.java:348)\n\tat org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)\n\tat org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)\n\t... 15 more\n"
}
],
"type": "sink"
}
Stack trace:
"trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)
at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurred
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)
... 12 more
Caused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)
... 15 more
Below is the configuration of the connector.
{
"name": "cassandraSinkConnector2",
"config": {
"connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max": "1",
"topics": "appartenance_de",
"cassandra.contact.points": "localhost",
"cassandra.kcql": "INSERT INTO app_test SELECT * FROM app_de",
"cassandra.port": "9042",
"cassandra.keyspace": "dev_dkks",
"cassandra.username": "superuser",
"cassandra.password": "password",
"cassandra.write.mode": "insert",
"value.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"name": "cassandraSinkConnector2"
},
"tasks": [
{
"connector": "cassandraSinkConnector2",
"task": 0
}
],
"type": "sink"
}
New error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Record with a null key was encountered. This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
at io.confluent.connect.cassandra.CassandraSinkTask.put(CassandraSinkTask.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
"
The root error is
java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
The Monitoring Interceptors are part of Confluent Platform. You can either disable them in your Kafka Connect worker configuration or, better, make sure that the /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar JAR is available to your Kafka Connect worker.
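If you go the route of disabling the interceptors, the settings to look for in the worker properties file are usually `consumer.interceptor.classes` and `producer.interceptor.classes`. The following is only a minimal sketch of commenting those lines out; the helper name `disable_interceptors` and the sample properties text are hypothetical, and your worker file may be laid out differently.

```python
import re

# Matches worker settings such as "consumer.interceptor.classes=..." or
# "producer.interceptor.classes=..." (leading whitespace is tolerated).
INTERCEPTOR_LINE = re.compile(r"^\s*(consumer|producer)\.interceptor\.classes\s*=")


def disable_interceptors(properties_text: str) -> str:
    """Return the properties text with interceptor settings commented out."""
    out = []
    for line in properties_text.splitlines():
        if INTERCEPTOR_LINE.match(line):
            # Keep the original setting visible, but inactive.
            out.append("# " + line.lstrip())
        else:
            out.append(line)
    return "\n".join(out)


# Hypothetical excerpt of a worker properties file, for illustration only.
SAMPLE = """bootstrap.servers=localhost:9092
group.id=connect-cluster
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
"""

if __name__ == "__main__":
    print(disable_interceptors(SAMPLE))
```

The worker has to be restarted after its properties file changes. Keeping the interceptors and fixing the classpath instead preserves monitoring, which is why that option is the preferred one above.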
The new error you're seeing is
org.apache.kafka.connect.errors.DataException:
Record with a null key was encountered. This connector requires that records from Kafka contain the keys for the Cassandra table.
Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
I'd suggest using a Single Message Transform (SMT), as the error message recommends, to key your data correctly. The Kafka Connect documentation describes the ValueToKey transform and its settings.
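As a rough sketch of what that could look like, the snippet below adds a `ValueToKey` transform to a connector config and shows how it might be sent to the worker's REST API. The transform alias `createKey`, the helper names, and the field name `id` are assumptions made for illustration; the real field(s) must match the primary key of your Cassandra table.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # worker REST endpoint from the question
CONNECTOR = "cassandraSinkConnector2"


def with_value_to_key(config: dict, key_fields: list) -> dict:
    """Return a copy of a connector config with a ValueToKey transform added.

    Note: this overwrites any "transforms" entry already in the config.
    """
    updated = dict(config)
    # "createKey" is an arbitrary alias for the transform; any name works.
    updated["transforms"] = "createKey"
    updated["transforms.createKey.type"] = (
        "org.apache.kafka.connect.transforms.ValueToKey"
    )
    updated["transforms.createKey.fields"] = ",".join(key_fields)
    return updated


def put_config(config: dict) -> None:
    """Send the config to PUT /connectors/<name>/config on the worker."""
    request = urllib.request.Request(
        f"{CONNECT_URL}/connectors/{CONNECTOR}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        print(response.status)


# Abbreviated config from the question; "id" is a hypothetical field name
# standing in for the primary-key column(s) of the Cassandra table.
base = {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "topics": "appartenance_de",
}
new_config = with_value_to_key(base, ["id"])
print(json.dumps(new_config, indent=2))
# put_config(new_config)  # uncomment to apply against a running worker
```

The PUT request replaces the whole connector configuration, so in practice `base` should be the full config shown in the question rather than the abbreviated one used here.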