kafka.common.KafkaException:无法解析来自 EC2 的zookeeper 的代理信息到弹性搜索 [英] kafka.common.KafkaException: Failed to parse the broker info from zookeeper from EC2 to elastic search

查看:26
本文介绍了kafka.common.KafkaException:无法解析来自 EC2 的zookeeper 的代理信息到弹性搜索的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我设置了 aws MSK,我正在尝试将记录从 MSK 接收到弹性搜索.我能够将数据以 json 格式推送到 MSK.我想沉迷于弹性搜索.我能够正确地完成所有设置.这就是我在 EC2 实例上所做的

I have aws MSK set up and i am trying to sink records from MSK to elastic search. I am able to push data into MSK into json format . I want to sink to elastic search . I am able to do all set up correctly . This is what i have done on EC2 instance

wget /usr/local http://packages.confluent.io/archive/3.1/confluent-oss-3.1.2-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-oss-3.1.2-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-3.1.2 /usr/local/confluent

/usr/local/confluent/etc/kafka-connect-elasticsearch

之后我修改了 kafka-connect-elasticsearch 并设置了我的弹性搜索 url

After that i have modified kafka-connect-elasticsearch and set my elastic search url

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect

生产者发送如下格式的消息

The producer sends message like below fomrat

{
        "data": {
                "RequestID":    517082653,
                "ContentTypeID":        9,
                "OrgID":        16145,
                "UserID":       4,
                "PromotionStartDateTime":       "2019-12-14T16:06:21Z",
                "PromotionEndDateTime": "2019-12-14T16:16:04Z",
                "SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
        },
        "metadata":     {
                "timestamp":    "2019-12-29T10:37:31.502042Z",
                "record-type":  "data",
                "operation":    "insert",
                "partition-key-type":   "schema-table",
                "schema-name":  "dbo",
                "table-name":   "TRFSDIQueue"
        }
}

我对 kafka 连接将如何从这里开始有点困惑?如果是,我该如何开始?

I am little confused in how will the kafka connect start here ? if yes how can i start that ?

我也启动了如下所示的架构注册表,这给了我错误.

I also have started schema registry like below which gave me error.

/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties

当我这样做时,我得到以下错误

When i do that i get below error

[2019-12-29 13:49:17,861] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT:/

请帮忙.

正如回答中所建议的,我升级了 kafka 连接版本,但随后我开始出现以下错误

As suggested in answer i upgraded the kafka connect version but then i started getting below error

 ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:61)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:72)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:39)
        at io.confluent.rest.Application.createServer(Application.java:201)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:41)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:168)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:111)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:208)
        ... 5 more
Caused by: java.util.concurrent.TimeoutException
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:161)
        ... 7 more

推荐答案

首先,Confluent Platform 3.1.2 已经很老了.建议你获取与Kafka版本一致的版本

First, Confluent Platform 3.1.2 is fairly old. I suggest you get the version that aligns with the Kafka version

您使用位于 bin 和 etc/kafka 文件夹下的适当 connect-* 脚本和属性启动 Kafka Connect

You start Kafka Connect using the appropriate connect-* scripts and properties located under bin and etc/kafka folders

例如,

/usr/local/confluent/bin/connect-standalone \
  /usr/local/confluent/etc/kafka/kafka-connect-standalone.properties \ 
  /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart.properties

如果可行,您可以继续使用 connect-distributed 命令

If that works, you can move onto using connect-distributed command instead

关于架构注册表,您可以在其 Github 问题上搜索多个试图让 MSK 工作的人,但根本问题与 MSK 未公开 PLAINTEXT 侦听器和架构注册表不支持命名侦听器有关.(这可能自 5.x 版以来发生了变化)

Regarding Schema Registry, you can search its Github issues for multiple people trying to get MSK to work, but the root issue is related to MSK not exposing a PLAINTEXT listener and the Schema Registry not supporting named listeners. (This may have changed since versions 5.x)

您也可以尝试在 ECS/EKS 中使用 Connect 和 Schema Registry 容器,而不是在 EC2 机器中提取

You could also try using Connect and Schema Registry containers in ECS / EKS rather than extracting in an EC2 machine

这篇关于kafka.common.KafkaException:无法解析来自 EC2 的zookeeper 的代理信息到弹性搜索的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆