如何通过Java在Kafka中创建主题 [英] How to create a Topic in Kafka through Java

查看:2916
本文介绍了如何通过Java在Kafka中创建主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它工作正常。但我想通过java api创建一个主题。经过长时间的搜索,我发现下面的代码,

I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) through java. It is working fine if I create a topic in command prompt, and If I push message through java api. But I want to create a topic through java api. After a long search I found below code,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

我尝试了上面的代码,它显示主题已创建,但我无法推送消息话题。我的代码有什么问题吗?或者以其他任何方式实现上述目标?

I tried above code and it is showing that topic is created but I am not able to push message in the topic. Any thing wrong in my code? Or any other way to achieve the above?

推荐答案

我修好了..经过长时间的研究..

I fixed it.. After a long research..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

从上面的代码中,ZkClient将创建一个主题,但此主题信息将无法识别kafka 。所以我们要做的是,我们需要以下列方式为ZkClient创建对象,

From the above code, ZkClient will create a topic but this topic information will not have awareness for the kafka. So what we have to do is, we need to create object for ZkClient in following way,

首先导入以下语句,

import kafka.utils.ZKStringSerializer$;

并按以下方式为ZkClient创建对象,

and create object for ZkClient in the following way,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());






编辑1 :(对于@ajkret评论)




Edit 1: (for @ajkret comment)


上面的代码不能用于kafka> 0.9,因为api已被更改,
使用以下代码为kafka> 0.9

The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9







import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

这篇关于如何通过Java在Kafka中创建主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆