使用Java为Apache Kafka 0.9创建主题 [英] Creating a topic for Apache Kafka 0.9 Using Java

查看:73
本文介绍了使用Java为Apache Kafka 0.9创建主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个客户端来使用kafka 0.9。我想知道如何创建一个主题。答案:如何在卡夫卡创建主题通过Java 类似于我的要求。除此之外,该解决方案仅适用于Kafka 0.8.2,这与Kafka 0.9的API有很大不同。

I am programming a client to work with kafka 0.9. I want to know how to create a topic. This answer: How to create a Topic in Kafka through Java is similar to what I am asking. Except, that solution only works for Kafka 0.8.2 which is hugely different from Kafka 0.9's API.

推荐答案

在线查看scala api和各种链接后。

After looking through the scala api and various links online.

这是我找到的解决方案:

This is the solution I found:

Maven Dependencies:

Maven Dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

代码:

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

    public static void main(String[] args) {
        String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
        int sessionTimeoutMs = 10 * 1000;
        int connectionTimeoutMs = 8 * 1000;

        ZkClient zkClient = new ZkClient(
            zookeeperConnect,
            sessionTimeoutMs,
            connectionTimeoutMs,
            ZKStringSerializer$.MODULE$);

       // Security for Kafka was added in Kafka 0.9.0.0
       boolean isSecureKafkaCluster = false;
       // ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API
       ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

       String topic = "my-topic";
       int partitions = 2;
       int replication = 3;

       // Add topic configuration here
       Properties topicConfig = new Properties();

       AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
       zkClient.close();
    }
}

如果您想知道为什么下面的代码没有看起来像Java:

If you are wondering why the code below doesn't look like Java:

ZKStringSerializer$.MODULE$

这是因为ZkStringSerializer是一个Scala对象。
您可以在这里阅读更多相关信息:

It is because ZkStringSerializer is a Scala Object. You can read more information about that here:

如何在Java中创建Kafka ZKStringSerializer?

注意:您必须使用ZKStringSerializer初始化ZkClient。

如果你不这样做,那么createTopic()似乎只能工作
(换句话说:它将返回而不会出错)。

该主题仅存在于Zookeeper中,仅在以下情况下有效列出主题。
即下面的列表命令工作正常

Note: You must initialize the ZkClient with ZKStringSerializer.
If you don't, then createTopic() will only seem to work (In other words: it will return without error).
The topic will exist in only Zookeeper and only works when listing topics. i.e. list command below works fine

bin/kafka-topics.sh --list --zookeeper localhost:2181

但Kafka本身并未创建该主题。
为了说明,下面的describe命令将引发错误。

but Kafka itself does not create the topic. To illustrate, the describe command below will throw an error.

bin/kafka-topics.sh --describe --zookeeper localhost:2181

因此,请确保使用ZKStringSerializer $ .MODULE $初始化它。

Therefore, make sure you initialize it with ZKStringSerializer$.MODULE$.

参考文献:
我们如何从IDE在Kafka中创建主题使用API​​ 来自-ide-using-api

References: How Can we create a topic in Kafka from the IDE using API‌​from-the-ide-using-api

很快Chee Loong,
多伦多大学

Soon Chee Loong, University of Toronto

这篇关于使用Java为Apache Kafka 0.9创建主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆