在Apache Kafka多节点集群中连接到Zookeeper [英] Connecting to Zookeeper in a Apache Kafka Multi Node cluster

查看:329
本文介绍了在Apache Kafka多节点集群中连接到Zookeeper的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我按照以下说明设置了多节点kafka集群。
现在,如何连接到Zookeeper?是否可以从JAVA的生产者/消费者端仅连接到一个Zookeeper,还是可以连接所有Zookeeper节点?

I followed the following instructions to set up a multi node kafka cluster. Now, how to connect to the zookeeper ? Is it okay to connect to just one zookeeper from the Producer/consumer side in JAVA or is there a way to connect all the zookeeper nodes ?

设置多节点Apache ZooKeeper群集

Setting a multi node Apache ZooKeeper cluster

在群集的每个节点上,将以下行添加到文件中kafka / config / zookeeper.properties

On every node of the cluster add the following lines to the file kafka/config/zookeeper.properties

    server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2

在集群的每个节点上,在由dataDir属性表示的文件夹中创建一个名为myid的文件(默认情况下,该文件夹为/ tmp / zookeeper )。 myid文件应仅包含znode的ID(对于zNode01,'1';对于ZNode02,'2',等等……)

On every node of the cluster create a file called myid in the folder represented by the dataDir property (by default the folder is /tmp/zookeeper ). The myid file should only contains the id of the znode (‘1’ for zNode01, ‘2’ for ZNode02, etc… )

设置多代理Apache Kafka集群

Setting a multi broker Apache Kafka cluster

在集群的每个节点上,通过文件kafka / config / server.properties修改属性zookeeper.connect:

On every node of the cluster modify modify the property zookeeper.connect from the file kafka/config/server.properties:

    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

在集群的每个节点上,从文件kafka / config / server.properties中修改属性host.name:
host.name = zNode0x

On every node of the cluster modify the property host.name from the file kafka/config/server.properties: host.name=zNode0x

在集群的每个节点上,从文件kafka / config / server.properties修改属性broker.id(集群中的每个代理都应具有唯一的ID)

On every node of the cluster modify the property broker.id from the file kafka/config/server.properties (every broker in the cluster should have a unique id)

推荐答案

您可以传递生产者或使用者中的所有节点。 Kafka非常聪明,它可以连接到具有基于复制因子或分区所需数据的节点

You can pass all the nodes in the producer or consumer. Kafka is intelligent enough that it will connect to the node that has the data you required based on the replication factor or the partition

这里是使用者代码:

Properties props = new Properties();
     props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

您可以找到更多信息此处

注意:此方法的问题是它将打开多个连接以找出哪个节点保存了数据。对于更健壮和可扩展的系统,您可以维护分区号和节点名称的映射,这也将有助于负载平衡。

Note: Problem with this approch is it will open multiple connection to find out the which node holds the data. For more robust and scalable systems you can maintain the map of partition number and node name , this will help in load balencing also.

这里是生产者样本

Properties props = new Properties();
 props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

更多信息此处

这篇关于在Apache Kafka多节点集群中连接到Zookeeper的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆