How does Kafka Stream send final aggregation with KTable#Suppress?
Question
What I want to do is this:
- consume records from a topic
- count the values in each 1-second window
- detect windows whose record count is num < 4
- send the FINAL result to another topic
I use suppress to send the final result, but I got an error like this.
09:18:07,963 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager
- task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
.....
I think my code is the same as the example in the developer guide. What's the problem? My code is here:
final KStream<String, String> views = builder.stream("fluent-newData");

final KTable<Windowed<String>, Long> anomalousUsers = views
    .map((key, value) -> {
        JSONObject message = JSONObject.fromObject(value);
        String[] strArry = message.getString("detail").split(",");
        return KeyValue.pair(strArry[0], value);
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
        .grace(Duration.ofSeconds(20)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 4);

final KStream<String, String> anomalousUsersForConsole = anomalousUsers
    .toStream()
    .filter((windowedUserId, count) -> count != null)
    .map((windowedUserId, count) -> new KeyValue<>(windowedUserId.toString(),
        windowedUserId.toString() + " c:" + count.toString()));

anomalousUsersForConsole.to("demo-count-output", Produced.with(stringSerde, stringSerde));
Answer
"Windowed cannot be cast to java.lang.String" is usually thrown when you haven't specified serdes directly.
When building the stream(..), specify a Consumed instance directly, like the following:
builder.stream("fluent-newData", Consumed.with(Serdes.String(), Serdes.String()))
Also for groupByKey() you need to pass Grouped, like the following:
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
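Putting the two fixes together, a minimal sketch of the corrected topology might look like the following. The topic names, window sizes, and final-result filter are carried over from the question; `extractUserId` is a hypothetical stand-in for the JSON parsing in the original `map` step, and `unbounded()` is written out as `Suppressed.BufferConfig.unbounded()` for clarity:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

final Serde<String> stringSerde = Serdes.String();
final StreamsBuilder builder = new StreamsBuilder();

builder
    // Declare the key/value serdes explicitly when consuming the topic ...
    .stream("fluent-newData", Consumed.with(stringSerde, stringSerde))
    .map((key, value) -> KeyValue.pair(extractUserId(value), value))
    // ... and again when grouping, so the suppress buffer serializes
    // the windowed keys with the right underlying serde.
    .groupByKey(Grouped.with(stringSerde, stringSerde))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(1)).grace(Duration.ofSeconds(20)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .filter((windowedUserId, count) -> count < 4)
    .toStream()
    .map((windowedUserId, count) ->
        new KeyValue<>(windowedUserId.toString(),
                       windowedUserId.toString() + " c:" + count))
    .to("demo-count-output", Produced.with(stringSerde, stringSerde));
```

With the serdes passed through `Consumed.with` and `Grouped.with`, the aggregate's state store no longer falls back to the default `StringSerializer` for its `Windowed` keys, which is what triggered the `ClassCastException` during flush.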