Kafka Streams: how to write to a topic?
Question
In Kafka Streams, what's the canonical way of producing/writing a stream? In Spark, there is the custom receiver, which works as a long-running adapter from an arbitrary data source. What is the equivalent in Kafka Streams?
To be specific, I'm not asking how to do transforms from one topic to another. The documentation is very clear on that. I want to understand how to write my workers that will be doing the first write in a series of transforms into Kafka.
I would like to be able to do:
builder1.<something>(<some intake worker, like a Spark receiver>)
    .to(topic1)
    .start()

builder2.from(topic1)
    .transform(<some transformation function>)
    .to(topic2)
    .start()
But none of the existing documentation shows this. Am I missing something?
Answer
It depends on whether you are using the Kafka Streams DSL or the Processor API:
Kafka Streams DSL

You can use KStream#to() to materialize the KStream to a topic. This is the canonical way to materialize data to a topic. Alternatively, you can use KStream#through(). This also materializes data to a topic, but additionally returns the resulting KStream for further use. The only difference between #to() and #through(), then, is that the latter saves you a KStreamBuilder#stream() call if you want the resulting materialized topic back as a KStream.
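The to()/through() distinction can be sketched roughly like this, using the newer StreamsBuilder API (topic names are placeholders; note that through() was later deprecated in Kafka 2.6 in favor of repartition(), so the equivalent to() + stream() pair is shown):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class MaterializeExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");

        // to(): terminal operation; materializes the stream to a topic
        // and returns void.
        source.to("intermediate-topic");

        // Reading the materialized topic back as a KStream. This to() +
        // stream() pair is exactly what KStream#through("intermediate-topic")
        // collapses into a single call.
        KStream<String, String> continued = builder.stream("intermediate-topic");
        continued.mapValues(v -> v.toUpperCase()).to("output-topic");

        return builder.build();
    }

    public static void main(String[] args) {
        // Building a topology does not contact a broker, so this runs
        // without a Kafka cluster.
        System.out.println(buildTopology().describe());
    }
}
```

Nothing here connects to a broker; the topology description simply shows the source and sink nodes that would be executed once KafkaStreams#start() is called.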
Processor API

You materialize data to a topic by forwarding the data to a sink processor.
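With the Processor API, the sink is declared when wiring up the Topology. A minimal sketch (node and topic names are placeholders):

```java
import org.apache.kafka.streams.Topology;

public class SinkExample {

    static Topology buildTopology() {
        Topology topology = new Topology();
        // Source node reading from an input topic.
        topology.addSource("Source", "input-topic");
        // Sink node: every record forwarded to it is written to "output-topic".
        // A processor node could sit in between; here the source feeds the
        // sink directly.
        topology.addSink("Sink", "output-topic", "Source");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

Inside a custom Processor, records reach the sink via context.forward(); only records that arrive at a sink node are written out to Kafka.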
Either way, a crucial thing to note is that data is not materialized to a topic until you write to it using one of the methods mentioned above. map(), filter(), etc. do not materialize data. The data remains in the processor task/thread/memory until it is materialized by one of the methods above.
To produce into Kafka Streams:
Properties producerConfig = new Properties();
// Producers connect to a broker (default port 9092), not to ZooKeeper (2181).
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);

// IntegerSerializer is not a generic type, so no diamond operator on it.
Producer<Integer, Integer> producer =
        new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer());
Then:
Arrays.asList(1, 2, 3, 4).forEach(i ->
        producer.send(new ProducerRecord<>("integers", i, i)));
producer.close(); // flushes any pending sends
You will need:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>