Kafka:从 ZooKeeper 获取代理主机 [英] Kafka: Get broker host from ZooKeeper

查看:35
本文介绍了Kafka:从 ZooKeeper 获取代理主机的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

出于特殊原因,我需要同时使用 - ConsumerGroup(又名高级消费者)和 SimpleConsumer(又名低级消费者)来读取 Kafka.对于 ConsumerGroup,我使用基于 ZooKeeper 的配置并且对它完全满意,但是 SimpleConsumer 需要实例化种子代理.

For particular reasons I need to use both - ConsumerGroup (a.k.a. high-level consumer) and SimpleConsumer (a.k.a. low-level consumer) to read from Kafka. For ConsumerGroup I use ZooKeeper-based config and am completely satisfied with it, but SimpleConsumer requires seed brokers to be instantiated.

我不想同时保留 ZooKeeper 和代理主机的列表.因此,我正在寻找一种方法来自动发现特定主题的代理来自 ZooKeeper.

I don't want to keep list of both - ZooKeeper and broker hosts. Thus, I'm looking for a way to automatically discover brokers for a particular topic from ZooKeeper.

由于一些间接信息,我相信这些数据存储在 ZooKeeper 中的以下路径之一:

Because of some indirect information I belief that these data is stored in ZooKeeper under one of the following paths:

  • /brokers/topics//partitions//state
  • /brokers/ids/

但是,当我尝试从这些节点读取数据时,出现序列化错误(为此我使用了 com.101tec.zkclient):

However, when I try to read data from these nodes, I'm getting serialization error (I'm using com.101tec.zkclient for this):

org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: 无效的流头: 7B226A6D在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)在 org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740)在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)... 64 省略引起:java.io.StreamCorruptedException:无效的流标头:7B226A6D在 java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)在 java.io.ObjectInputStream.(ObjectInputStream.java:299)在 org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30)在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31)... 69 更多

org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 elided Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) at java.io.ObjectInputStream.(ObjectInputStream.java:299) at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 more

我可以毫无问题地编写和读取自定义 Java 对象(例如字符串),所以我相信这不是客户端的问题,而是编码很棘手.因此,我想知道:

I can write and read custom Java objects (e.g. Strings) without any problem, so I believe it's not a problem of a client, but rather tricky encoding. Thus, I want to know:

  1. 如果这是正确的方法,如何正确读取这些节点?
  2. 如果整个方法都是错误的,什么是正确的?

推荐答案

这就是我的一位同事为获取 Kafka 代理列表所做的工作.我认为当您想动态获取经纪人列表时,这是一种正确的方法.

That is the way of what one of my colleagues did to get a list of Kafka brokers. I think it's a correct way when you want to get a broker list dynamically.

这是一个示例代码,展示了如何获取列表.

Here is an example code that shows how to get the list.

public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        }
    }
}

在由三个代理组成的集群上运行代码结果

Running the code onto the cluster consisting of three brokers results in

1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}

这篇关于Kafka:从 ZooKeeper 获取代理主机的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆