如何用Java创建Kafka ZKStringSerializer? [英] How create Kafka ZKStringSerializer in Java?
问题描述
在搜索如何通过API创建Kafka主题时,我在Scala中找到了这个示例:
In searching for how to create a Kafka topic through the API, I found this example in Scala:
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,
connectionTimeoutMs, ZKStringSerializer)
// Create a topic with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName,
numPartitions, replicationFactor, topicConfig)
来源:https://stackoverflow.com/a/23360100/871012
最后一个arg ZKStringSerializer
显然是一个Scala对象。我不清楚如何使这个例子在Java中工作。
The last arg ZKStringSerializer
is apparently a Scala object. It is not clear to me how to make this example work in Java.
这篇文章如何在clojure中创建一个scala对象在Clojure中询问相同的问题,答案是:
This post How to create a scala object in clojure asks the same question in Clojure and the answer was:
ZKStringSerializer$/MODULE$
在Java中将(我认为)转换为:
which in Java would (I think) translate to:
ZKStringSerializer$.MODULE$
但当我尝试(或任何其他数量的变体)时,它们都没有编译。
编译错误是:
But when I try that (or any number of other variations) none of them compile.
The compilation error is:
KafkaTopicCreator.java:[16,18] cannot find symbol
symbol: variable ZKStringSerializer$
location: class org.sample.KafkaTopicCreator
我使用的是kafka_2.9.2-0.8.1.1和Java 8.
I am using kafka_2.9.2-0.8.1.1 and Java 8.
推荐答案
对于java,请尝试以下方法,
For java try the following,
首先导入以下语句
import kafka.utils.ZKStringSerializer$;
按以下方式为ZkClient创建对象,
Create object for ZkClient in the following way,
String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181"
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
上述代码对于kafka> 0.9不起作用,因为api已被更改,
使用以下代码为kafka> 0.9
The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
public class KafkaTopicCreationInJava
{
public static void main(String[] args) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}
这篇关于如何用Java创建Kafka ZKStringSerializer?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!