Connecting to Zookeeper in an Apache Kafka Multi Node cluster
Question
I followed the following instructions to set up a multi-node Kafka cluster. Now, how do I connect to ZooKeeper? Is it okay to connect to just one ZooKeeper node from the producer/consumer side in Java, or is there a way to connect to all the ZooKeeper nodes?
Setting up a multi-node Apache ZooKeeper cluster
On every node of the cluster, add the following lines to the file kafka/config/zookeeper.properties:
server.1=zNode01:2888:3888
server.2=zNode02:2888:3888
server.3=zNode03:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
On every node of the cluster, create a file called myid in the folder given by the dataDir property (by default the folder is /tmp/zookeeper). The myid file should contain only the id of the znode ('1' for zNode01, '2' for zNode02, and so on).
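For example, on zNode01 the myid file can be created like this (this assumes the default dataDir of /tmp/zookeeper; adjust the path to whatever your zookeeper.properties sets):

```shell
# Hypothetical dataDir; must match the dataDir in kafka/config/zookeeper.properties.
DATA_DIR=/tmp/zookeeper
mkdir -p "$DATA_DIR"
# Write this node's id: 1 on zNode01, 2 on zNode02, 3 on zNode03.
echo "1" > "$DATA_DIR/myid"
cat "$DATA_DIR/myid"
```

Repeat on each node with that node's own id.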
Setting up a multi-broker Apache Kafka cluster
On every node of the cluster, modify the property zookeeper.connect in the file kafka/config/server.properties:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
On every node of the cluster, modify the property host.name in the file kafka/config/server.properties:
host.name=zNode0x
On every node of the cluster, modify the property broker.id in the file kafka/config/server.properties (every broker in the cluster should have a unique id).
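Putting the three changes together, kafka/config/server.properties on (for example) zNode02 would contain lines like these (the zNode host names and the default ZooKeeper port 2181 are taken from the setup above):

```properties
# Unique per broker: 1 on zNode01, 2 on zNode02, 3 on zNode03
broker.id=2
# This node's own host name
host.name=zNode02
# Same ZooKeeper connection string on every broker
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
```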
Answer
You can pass all the nodes in the producer or consumer. Kafka is intelligent enough to connect to the node that has the data you need, based on the replication factor and the partition.
Here is the consumer code:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
You can find more information here.
Note: the problem with this approach is that it will open multiple connections to find out which node holds the data. For more robust and scalable systems you can maintain a map of partition number to node name; this also helps with load balancing.
Here is the producer sample:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
More information here.