从Java创建`KafkaServer` [英] create `KafkaServer` from Java
问题描述
我正在尝试从Java启动Kafka服务器
I am trying to start a Kafka server form Java
具体来说,我该如何翻译将Scala的这一行转换为Java的一行?
Specifically, how can I translate this line of Scala into a line of Java?
private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)
我可以轻松创建serverConfig,但似乎无法创建 kafkaMetricsReporters
参数.
I can create the serverConfig easily, but I can't seem to be able to create the kafkaMetricsReporters
parameter.
注意:我可以创建一个 KafkaServerStartable
,但是我想创建一个普通的 KafkaServer
,以避免在发生错误时JVM退出.
Note: I can create a KafkaServerStartable
but I would like to create a normal KafkaServer
to avoid the JVM exiting in case of error.
Apache Kafka版本0.11.0.1
Apache Kafka version 0.11.0.1
推荐答案
kafkaMetricsReporters
参数是scala Seq
.
The kafkaMetricsReporters
parameter is a scala Seq
.
您可以:
-
创建一个Java集合并将其转换为Seq:
Create a Java collection and convert it into a Seq:
您需要导入 scala.collection.JavaConverters
:
List<KafkaMetricsReporter> reportersList = new ArrayList<>();
...
Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala();
使用 KafkaMetricsReporter.startReporters()
方法从您的配置中为您创建它们:
Use KafkaMetricsReporter.startReporters()
method to create them for you from your configuration:
由于 KafkaMetricsReporter
是单例,因此您需要使用 MODULE
表示法来使用它:
As KafkaMetricsReporter
is a singleton, you need to use the MODULE
notation to use it:
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
KafkaServer
构造函数还有另外2个从Java调用时需要的自变量:
Also the KafkaServer
constructor has 2 other arguments that are required when calling it from Java:
- 可以使用
-
时间
-
threadNamePrefix
是一个选项.如果导入scala.Option
,则可以调用Option.apply("prefix")
new org.apache.kafka.common.utils.SystemTime()
轻松创建time
can easily be created usingnew org.apache.kafka.common.utils.SystemTime()
threadNamePrefix
is an Option. If you importscala.Option
, you'll be able to callOption.apply("prefix")
将它们放在一起:
Properties props = new Properties();
props.put(...);
KafkaConfig config = KafkaConfig.fromProps(props);
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props));
KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters);
server.startup();
这篇关于从Java创建`KafkaServer`的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!