mqtt 代理和 kafka 独立连接 [英] mqtt broker and kafka connect standalone
问题描述
我想将数据从 mqtt 代理(mosquitto)摄取到 kafka 代理(apache kafka),所以我执行这些步骤
i want ingest data from mqtt broker(mosquitto) to kafka broker(apache kafka) , so i flow these steps
1/我从 evokly github 创建了我的 mqtt 连接器:所以我下载了代码并使用 gradlew 来构建项目并生成 .jar 依赖项(kafka-connect-mqtt-1.1-SNAPSHOT.jar)2/我将 .jar 文件放入 C:\kafka_2.13-2.8.0\connectors Director3/在我将 CLASSPATH 添加到系统变量之后
1/ I create my mqtt connector from evokly github : so i download the code and i use gradlew to build the project and generate .jar dependencies (kafka-connect-mqtt-1.1-SNAPSHOT.jar) 2/ i put .jar files into C:\kafka_2.13-2.8.0\connectors director 3/ after i add CLASSPATH to system variable
4/
5/6/我启动zookeeper和服务器kafka7/我开始卡夫卡连接
4/
5/
6/ i start zookeeper and server kafka
7/ i start kafka connect
.\bin\windows\connect-standalone .\config\connect-standalone.properties .\config\mqtt.properties
8/9/现在当 istart mosquitto 并发布到 mqtt 主题时10/我的 kafka 主题没有收到任何内容
8/ 9/ now when istart mosquitto and publish to mqtt topic 10/ nothing received into my kafka topic
注意:当我在 mosquitto 上发布和消费时,它也能正常工作,当我在 kafka 上生产并从 kafka 消费时,它工作正常,但使用 kafka 连接没有显示问题,但我没有收到任何数据,请提供任何帮助
NB: when i publish and consume on mosquitto it works fine also when i produce on kafka and consume from kafka it works fine but with kafka connect no problem shown but i don't receive data any help please
这是日志
[2021-07-29 17:17:21,844] INFO Kafka Connect standalone worker initial
[2021-07-29 17:17:21,846] INFO Kafka Connect starting (org.apache.kafk
[2021-07-29 17:17:21,851] INFO Herder starting (org.apache.kafka.conne
[2021-07-29 17:17:21,853] INFO Worker starting (org.apache.kafka.conne
[2021-07-29 17:17:21,856] INFO Starting FileOffsetBackingStore with fi
[2021-07-29 17:17:21,867] INFO Worker started (org.apache.kafka.connec
[2021-07-29 17:17:21,868] INFO Herder started (org.apache.kafka.connec
[2021-07-29 17:17:21,870] INFO Initializing REST resources (org.apache
[2021-07-29 17:17:21,993] INFO Adding admin resources to main listener
[2021-07-29 17:17:22,214] INFO DefaultSessionIdManager workerName=node
[2021-07-29 17:17:22,216] INFO No SessionScavenger set, using defaults
[2021-07-29 17:17:22,220] INFO node0 Scavenging every 600000ms (org.ec
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resour
applicable in the SERVER runtime. Due to constraint configuration probnored.
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourfaces applicable in the SERVER runtime. Due to constraint configuratioe will be ignored.
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourable in the SERVER runtime.
Due to constraint configuration problems t
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourlicable in the SERVER runtime. Due to constraint configuration problem
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.Errors logErro
AVERTISSEMENT: The following warnings have been detected: WARNING: Theource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.c
WARNING: The (sub)resource method listConnectorPlugins in org.apache.k
WARNING: The (sub)resource method serverInfo in org.apache.kafka.conne
[2021-07-29 17:17:23,568] INFO Started o.e.j.s.ServletContextHandler@1
[2021-07-29 17:17:23,571] INFO REST resources initialized; server is s
[2021-07-29 17:17:23,573] INFO Kafka Connect started (org.apache.kafka
[2021-07-29 17:17:23,605] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:372)
[2021-07-29 17:17:23,638] INFO Creating connector mqtt of type com.evo
[2021-07-29 17:17:23,641] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
errors.log.enable = true
errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
[2021-07-29 17:17:23,649] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
errors.log.enable = true
errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorCo
[2021-07-29 17:17:23,696] INFO Instantiated connector mqtt with versioa.connect.runtime.Worker:284)
[2021-07-29 17:17:23,702] INFO Finished creating connector mqtt (org.a
[2021-07-29 17:17:23,705] INFO Start a MqttSourceConnector (com.evokly
[2021-07-29 17:17:23,707] INFO MqttSourceConnectorConfig values:
kafka.topic = kafka-test
message_processor_class = class com.evokly.kafka.connect.mqtt.
mqtt.clean_session = true
mqtt.client_id = null
mqtt.connection_timeout = 30
mqtt.keep_alive_interval = 60
mqtt.password = xxxxxx
mqtt.qos = 1
mqtt.server_uris = tcp://localhost:1883
mqtt.ssl.ca_cert = null
mqtt.ssl.cert = null
mqtt.ssl.key = null
mqtt.topic = mqtt
mqtt.user = null
(com.evokly.kafka.connect.mqtt.MqttSourceConnectorConfig:372)
[2021-07-29 17:17:23,714] INFO Initialize transform process properties
[2021-07-29 17:17:23,714] INFO Initialize transform process properties[2021-07-29 17:17:23,714] INFO Initialize transform process properties
[2021-07-29 17:17:23,714] INFO Initialize transform process properties
(com.evokl
y.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
[2021-07-29 17:17:23,714] INFO Initialize transform process properties
(c
om.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
[2021-07-29 17:17:23,714] INFO Initialize transform process propertie
s n
(com.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
[2021-07-29 17:17:23,714] INFO Initialize transform process properties (com.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector errors.log.enable = true
errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
[2021-07-29 17:17:23,724] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector errors.log.enable = true errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = [] transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:372)
蚊子日志
1627580533: mosquitto version 2.0.11 starting
1627580533: Using default config.
1627580533: Starting in local only mode. Connections will only be possible from clients running on this machine.
1627580533: Create a configuration file which defines a listener to allow remote access.
1627580533: For more details see https://mosquitto.org/documentation/authentication-methods/
1627580533: Opening ipv4 listen socket on port 1883.
1627580533: Opening ipv6 listen socket on port 1883.
1627580533: mosquitto version 2.0.11 running
1627588404: New connection from ::1:57594 on port 1883.
1627588404: New client connected from ::1:57594 as auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (p2, c1, k60).
1627588404: No will message specified.
1627588404: Sending CONNACK to auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (0, 0)
1627588404: Received PUBLISH from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588404: Received DISCONNECT from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E
1627588404: Client auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E disconnected.
1627588938: New connection from ::1:57846 on port 1883.
1627588938: New client connected from ::1:57846 as auto-2798744F-5690-0500-211D-0EBD497E50F1 (p2, c1, k60).
1627588938: No will message specified.
1627588938: Sending CONNACK to auto-2798744F-5690-0500-211D-0EBD497E50F1 (0, 0)
1627588938: Received PUBLISH from auto-2798744F-5690-0500-211D-0EBD497E50F1 (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588938: Received DISCONNECT from auto-2798744F-5690-0500-211D-0EBD497E50F1
1627588938: Client auto-2798744F-5690-0500-211D-0EBD497E50F1 disconnected.
推荐答案
将以下条目添加到您的 mqtt.properties 文件中:
Add the following entry to your mqtt.properties file:
errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=none
另外,在你的Kafka安装目录/config下找到connect-log4j.properties文件并修改这一行:
In addition, find the file connect-log4j.properties in your Kafka installation directory under /config and change this line:
log4j.rootLogger=INFO, stdout, connectAppender
到:
log4j.rootLogger=TRACE, stdout, connectAppender
您应该获得大量诊断信息以帮助澄清问题.如果它没有解决问题,请将输出添加到问题中(请不要粘贴屏幕截图.复制文本输出以便我们可以搜索文本)
You should get a lot of diagnostics information to help clarify the problem. If it does not clear up the issue, add the output to the question (Don't paste a screenshot, please. Copy the text output so we can search through the text)
这篇关于mqtt 代理和 kafka 独立连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!