卡夫卡:从ZooKeeper获取经纪人主机 [英] Kafka: Get broker host from ZooKeeper
问题描述
出于特殊原因,我需要同时使用-ConsumerGroup
(又称高级消费者)和SimpleConsumer
(又称低级别消费者)来读取Kafka.对于ConsumerGroup
,我使用基于ZooKeeper的配置,并对它完全满意,但是SimpleConsumer
要求实例化种子代理.
For particular reasons I need to use both - ConsumerGroup
(a.k.a. high-level consumer) and SimpleConsumer
(a.k.a. low-level consumer) to read from Kafka. For ConsumerGroup
I use ZooKeeper-based config and am completely satisfied with it, but SimpleConsumer
requires seed brokers to be instantiated.
我不想同时保留这两个列表-ZooKeeper和代理主机.因此,我正在寻找一种方法自动从ZooKeeper 中发现特定主题的经纪人.
I don't want to keep list of both - ZooKeeper and broker hosts. Thus, I'm looking for a way to automatically discover brokers for a particular topic from ZooKeeper.
由于某些间接信息,我相信这些数据存储在以下路径之一下的ZooKeeper中:
Because of some indirect information I belief that these data is stored in ZooKeeper under one of the following paths:
-
/brokers/topics/<topic>/partitions/<partition-id>/state
- /经纪人/身份证/
但是,当我尝试从这些节点读取数据时,出现序列化错误(为此,我使用com.101tec.zkclient
):
However, when I try to read data from these nodes, I'm getting serialization error (I'm using com.101tec.zkclient
for this):
org.I0Itec.zkclient.exception.ZkMarshallingError:java.io.StreamCorruptedException:无效的流头:7B226A6D 在org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) 在org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64消失 引起原因:java.io.StreamCorruptedException:无效的流头:7B226A6D 在java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) 在java.io.ObjectInputStream.(ObjectInputStream.java:299) 在org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) 在org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ...还有69
org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 elided Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) at java.io.ObjectInputStream.(ObjectInputStream.java:299) at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 more
我可以毫无问题地编写和读取自定义Java对象(例如字符串),因此我认为这不是客户端的问题,而是棘手的编码.因此,我想知道:
I can write and read custom Java objects (e.g. Strings) without any problem, so I believe it's not a problem of a client, but rather tricky encoding. Thus, I want to know:
- 如果这是正确的方法,如何正确读取这些节点?
- 如果整个方法都不对,什么是正确的?
- If this is the right way to go, how to read these nodes properly?
- If the whole approach is wrong, what is the right one?
推荐答案
这就是我的一位同事用来获取Kafka经纪人列表的方式.我认为这是您动态获取经纪人名单的正确方法.
That is the way of what one of my colleagues did to get a list of Kafka brokers. I think it's a correct way when you want to get a broker list dynamically.
这是示例代码,显示了如何获取列表.
Here is an example code that shows how to get the list.
public class KafkaBrokerInfoFetcher {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
}
将代码运行到由三个代理组成的集群上
Running the code onto the cluster consisting of three brokers results in
1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
这篇关于卡夫卡:从ZooKeeper获取经纪人主机的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!