kafka.common.KafkaException: Failed to parse the broker info from zookeeper, from EC2 to Elasticsearch


Problem description

I have AWS MSK set up and I am trying to sink records from MSK to Elasticsearch. I am able to push data into MSK in JSON format. I want to sink it to Elasticsearch. I believe I have done all the setup correctly. This is what I have done on the EC2 instance:

wget /usr/local http://packages.confluent.io/archive/3.1/confluent-oss-3.1.2-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-oss-3.1.2-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-3.1.2 /usr/local/confluent

/usr/local/confluent/etc/kafka-connect-elasticsearch

After that I modified kafka-connect-elasticsearch and set my Elasticsearch URL:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect

The producer sends messages in the format below:

{
        "data": {
                "RequestID":    517082653,
                "ContentTypeID":        9,
                "OrgID":        16145,
                "UserID":       4,
                "PromotionStartDateTime":       "2019-12-14T16:06:21Z",
                "PromotionEndDateTime": "2019-12-14T16:16:04Z",
                "SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
        },
        "metadata":     {
                "timestamp":    "2019-12-29T10:37:31.502042Z",
                "record-type":  "data",
                "operation":    "insert",
                "partition-key-type":   "schema-table",
                "schema-name":  "dbo",
                "table-name":   "TRFSDIQueue"
        }
}
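(Editor's note, beyond what the question states: because this payload is plain JSON with no embedded schema, the Elasticsearch sink connector typically also needs the JSON converter with schemas disabled in the worker properties and `schema.ignore=true` in the connector properties. These are standard Kafka Connect / Elasticsearch-sink settings, but verify them against your connector version.)

```properties
# In the worker properties (e.g. kafka-connect-standalone.properties):
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# In the connector properties (the elasticsearch-sink config above):
schema.ignore=true
```

Without `schema.ignore=true`, the sink tries to derive an Elasticsearch mapping from a Connect schema that schemaless JSON records do not carry.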

I am a little confused about how Kafka Connect will start here. How can I start it?

I have also started the Schema Registry as shown below, which gave me an error.

/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties

When I do that I get the error below:

[2019-12-29 13:49:17,861] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT:/

Please help.

As suggested in the answer, I upgraded the Kafka Connect version, but then I started getting the error below:

 ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:61)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:72)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:39)
        at io.confluent.rest.Application.createServer(Application.java:201)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:41)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:168)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:111)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:208)
        ... 5 more
Caused by: java.util.concurrent.TimeoutException
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:161)
        ... 7 more

Recommended answer

First, Confluent Platform 3.1.2 is fairly old. I suggest you get the version that aligns with your Kafka version.

You start Kafka Connect using the appropriate connect-* scripts and the properties files located under the bin and etc/kafka folders, for example:

/usr/local/confluent/bin/connect-standalone \
  /usr/local/confluent/etc/kafka/kafka-connect-standalone.properties \ 
  /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart.properties

If that works, you can move on to using the connect-distributed command instead.
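(Editor's note: a hedged sketch of the distributed route. Port 8083 is the Connect REST default; the Elasticsearch hostname below is a placeholder, since the real endpoint is specific to the question. In distributed mode the connector is registered over the REST API rather than passed as a properties file.)

```shell
# Start a distributed Connect worker (paths assume the same layout as above):
#   /usr/local/confluent/bin/connect-distributed \
#     /usr/local/confluent/etc/kafka/connect-distributed.properties

# JSON equivalent of the elasticsearch-sink properties shown in the question:
CONNECTOR_JSON='{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "AWSKafkaTutorialTopic",
    "key.ignore": "true",
    "connection.url": "https://your-es-endpoint.us-east-1.es.amazonaws.com",
    "type.name": "kafka-connect"
  }
}'

# Sanity-check the payload before posting it:
echo "$CONNECTOR_JSON" | python3 -m json.tool > /dev/null && echo "JSON OK"

# Register the connector with the worker's REST API:
# curl -s -X POST -H "Content-Type: application/json" \
#      --data "$CONNECTOR_JSON" http://localhost:8083/connectors
```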

Regarding the Schema Registry, you can search its GitHub issues for multiple people trying to get MSK to work, but the root issue is that MSK does not expose a PLAINTEXT listener and the Schema Registry does not support named listeners. (This may have changed since version 5.x.)
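(Editor's note: one workaround to try, an assumption based on newer Schema Registry releases rather than something verified in this thread. Schema Registry 4.0+ can bootstrap from the brokers directly instead of Zookeeper, which sidesteps the "Failed to parse the broker info from zookeeper" path entirely. The endpoint below is a placeholder for your MSK TLS bootstrap string.)

```properties
# /usr/local/confluent/etc/schema-registry/schema-registry.properties
# Replace kafkastore.connection.url (Zookeeper) with the broker bootstrap list:
kafkastore.bootstrap.servers=SSL://b-1.your-msk-cluster.amazonaws.com:9094
kafkastore.security.protocol=SSL
kafkastore.topic=_schemas
```

This uses the MSK TLS listener (port 9094 by default on MSK), avoiding the PLAINTEXT-listener problem described above.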

You could also try running the Connect and Schema Registry containers in ECS/EKS rather than extracting tarballs onto an EC2 machine.
