Kafka Streams DSL to Processor API有什么区别 [英] What is the difference between Kafka Streams DSL to Processor API

查看:47
本文介绍了Kafka Streams DSL to Processor API有什么区别的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据我从这里读到的内容: https://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-processor-api

我理解的区别在于,我可以在Processor API中更具体地说明哪些记录序列化程序进入流(源)到退出流(接收器)的记录,以及流DSL中的记录必须相同./p>

是否可以使用KStreamBuilder和TopologyBuilder创建一个Kstream,从而产生完全相同的流?如果可以,我可以看个例子吗?

解决方案

...并且在流DSL中它必须相同.

我认为那是不对的.您可以从输入流中获取消息,更改值甚至键,然后将它们放在其他输出流中.

 最终KStream< String,String>inputStream = builder.stream("inputStream");...inputStream.filter(this :: acceptCertainMessages).transform(new MyTransformerSupplier< String,String>("store"),"store").to("outputStream"); 

from what i read from here : https://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-processor-api

The difference to my understanding is that i can be more specific in the Processor API as to what record serializer enters the stream (source) to what exits the stream (sink) and in the stream DSL it must be the same.

is it possible to create a Kstream with KStreamBuilder and with TopologyBuilder that will result in the exact same stream? if so can i see an example?

解决方案

... and in the stream DSL it must be the same.

I don't think that is right. You can take messages from an input stream, change the value or even the key, and put them on a different output stream.

final KStream<String, String> inputStream = builder.stream("inputStream");
...
inputStream
    .filter(this::acceptCertainMessages)
    .transform(new MyTransformerSupplier<String, String>("store"), "store")
    .to("outputStream");

这篇关于Kafka Streams DSL to Processor API有什么区别的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆