How to filter keys and values with a Processor using the Kafka Streams DSL

Question

I have a Processor that interacts with a StateStore to filter messages and apply complex logic to them. In the process(key, value) method I use context.forward(key, value) to pass on the keys and values that I need. For debugging purposes I also print them.

I have a KStream, mergedStream, that results from a join of two other streams. I want to apply the processor to the records of that stream, which I do with: mergedStream.process(myprocessor, "stateStoreName")

When I start this program, I can see the correct values printed to my console. However, if I send mergedStream to a topic using mergedStream.to("topic"), the values on the topic are not the ones I forwarded in the processor, but the original ones.

I use kafka-streams 0.10.1.0.

What is the best way to get the values I forward in the processor into another stream?

Is it possible to mix the Processor API with the streams created by the KStream DSL?

Recommended answer

Short answer:

To solve your problem, you can use transform(...) instead of process(...); it also gives you access to the Processor API from within the DSL.
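A minimal sketch of the short answer, assuming the 0.10.1-era API from the question (where Transformer still declares punctuate(long); that method was later deprecated and removed), String keys and values with default serdes, and that the store "stateStoreName" is already registered on the topology, as it must have been for process(...) to work. It needs the kafka-streams jar on the classpath. The class name FilteringTransformer and its filtering rule are hypothetical stand-ins for the asker's processor logic. The key difference from a Processor is that transform(...) emits the KeyValue it returns (returning null emits nothing) and returns a KStream, so the result can be chained into to("topic").

```java
import java.util.Objects;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch against the 0.10.1-era Transformer interface (which still declares punctuate()).
// FilteringTransformer and its rule are hypothetical stand-ins for the asker's processor.
class FilteringTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private final String storeName;
    private KeyValueStore<String, String> store;

    FilteringTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Assumes the store is registered on the topology and named in transform(...).
        store = (KeyValueStore<String, String>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Hypothetical rule: drop a record whose value equals the last value stored for its key.
        String previous = store.get(key);
        store.put(key, value);
        if (Objects.equals(value, previous)) {
            return null; // null: nothing is emitted downstream for this record
        }
        return KeyValue.pair(key, value); // emitted to the operators after transform(...)
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null; // no scheduled output in this sketch
    }

    @Override
    public void close() {
        // The store is managed by Kafka Streams; nothing to release here.
    }

    // Unlike process(...), transform(...) returns a KStream, so its output can go to a topic.
    static void wire(KStream<String, String> mergedStream) {
        TransformerSupplier<String, String, KeyValue<String, String>> supplier =
                () -> new FilteringTransformer("stateStoreName");
        mergedStream.transform(supplier, "stateStoreName").to("topic");
    }
}
```

If the existing processor already emits records with context.forward(...), that code can likely move into transform() largely unchanged with a null return, since the transformer's context forwards to the operators after the transform node; that is worth verifying against the exact version in use.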

Long answer:

If you use process(...), you apply a processor to a stream; however, this is a "terminating" (or sink) operation (its return type is void), i.e., it does not return any result. (Here "sink" only means that the operator has no successor; it does not imply that any result is written anywhere.)

Furthermore, if you call both mergedStream.process(...) and mergedStream.to(...), you basically branch and duplicate your stream, sending one copy to each downstream operator (i.e., one copy to process and one copy to to). That is why to(...) writes the original, unprocessed records.
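To make the branch-and-duplicate behavior concrete, here is a hypothetical in-memory model using only the JDK. MiniStream and its methods are illustrative names, not the Kafka Streams API; the model only mimics the fact that a node hands every record to each of its children, that process(...) is terminal (returns void), and that transform(...) returns a new node carrying its output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical in-memory model of a DSL node. This is NOT the Kafka Streams API;
// it only mimics how a node hands every record to each of its children.
final class MiniStream {
    private final List<Consumer<String>> children = new ArrayList<>();

    // Feed one record into this node: every child receives its own copy
    // (branch-and-duplicate).
    void push(String item) {
        for (Consumer<String> child : children) {
            child.accept(item);
        }
    }

    // Analogue of process(...): terminal, returns void. The logic's output is only
    // visible as a side effect (here appended to a debug log, like printing).
    void process(Function<String, String> logic, List<String> debugLog) {
        children.add(item -> debugLog.add(logic.apply(item)));
    }

    // Analogue of transform(...): returns a new node that carries the logic's output.
    MiniStream transform(Function<String, String> logic) {
        MiniStream next = new MiniStream();
        children.add(item -> next.push(logic.apply(item)));
        return next;
    }

    // Analogue of to(...): collects whatever reaches this node into a "topic".
    void to(List<String> topic) {
        children.add(topic::add);
    }
}
```

Wiring merged.process(String::toUpperCase, printed) and merged.to(topic) on the same node and then pushing "a" leaves printed holding "A" while topic holds the original "a", which mirrors the question. Chaining merged.transform(String::toUpperCase).to(topic) instead puts "A" into topic.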

Mixing the DSL and the Processor API is absolutely possible (you did it already ;)). However, with process(...) you cannot consume the data you forward(...) within the DSL; if you want to consume the Processor API results, use transform(...) instead of process(...).
