Connecting to Zookeeper in an Apache Kafka multi-node cluster


Problem description


I followed the instructions below to set up a multi-node Kafka cluster. Now, how do I connect to ZooKeeper? Is it okay to connect to just one ZooKeeper node from the producer/consumer side in Java, or is there a way to connect to all the ZooKeeper nodes?

Setting up a multi-node Apache ZooKeeper cluster

On every node of the cluster, add the following lines to the file kafka/config/zookeeper.properties:

    server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2
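
For reference, the stock kafka/config/zookeeper.properties that these lines extend already defines the data directory and client port referenced below (the values shown are Kafka's defaults):

    dataDir=/tmp/zookeeper
    clientPort=2181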

On every node of the cluster, create a file called myid in the folder pointed to by the dataDir property (by default the folder is /tmp/zookeeper). The myid file should contain only the id of the znode ('1' for zNode01, '2' for zNode02, etc.).
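
For example, on zNode01 (with the default dataDir) the file /tmp/zookeeper/myid would contain nothing but the single character:

    1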

Setting up a multi-broker Apache Kafka cluster

On every node of the cluster, modify the zookeeper.connect property in the file kafka/config/server.properties:

    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

On every node of the cluster, modify the host.name property in the file kafka/config/server.properties: host.name=zNode0x

On every node of the cluster, modify the broker.id property in the file kafka/config/server.properties (every broker in the cluster should have a unique id).
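
Putting those three changes together, the relevant lines of kafka/config/server.properties on, for example, zNode01 would look like this (the id and hostname simply follow the naming used above):

    broker.id=1
    host.name=zNode01
    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181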

Solution

You can pass all the nodes to the producer or consumer. Kafka is intelligent enough to connect to the node that holds the data you need, based on the partition and its replication factor.

Here is the consumer code:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        // Poll for new records, waiting up to 100 ms.
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }

You can find more info in the Kafka documentation.

Note: the problem with this approach is that it will open multiple connections to find out which node holds the data. For a more robust and scalable system you can maintain a map of partition number to node name; this will also help with load balancing.
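
As a minimal sketch of that idea (this code is not from the original answer), the client's partitionsFor() call can be used to build such a map; the topic name "foo" and the broker list are reused from the examples above:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;

    Properties props = new Properties();
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // Map each partition of "foo" to the host of its current leader broker.
    Map<Integer, String> partitionLeaders = new HashMap<>();
    for (PartitionInfo info : consumer.partitionsFor("foo")) {
        partitionLeaders.put(info.partition(), info.leader().host());
    }
    consumer.close();

The producer exposes the same partitionsFor() method, so the equivalent map can be built on the sending side as well.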

Here is the producer sample:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    // Send 100 records whose key and value are both the loop index.
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

    producer.close();

More info is available in the Kafka documentation.
