Kafka Streams: how to write to a topic?


Problem Description

In Kafka Streams, what's the canonical way of producing/writing a stream? In Spark, there is the custom receiver, which works as a long-running adapter from an arbitrary data source. What is the equivalent in Kafka Streams?

To be specific, I'm not asking how to do transforms from one topic to another; the documentation is very clear on that. I want to understand how to write the workers that do the first write in a series of transforms into Kafka.

I'd like to be able to do something like:

builder1.<something>(<some intake worker, like a Spark receiver>)
       .to(topic1)
       .start()

builder2.from(topic1)
        .transform(<some transformation function>)
        .to(topic2)
        .start()

But none of the existing documentation shows this. Am I missing something?

Answer

It depends on whether you are using the Kafka Streams DSL or the Processor API:

  • Kafka Streams DSL: You can use KStream#to() to materialize the KStream to a topic. This is the canonical way to materialize data to a topic. Alternatively, you can use KStream#through(). This also materializes data to a topic, but additionally returns the resulting KStream for further use. The only difference between #to() and #through(), then, is that the latter saves you a KStreamBuilder#stream() call if you want the resulting materialized topic as a KStream.

  • Processor API: You materialize data to a topic by forwarding the data to a sink processor.
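A minimal Processor API sketch of the sink-processor approach might look like the following (the node and topic names here are illustrative, not from the original answer). The sink node is what actually writes records to Kafka; a custom processor added between source and sink via addProcessor(...) would forward records downstream to it:

```java
import org.apache.kafka.streams.Topology;

public class SinkTopologyExample {
    public static void main(String[] args) {
        Topology topology = new Topology();
        // Read records from an input topic (name is illustrative).
        topology.addSource("Source", "input-topic");
        // Any record forwarded to this sink node is written to "output-topic".
        // A processor registered with addProcessor(...) would sit between
        // "Source" and "Sink" and forward records downstream.
        topology.addSink("Sink", "output-topic", "Source");
        // Print the wiring of the topology for inspection.
        System.out.println(topology.describe());
    }
}
```

Passing this Topology to a KafkaStreams instance and calling start() runs it against a live cluster.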

Either way, a crucial thing to note is that data is not materialized to a topic until you write it using one of the methods mentioned above. map(), filter(), etc. do not materialize data; the data remains in the processor task/thread/memory until it is materialized by one of the methods above.
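This point can be sketched in the DSL (using the current StreamsBuilder API, the successor of the answer's KStreamBuilder; topic names are illustrative). The mapValues() step alone writes nothing to Kafka; only the final to() is a materialization point:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class DslMaterializeExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        // mapValues() is an in-memory transformation; nothing reaches Kafka here.
        KStream<String, String> upper = source.mapValues(v -> v.toUpperCase());
        // to() is the materialization point: records are written to the topic.
        upper.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        // Print the resulting topology for inspection.
        System.out.println(builder.build().describe());
    }
}
```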

To produce into Kafka:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;

Properties producerConfig = new Properties();
// bootstrap.servers must point at a Kafka broker (default port 9092), not ZooKeeper.
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
KafkaProducer<Integer, Integer> producer =
        new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer());

Then:

Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)));

You will need:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${version.kafka}</version>
</dependency>

