MQTT broker and Kafka Connect standalone


Problem description

I want to ingest data from an MQTT broker (Mosquitto) into a Kafka broker (Apache Kafka), so I followed these steps:

1/ I created my MQTT connector from the evokly GitHub repository: I downloaded the code and used gradlew to build the project and generate the .jar (kafka-connect-mqtt-1.1-SNAPSHOT.jar).
2/ I put the .jar files into the C:\kafka_2.13-2.8.0\connectors directory.
3/ I then added CLASSPATH to the system variables.

4/
5/
6/ I started ZooKeeper and the Kafka server.
7/ I started Kafka Connect:

.\bin\windows\connect-standalone  .\config\connect-standalone.properties  .\config\mqtt.properties

8/
9/ Now I start Mosquitto and publish to the MQTT topic.
10/ Nothing is received in my Kafka topic.

NB: Publishing and consuming on Mosquitto works fine, and producing to and consuming from Kafka works fine too. With Kafka Connect no error is shown, but I don't receive any data. Any help, please?

Here is the log:

    [2021-07-29 17:17:21,844] INFO Kafka Connect standalone worker initial
    [2021-07-29 17:17:21,846] INFO Kafka Connect starting (org.apache.kafk
    [2021-07-29 17:17:21,851] INFO Herder starting (org.apache.kafka.conne
    [2021-07-29 17:17:21,853] INFO Worker starting (org.apache.kafka.conne
    [2021-07-29 17:17:21,856] INFO Starting FileOffsetBackingStore with fi
    [2021-07-29 17:17:21,867] INFO Worker started (org.apache.kafka.connec
    [2021-07-29 17:17:21,868] INFO Herder started (org.apache.kafka.connec
    [2021-07-29 17:17:21,870] INFO Initializing REST resources (org.apache
    [2021-07-29 17:17:21,993] INFO Adding admin resources to main listener
    [2021-07-29 17:17:22,214] INFO DefaultSessionIdManager workerName=node
    [2021-07-29 17:17:22,216] INFO No SessionScavenger set, using defaults
    [2021-07-29 17:17:22,220] INFO node0 Scavenging every 600000ms (org.ec
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resour
applicable in the SERVER runtime. Due to constraint configuration probnored.
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourfaces applicable in the SERVER runtime. Due to constraint configuratioe will be ignored.
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
    AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourable in the SERVER runtime. 
    Due to constraint configuration problems t
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourlicable in the SERVER runtime. Due to constraint configuration problem
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.Errors logErro
AVERTISSEMENT: The following warnings have been detected: WARNING: Theource contains empty path annotation.
    WARNING: The (sub)resource method createConnector in org.apache.kafka.
    WARNING: The (sub)resource method listConnectors in org.apache.kafka.c
    WARNING: The (sub)resource method listConnectorPlugins in org.apache.k
    WARNING: The (sub)resource method serverInfo in org.apache.kafka.conne

    [2021-07-29 17:17:23,568] INFO Started o.e.j.s.ServletContextHandler@1
    [2021-07-29 17:17:23,571] INFO REST resources initialized; server is s
    [2021-07-29 17:17:23,573] INFO Kafka Connect started (org.apache.kafka
    [2021-07-29 17:17:23,605] INFO AbstractConfig values:
 (org.apache.kafka.common.config.AbstractConfig:372)
    [2021-07-29 17:17:23,638] INFO Creating connector mqtt of type com.evo
    [2021-07-29 17:17:23,641] INFO SourceConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
        errors.log.enable = true
        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
    [2021-07-29 17:17:23,649] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
        errors.log.enable = true
        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorCo
    [2021-07-29 17:17:23,696] INFO Instantiated connector mqtt with versioa.connect.runtime.Worker:284)
[2021-07-29 17:17:23,702] INFO Finished creating connector mqtt (org.a
[2021-07-29 17:17:23,705] INFO Start a MqttSourceConnector (com.evokly
[2021-07-29 17:17:23,707] INFO MqttSourceConnectorConfig values:
        kafka.topic = kafka-test
        message_processor_class = class com.evokly.kafka.connect.mqtt.
        mqtt.clean_session = true
        mqtt.client_id = null
        mqtt.connection_timeout = 30
        mqtt.keep_alive_interval = 60
        mqtt.password = xxxxxx
        mqtt.qos = 1
        mqtt.server_uris = tcp://localhost:1883
        mqtt.ssl.ca_cert = null
        mqtt.ssl.cert = null
        mqtt.ssl.key = null
        mqtt.topic = mqtt
        mqtt.user = null
 (com.evokly.kafka.connect.mqtt.MqttSourceConnectorConfig:372)
[2021-07-29 17:17:23,714] INFO Initialize transform process properties (com.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
        errors.log.enable = true
        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
[2021-07-29 17:17:23,724] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
        errors.log.enable = true
        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:372)
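
The `MqttSourceConnectorConfig values` printed in the log above show which settings the connector resolved (`kafka.topic`, `mqtt.topic`, `mqtt.server_uris`). As a minimal sketch, the Python snippet below checks that a properties text defines those keys before Kafka Connect is started. Treating these keys as required is an assumption based on this log, not on the connector's documentation, and `SAMPLE` is a hypothetical file reconstructed from the logged values, not the asker's actual mqtt.properties.

```python
# Keys taken from the log above; treating them as required is an assumption.
REQUIRED_KEYS = (
    "name",
    "connector.class",
    "kafka.topic",
    "mqtt.topic",
    "mqtt.server_uris",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(text, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty in the properties text."""
    props = parse_properties(text)
    return [key for key in required if not props.get(key)]


# Hypothetical file content, reconstructed from the values printed in the log.
SAMPLE = """
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=kafka-test
mqtt.topic=mqtt
mqtt.server_uris=tcp://localhost:1883
"""
```

Calling `missing_keys(SAMPLE)` returns an empty list; any key it does return would be worth adding to the file before looking further.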

Mosquitto log:

1627580533: mosquitto version 2.0.11 starting
1627580533: Using default config.
1627580533: Starting in local only mode. Connections will only be possible from clients running on this machine.
1627580533: Create a configuration file which defines a listener to allow remote access.
1627580533: For more details see https://mosquitto.org/documentation/authentication-methods/
1627580533: Opening ipv4 listen socket on port 1883.
1627580533: Opening ipv6 listen socket on port 1883.
1627580533: mosquitto version 2.0.11 running
1627588404: New connection from ::1:57594 on port 1883.
1627588404: New client connected from ::1:57594 as auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (p2, c1, k60).
1627588404: No will message specified.
1627588404: Sending CONNACK to auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (0, 0)
1627588404: Received PUBLISH from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588404: Received DISCONNECT from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E
1627588404: Client auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E disconnected.
1627588938: New connection from ::1:57846 on port 1883.
1627588938: New client connected from ::1:57846 as auto-2798744F-5690-0500-211D-0EBD497E50F1 (p2, c1, k60).
1627588938: No will message specified.
1627588938: Sending CONNACK to auto-2798744F-5690-0500-211D-0EBD497E50F1 (0, 0)
1627588938: Received PUBLISH from auto-2798744F-5690-0500-211D-0EBD497E50F1 (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588938: Received DISCONNECT from auto-2798744F-5690-0500-211D-0EBD497E50F1
1627588938: Client auto-2798744F-5690-0500-211D-0EBD497E50F1 disconnected.
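
In the Mosquitto log above, the only clients that appear are the two auto-named ones, and each sends just a PUBLISH followed by a DISCONNECT. A rough way to check whether any subscriber (such as the connector) reached the broker is to group the received packets by client. The Python sketch below does that and converts the epoch timestamps to UTC. That a subscriber would show up as a `Received SUBSCRIBE from ...` line is an assumption here, since no such line occurs in this excerpt.

```python
import re
from collections import defaultdict
from datetime import datetime, timezone

# Matches lines such as "1627588404: Received PUBLISH from <client-id> (...)".
RECEIVED = re.compile(r"^\s*(\d+): Received (\w+) from (\S+)")


def packets_by_client(log_text):
    """Map each client id to a list of (UTC time, packet type) tuples."""
    seen = defaultdict(list)
    for line in log_text.splitlines():
        match = RECEIVED.match(line)
        if match:
            epoch, packet, client = match.groups()
            when = datetime.fromtimestamp(int(epoch), tz=timezone.utc)
            seen[client].append((when, packet))
    return dict(seen)


def subscribers(log_text):
    """Return the client ids that sent a SUBSCRIBE (assumed log wording)."""
    return sorted(
        client
        for client, events in packets_by_client(log_text).items()
        if any(packet == "SUBSCRIBE" for _, packet in events)
    )


# Three lines copied from the log above.
SAMPLE_LOG = """\
1627588404: New client connected from ::1:57594 as auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (p2, c1, k60).
1627588404: Received PUBLISH from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588404: Received DISCONNECT from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E
"""
```

For `SAMPLE_LOG`, `subscribers` returns an empty list. If the same holds for the full broker log while Kafka Connect is running, that may suggest the connector never subscribed to the topic.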

Recommended answer

Add the following entries to your mqtt.properties file:

errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=none

In addition, find the file connect-log4j.properties in the config directory of your Kafka installation and change this line:

log4j.rootLogger=INFO, stdout, connectAppender

to:

log4j.rootLogger=TRACE, stdout, connectAppender

You should get a lot of diagnostic information to help clarify the problem. If that does not clear up the issue, add the output to the question. (Please don't paste a screenshot; copy the text output so we can search through it.)
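
TRACE output is very verbose, so it can help to narrow it down before reading or posting it. The sketch below is one possible Python filter: it keeps entries at WARN or ERROR level plus any entry that mentions a keyword, together with their continuation lines (stack traces, config dumps). It assumes the `[timestamp] LEVEL message` layout seen in the log in the question. The function name and its defaults are illustrative, not part of Kafka.

```python
import re

# Layout as seen in the question's log, for example:
# "[2021-07-29 17:17:23,705] INFO Start a MqttSourceConnector (com.evokly"
LINE = re.compile(r"^\s*\[(?P<time>[^\]]+)\]\s+(?P<level>[A-Z]+)\s+(?P<message>.*)$")


def interesting_lines(log_text, keyword="mqtt", levels=("WARN", "ERROR")):
    """Keep entries at the given levels or mentioning keyword, with continuations."""
    kept = []
    keeping = False
    for line in log_text.splitlines():
        match = LINE.match(line)
        if match:
            # A new log entry starts here: decide whether to keep it.
            keeping = (
                match["level"] in levels
                or keyword.lower() in match["message"].lower()
            )
        # Lines without a timestamp follow the decision made for their entry.
        if keeping and line.strip():
            kept.append(line.rstrip())
    return kept
```

Saving the console output to a text file and passing its contents to `interesting_lines` gives a much shorter excerpt that is still searchable as text.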
