如何在 Java 中创建 Kafka ZKStringSerializer? [英] How create Kafka ZKStringSerializer in Java?

查看:41
本文介绍了如何在 Java 中创建 Kafka ZKStringSerializer?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在搜索如何通过API创建Kafka主题的过程中,我在Scala中找到了这个例子:

import kafka.admin.AdminUtils导入 kafka.utils.ZKStringSerializer导入 org.I0Itec.zkclient.ZkClient//创建一个 ZooKeeper 客户端val sessionTimeoutMs = 10000val connectionTimeoutMs = 10000val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,connectionTimeoutMs, ZKStringSerializer)//创建一个有 8 个分区、复制因子为 3 的主题val topicName = "myTopic"val numPartitions = 8val 复制因子 = 3val topicConfig = 新属性AdminUtils.createTopic(zkClient, topicName,numPartitions、replicationFactor、topicConfig)

来源:https://stackoverflow.com/a/23360100/871012

最后一个参数 ZKStringSerializer 显然是一个 Scala 对象.我不清楚如何使这个示例在 Java 中工作.

这篇文章如何在 clojure 中创建 scala 对象 在 Clojure 中问了同样的问题,答案是:

ZKStringSerializer$/MODULE$

在 Java 中(我认为)会转化为:

ZKStringSerializer$.MODULE$

但是当我尝试那个(或任何数量的其他变体)时,它们都没有编译.
编译错误是:

KafkaTopicCreator.java:[16,18] 找不到符号符号:变量 ZKStringSerializer$位置:类 org.sample.KafkaTopicCreator

我使用的是 kafka_2.9.2-0.8.1.1 和 Java 8.

解决方案

对于 java 尝试以下,

首先导入下面的语句

import kafka.utils.ZKStringSerializer$;

通过以下方式为ZkClient创建对象,

String zkHosts = "127.0.0.1:2181";//如果有多个zookeeper,则127.0.0.1:2181,127.0.0.2:2181"ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

<小时><块引用>

由于api已更改,因此上述代码不适用于kafka > 0.9,对 kafka > 0.9

使用以下代码

import java.util.Properties;导入 kafka.admin.AdminUtils;导入 kafka.utils.ZKStringSerializer$;导入 kafka.utils.ZkUtils;导入 org.I0Itec.zkclient.ZkClient;导入 org.I0Itec.zkclient.ZkConnection;公共类 KafkaTopicCreationInJava{public static void main(String[] args) 抛出异常 {zkClient zkClient = null;zkUtils zkUtils = null;尝试 {String zookeeperHosts = "192.168.20.1:2181";//如果有多个zookeeper,则->String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";int sessionTimeOutInMs = 15 * 1000;//15 秒int connectionTimeOutInMs = 10 * 1000;//10 秒zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);String topicName = "testTopic";int noOfPartitions = 2;int noOfReplication = 3;Properties topicConfiguration = new Properties();AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);} 捕捉(异常前){ex.printStackTrace();} 最后 {如果(zkClient != null){zkClient.close();}}}}

In searching for how to create a Kafka topic through the API, I found this example in Scala:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,
                            connectionTimeoutMs, ZKStringSerializer)

// Create a topic with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, 
                       numPartitions, replicationFactor, topicConfig)

Source: https://stackoverflow.com/a/23360100/871012

The last arg ZKStringSerializer is apparently a Scala object. It is not clear to me how to make this example work in Java.

This post How to create a scala object in clojure asks the same question in Clojure and the answer was:

ZKStringSerializer$/MODULE$

which in Java would (I think) translate to:

ZKStringSerializer$.MODULE$

But when I try that (or any number of other variations) none of them compile.
The compilation error is:

KafkaTopicCreator.java:[16,18] cannot find symbol
symbol:   variable ZKStringSerializer$
location: class org.sample.KafkaTopicCreator

I am using kafka_2.9.2-0.8.1.1 and Java 8.

解决方案

For java try the following,

First import below statement

import kafka.utils.ZKStringSerializer$;

Create object for ZkClient in the following way,

String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181"
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());


The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

这篇关于如何在 Java 中创建 Kafka ZKStringSerializer?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆