连接到 Apache Kafka 多节点集群中的 Zookeeper [英] Connecting to Zookeeper in a Apache Kafka Multi Node cluster
问题描述
我按照以下说明设置了多节点 kafka 集群.现在,如何连接到zookeeper?可以在 JAVA 中从生产者/消费者端连接到一个动物园管理员还是有办法连接所有动物园管理员节点?
设置多节点Apache ZooKeeper集群
在集群的每个节点上,将以下几行添加到文件 kafka/config/zookeeper.properties
server.1=zNode01:2888:3888server.2=zNode02:2888:3888server.3=zNode03:2888:3888#如果需要,请在此处添加更多服务器初始限制=5同步限制=2
在集群的每个节点上,在 dataDir 属性表示的文件夹中创建一个名为 myid 的文件(默认文件夹为/tmp/zookeeper ).myid 文件应该只包含 znode 的 id('1' 代表 zNode01,'2' 代表 ZNode02,等等)
设置多代理 Apache Kafka 集群
在集群的每个节点上修改kafka/config/server.properties文件中的zookeeper.connect属性:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
在集群的每个节点上修改 kafka/config/server.properties 文件中的属性 host.name:主机名=zNode0x
在集群的每个节点上修改文件 kafka/config/server.properties 中的属性 broker.id(集群中的每个代理都应该有一个唯一的 id)
可以传递生产者或消费者中的所有节点.Kafka 足够智能,它会根据复制因子或分区连接到具有您所需数据的节点
这是消费者代码:
Properties props = new Properties();props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer消费者 = 新的 KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));而(真){ConsumerRecords记录 = 消费者.poll(100);for (ConsumerRecord record : 记录)System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());}
您可以找到更多信息 这里
注意:这个方法的问题是它会打开多个连接来找出哪个节点保存数据.对于更健壮和可扩展的系统,您可以维护分区号和节点名称的映射,这也将有助于负载平衡.
这里是生产者样本
Properties props = new Properties();props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");props.put("acks", "all");props.put("重试", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");生产者<字符串,字符串>生产者 = 新的 KafkaProducer<>(props);for(int i = 0; i <100; i++)producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));生产者.close();
更多信息这里>
I followed the following instructions to set up a multi node kafka cluster. Now, how to connect to the zookeeper ? Is it okay to connect to just one zookeeper from the Producer/consumer side in JAVA or is there a way to connect all the zookeeper nodes ?
Setting a multi node Apache ZooKeeper cluster
On every node of the cluster add the following lines to the file kafka/config/zookeeper.properties
server.1=zNode01:2888:3888
server.2=zNode02:2888:3888
server.3=zNode03:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
On every node of the cluster create a file called myid in the folder represented by the dataDir property (by default the folder is /tmp/zookeeper ). The myid file should only contains the id of the znode (‘1’ for zNode01, ‘2’ for ZNode02, etc… )
Setting a multi broker Apache Kafka cluster
On every node of the cluster modify modify the property zookeeper.connect from the file kafka/config/server.properties:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
On every node of the cluster modify the property host.name from the file kafka/config/server.properties: host.name=zNode0x
On every node of the cluster modify the property broker.id from the file kafka/config/server.properties (every broker in the cluster should have a unique id)
You can pass all the nodes in the producer or consumer. Kafka is intelligent enough that it will connect to the node that has the data you required based on the replication factor or the partition
Here is the consumer code :
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
You can find more info here
Note: Problem with this approch is it will open multiple connection to find out the which node holds the data. For more robust and scalable systems you can maintain the map of partition number and node name , this will help in load balencing also.
Here is the producer sample
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
more info here
这篇关于连接到 Apache Kafka 多节点集群中的 Zookeeper的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!