Kafka Streams: how to write to a topic?
Question
In Kafka Streams, what's the canonical way of producing/writing a stream? In Spark, there is the custom receiver, which works as a long-running adapter from an arbitrary data source. What is the equivalent in Kafka Streams?
To be specific, I'm not asking how to do transforms from one topic to another. The documentation is very clear on that. I want to understand how to write my workers that will be doing the first write in a series of transforms into Kafka.
I was hoping to be able to do something like:
builder1.<something>(<some intake worker, like a Spark receiver>)
        .to(topic1)
        .start();

builder2.from(topic1)
        .transform(<some transformation function>)
        .to(topic2)
        .start();
But none of the existing documentation shows this. Am I missing something?
Answer
It depends on whether you are using the Kafka Streams DSL or the Processor API:
- Kafka Streams DSL: You can use KStream#to() to materialize the KStream to a topic. This is the canonical way to materialize data to a topic. Alternatively, you can use KStream#through(). This also materializes data to a topic, but additionally returns the resulting KStream for further use. The only difference between #to() and #through(), then, is that #through() saves you a KStreamBuilder#stream() call if you want the resulting materialized data back as a KStream.
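As a sketch of the two DSL options (this uses the newer StreamsBuilder class; at the time of this answer it was called KStreamBuilder, and in Kafka 3.x through() has been replaced by repartition() — topic names here are placeholders):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class DslWriteExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Read from a topic, transform, and materialize the result with to().
        // to() is terminal: it writes the stream to the topic and returns nothing.
        KStream<String, String> source = builder.stream("input-topic");
        source.mapValues(v -> v.toUpperCase())
              .to("output-topic");

        // through("intermediate-topic") would also write to a topic, but would
        // return a KStream, saving the extra builder.stream("intermediate-topic").

        // No broker is needed just to inspect the wiring:
        System.out.println(builder.build().describe());
    }
}
```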
- Processor API: You materialize data to a partition by forwarding the data to a sink processor.
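A minimal sketch of the Processor API version (this uses the Topology and processor.api classes from Kafka 3.x; the API contemporary with this answer used TopologyBuilder and a slightly different Processor interface — node and topic names are placeholders):

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class SinkProcessorExample {
    public static void main(String[] args) {
        Topology topology = new Topology();
        topology.addSource("Source", "input-topic");

        // A pass-through processor: forward() hands each record downstream.
        // Nothing is materialized here; the sink node below does the write.
        topology.addProcessor("PassThrough",
                () -> new Processor<byte[], byte[], byte[], byte[]>() {
                    private ProcessorContext<byte[], byte[]> context;

                    @Override
                    public void init(ProcessorContext<byte[], byte[]> context) {
                        this.context = context;
                    }

                    @Override
                    public void process(Record<byte[], byte[]> record) {
                        context.forward(record); // send the record to the sink
                    }
                },
                "Source");

        // The sink processor is what materializes forwarded records to the topic
        topology.addSink("Sink", "output-topic", "PassThrough");

        System.out.println(topology.describe());
    }
}
```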
Either way, a crucial thing to note is that data is not materialized to a topic until you write to it using one of the methods mentioned above. map(), filter(), etc. do not materialize data; the data remains in the processor task/thread/memory until it is materialized by one of the methods above.
To produce into Kafka:
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import static org.apache.kafka.clients.producer.ProducerConfig.*;

Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // broker port (9092), not ZooKeeper (2181)
producerConfig.put(ACKS_CONFIG, "all");
producerConfig.put(RETRIES_CONFIG, 0);
Producer<Integer, Integer> producer =
        new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer());
Then:
Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)));
producer.flush();
You will need:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>