Kafka Streams: how to write to a topic?


Question


In Kafka Streams, whats the canonical way of producing/writing a stream? In Spark, there is the custom receiver which works as a long running adapter from an arbitrary data source. What is the equivalent in Kafka Streams?


To be specific, I'm not asking how to do transforms from one topic to another. The documentation is very clear on that. I want to understand how to write my workers that will be doing the first write in a series of transforms into Kafka.

I was hoping to be able to do something like:

builder1.<something>(<some intake worker, like a Spark receiver>)
       .to(topic1)
       .start()

builder2.from(topic1)
        .transform(<some transformation function>)
        .to(topic2)
        .start()


But none of the existing documentation shows this? Am I missing something?

Answer


Depends on whether you are using the Kafka Streams DSL or Processor API:


  • Kafka Streams DSL: You can use KStream#to() to materialize the KStream to a topic. This is the canonical way to materialize data to a topic. Alternatively, you can use KStream#through(). This also materializes data to a topic, but additionally returns the resulting KStream for further use. The only difference between #to() and #through(), then, is that the latter saves you a KStreamBuilder#stream() call if you want the resulting materialized topic back as a KStream.


  • Processor API: You materialize data to a topic by forwarding the data to a sink processor.
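A minimal sketch of that sink-processor route, using the current `Topology`/Processor API (which postdates the original answer); the topic names "input"/"output" and the pass-through processor body are assumptions for illustration:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class SinkExample {
    static Topology buildTopology() {
        Topology topology = new Topology();
        // Source node: reads Integer records from the (assumed) "input" topic.
        topology.addSource("Source",
                Serdes.Integer().deserializer(), Serdes.Integer().deserializer(), "input");
        // Pass-through processor: nothing is written anywhere until forward()
        // sends the record on toward the sink node.
        topology.addProcessor("PassThrough",
                () -> new Processor<Integer, Integer, Integer, Integer>() {
                    private ProcessorContext<Integer, Integer> context;

                    @Override
                    public void init(ProcessorContext<Integer, Integer> context) {
                        this.context = context;
                    }

                    @Override
                    public void process(Record<Integer, Integer> record) {
                        context.forward(record);
                    }
                }, "Source");
        // Sink node: this is what actually materializes the data to "output".
        topology.addSink("Sink", "output",
                Serdes.Integer().serializer(), Serdes.Integer().serializer(), "PassThrough");
        return topology;
    }
}
```

Building the topology requires only the kafka-streams library; running it additionally needs a KafkaStreams instance pointed at a broker.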


Either way, a crucial thing to note is that data is not materialized to a topic until you write to one using the methods mentioned above. map(), filter(), etc. do not materialize data. The data remains in the processor task/thread/memory until it is materialized by one of the methods above.
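To illustrate that point with the DSL (topic names are made up for this sketch), only the final to() call below writes anything back to Kafka; the intermediate filter() and mapValues() stay inside the task:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class MaterializeExample {
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<Integer, Integer>stream("integers")
               .filter((key, value) -> value % 2 == 0) // stays in the task, nothing written
               .mapValues(value -> value * 10)         // still nothing written to Kafka
               .to("even-integers");                   // only this step writes to a topic
        return builder.build();
    }
}
```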


To produce into Kafka Streams:

Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // Kafka broker address, not the ZooKeeper port
producerConfig.put(ACKS_CONFIG, "all");
producerConfig.put(RETRIES_CONFIG, 0);
Producer<Integer, Integer> producer =
        new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer());

然后:

Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)));

You will need:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${version.kafka}</version>
</dependency>
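Putting the pieces together, a Streams application that performs the questioner's topic1 → topic2 transform might look roughly like this. The application id, topic names, and the increment transform are assumptions for illustration, and the sketch needs the `org.apache.kafka:kafka-streams` dependency in addition to kafka-clients:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class TransformApp {
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<Integer, Integer>stream("integers")  // topic1 from the question
               .mapValues(value -> value + 1)         // the "<some transformation function>" slot
               .to("integers-plus-one");              // topic2: materialized here
        return builder.build();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "integers-transform");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        // Requires a running broker at the address above.
        new KafkaStreams(buildTopology(), config).start();
    }
}
```

The plain producer above does the "first write" into Kafka; from then on, every further hop in the pipeline is a Streams topology like this one.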

