卡夫卡:从ZooKeeper获取经纪人主机 [英] Kafka: Get broker host from ZooKeeper

查看:167
本文介绍了卡夫卡:从ZooKeeper获取经纪人主机的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

出于特殊原因,我需要同时使用-ConsumerGroup(又称高级消费者)和SimpleConsumer(又称低级别消费者)来读取Kafka.对于ConsumerGroup,我使用基于ZooKeeper的配置,并对它完全满意,但是SimpleConsumer要求实例化种子代理.

For particular reasons I need to use both - ConsumerGroup (a.k.a. high-level consumer) and SimpleConsumer (a.k.a. low-level consumer) to read from Kafka. For ConsumerGroup I use ZooKeeper-based config and am completely satisfied with it, but SimpleConsumer requires seed brokers to be instantiated.

我不想同时保留这两个列表-ZooKeeper和代理主机.因此,我正在寻找一种方法自动从ZooKeeper 中发现特定主题的经纪人.

I don't want to keep list of both - ZooKeeper and broker hosts. Thus, I'm looking for a way to automatically discover brokers for a particular topic from ZooKeeper.

由于某些间接信息,我相信这些数据存储在以下路径之一下的ZooKeeper中:

Because of some indirect information I belief that these data is stored in ZooKeeper under one of the following paths:

  • /brokers/topics/<topic>/partitions/<partition-id>/state
  • /经纪人/身份证/

但是,当我尝试从这些节点读取数据时,出现序列化错误(为此,我使用com.101tec.zkclient):

However, when I try to read data from these nodes, I'm getting serialization error (I'm using com.101tec.zkclient for this):

org.I0Itec.zkclient.exception.ZkMarshallingError:java.io.StreamCorruptedException:无效的流头:7B226A6D 在org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) 在org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) 在org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64消失 引起原因:java.io.StreamCorruptedException:无效的流头:7B226A6D 在java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) 在java.io.ObjectInputStream.(ObjectInputStream.java:299) 在org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) 在org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ...还有69

org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 elided Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) at java.io.ObjectInputStream.(ObjectInputStream.java:299) at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 more

我可以毫无问题地编写和读取自定义Java对象(例如字符串),因此我认为这不是客户端的问题,而是棘手的编码.因此,我想知道:

I can write and read custom Java objects (e.g. Strings) without any problem, so I believe it's not a problem of a client, but rather tricky encoding. Thus, I want to know:

  1. 如果这是正确的方法,如何正确读取这些节点?
  2. 如果整个方法都不对,什么是正确的?
  1. If this is the right way to go, how to read these nodes properly?
  2. If the whole approach is wrong, what is the right one?

推荐答案

这就是我的一位同事用来获取Kafka经纪人列表的方式.我认为这是您动态获取经纪人名单的正确方法.

That is the way of what one of my colleagues did to get a list of Kafka brokers. I think it's a correct way when you want to get a broker list dynamically.

这是示例代码,显示了如何获取列表.

Here is an example code that shows how to get the list.

public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        }
    }
}

将代码运行到由三个代理组成的集群上

Running the code onto the cluster consisting of three brokers results in

1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}

这篇关于卡夫卡:从ZooKeeper获取经纪人主机的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆