How to filter keys and values with a Processor using Kafka Stream DSL
Question
I have a Processor that interacts with a StateStore to filter messages and perform complex logic on them. In the process(key, value) method I use context.forward(key, value) to send the keys and values that I need. For debugging purposes I also print those.
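A processor like the one described might look roughly like the following sketch (the class name, store name, and the "seen before" filtering condition are placeholders, since the question does not show the actual logic; Kafka Streams 0.10.1.x Processor API):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical filtering processor: forwards a record only if its key
// has not been seen before, using a state store for deduplication.
public class MyFilterProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, String>) context.getStateStore("stateStoreName");
    }

    @Override
    public void process(String key, String value) {
        if (store.get(key) == null) {            // example condition only
            store.put(key, value);
            System.out.println(key + " -> " + value); // debug output
            context().forward(key, value);            // send downstream
        }
    }
}
```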
I have a KStream mergedStream that results from a join of two other streams. I want to apply the processor to the records of that stream. I achieve this with: mergedStream.process(myprocessor, "stateStoreName")
When I start this program, I can see the proper values printed to my console. However, if I send the mergedStream to a topic using mergedStream.to("topic"), the values on the topic are not the ones I forwarded in the processor, but the original ones.
I use kafka-streams 0.10.1.0.
What is the best way to get the values I have forwarded in the processor into another stream?
Is it possible to mix the Processor API with the streams created by the KStream DSL?
Answer
Short answer:

To solve your problem you can use transform(...) instead of process(...), which gives you access to the Processor API within the DSL, too.
Long answer:

If you use process(...) you apply a processor to a stream -- however, this is a "terminating" (or sink) operation (its return type is void), i.e., it does not return any result (here "sink" only means that the operator has no successor -- it does not imply that any result is written somewhere!).
Furthermore, if you call mergedStream.process(...) and mergedStream.to(...), you basically branch and duplicate your stream and send one copy to each downstream operator (i.e., one copy to process and one copy to to).
Mixing the DSL and the Processor API is absolutely possible (you did it already ;)). However, with process(...) you cannot consume the data you forward(...) within the DSL -- if you want to consume the Processor API result, you can use transform(...) instead of process(...).
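A transform(...)-based version of the processor could be sketched as follows (class, store, and condition are placeholders as before; the 0.10.1.x Transformer interface still has a punctuate method returning a record):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical transformer: same filtering idea, but the emitted record
// is RETURNED, so it stays inside the DSL instead of disappearing.
public class MyFilterTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore("stateStoreName");
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        if (store.get(key) == null) {        // example condition only
            store.put(key, value);
            return KeyValue.pair(key, value); // emitted downstream in the DSL
        }
        // returning null is meant to drop the record -- verify this
        // against the exact Kafka Streams version you run
        return null;
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null; // no scheduled output
    }

    @Override
    public void close() {}
}
```

The result of transform(...) is a regular KStream, so it can be sent straight to a topic, e.g. mergedStream.transform(MyFilterTransformer::new, "stateStoreName").to("topic"); and the topic then contains the forwarded values rather than the originals.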