从Java创建`KafkaServer` [英] create `KafkaServer` from Java

查看:80
本文介绍了从Java创建`KafkaServer`的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从Java启动Kafka服务器

I am trying to start a Kafka server form Java

具体来说,我该如何翻译将Scala的这一行转换为Java的一行?

Specifically, how can I translate this line of Scala into a line of Java?

  private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)

我可以轻松创建serverConfig,但似乎无法创建 kafkaMetricsReporters 参数.

I can create the serverConfig easily, but I can't seem to be able to create the kafkaMetricsReporters parameter.

注意:我可以创建一个 KafkaServerStartable ,但是我想创建一个普通的 KafkaServer ,以避免在发生错误时JVM退出.

Note: I can create a KafkaServerStartable but I would like to create a normal KafkaServer to avoid the JVM exiting in case of error.

Apache Kafka版本0.11.0.1

Apache Kafka version 0.11.0.1

推荐答案

kafkaMetricsReporters 参数是scala Seq .

The kafkaMetricsReporters parameter is a scala Seq.

您可以:

  1. 创建一个Java集合并将其转换为Seq:

  1. Create a Java collection and convert it into a Seq:

您需要导入 scala.collection.JavaConverters :

List<KafkaMetricsReporter> reportersList = new ArrayList<>();
...
Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala();

  • 使用 KafkaMetricsReporter.startReporters()方法从您的配置中为您创建它们:

  • Use KafkaMetricsReporter.startReporters() method to create them for you from your configuration:

    由于 KafkaMetricsReporter 是单例,因此您需要使用 MODULE 表示法来使用它:

    As KafkaMetricsReporter is a singleton, you need to use the MODULE notation to use it:

    Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
    

  • KafkaServer 构造函数还有另外2个从Java调用时需要的自变量:

    Also the KafkaServer constructor has 2 other arguments that are required when calling it from Java:

      可以使用 new org.apache.kafka.common.utils.SystemTime() 轻松创建
    • 时间
    • threadNamePrefix 是一个选项.如果导入 scala.Option ,则可以调用 Option.apply("prefix")
    • time can easily be created using new org.apache.kafka.common.utils.SystemTime()
    • threadNamePrefix is an Option. If you import scala.Option, you'll be able to call Option.apply("prefix")

    将它们放在一起:

    Properties props = new Properties();
    props.put(...);
    KafkaConfig config = KafkaConfig.fromProps(props);
    Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
    KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters);
    server.startup();
    

    这篇关于从Java创建`KafkaServer`的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆