KafkaStreams: Getting Window Final Results


Question

Is it possible to get the final result of a window in Kafka Streams?

I can not achieve this goal. What is wrong with my code?

    val builder = StreamsBuilder()
    builder.stream<String,Double>(inputTopic)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded())) // not working
            .toStream()
            .print(Printed.toSysOut())

It results in this error:

Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: 
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

Code / Error details: https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380

Answer

The problem is a confusing asymmetry in the way that Streams automatically wraps explicit serdes during windowing, but does not automatically wrap the default serde. IMHO, this is an oversight that should be corrected, so I've filed: https://issues.apache.org/jira/browse/KAFKA-7806

As others have noted, the solution is to explicitly set the key serde upstream and not rely on the default key serde. You can either:

Use Materialized:

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        .count(Materialized.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream()
        .print(Printed.toSysOut())

(as Nishu recommended)

(note that it is not necessary to name the count operation; naming it would have the side effect of making the store queryable)

Or set the serdes further upstream, for example on the input:

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        .count()
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream()
        .print(Printed.toSysOut())

(as wardziniak recommended)

The choice is yours; I think in this case it's not too much different in either case. If you were doing a different aggregation than count, you'd probably be setting the value serde via Materialized anyway, so maybe the former would be a more uniform style.
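To illustrate that point, here is a hedged sketch (in Java, since the Streams API is Java; the sum aggregation is invented for illustration) of a non-count aggregation, where you would be passing Materialized for the value serde anyway, so adding the key serde there costs nothing extra:

```java
// Sketch only: assumes the same String-keyed, Double-valued input topic as above.
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
       .aggregate(
           () -> 0.0,                         // initializer: empty sum
           (key, value, agg) -> agg + value,  // adder: running sum per window
           Materialized.with(Serdes.String(), Serdes.Double()))
       .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
       .toStream()
       .print(Printed.toSysOut());
```

Here the Double result type cannot be inferred from the default serde anyway, so Materialized.with is the natural place for both serdes.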

I also noticed that your window definition doesn't have a grace period set. The window close time is defined as window end + grace period, and the default is 24 hours, so you wouldn't see anything emitted from the suppression until 24 hours' worth of data have run through the app.
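To make that arithmetic concrete, here is a plain java.time calculation (the window start timestamp is invented for illustration) of when a single 15-second window actually closes under the default grace period:

```java
import java.time.Duration;
import java.time.Instant;

public class WindowCloseDemo {
    public static void main(String[] args) {
        // Hypothetical window start, purely for illustration.
        Instant windowStart = Instant.parse("2019-01-01T00:00:00Z");
        Instant windowEnd = windowStart.plus(Duration.ofSeconds(15));

        // The default grace period in Kafka Streams is 24 hours.
        Instant windowClose = windowEnd.plus(Duration.ofHours(24));

        // Suppression emits nothing for this window until stream time
        // passes the close: a full day after the window ended.
        System.out.println(windowEnd);   // 2019-01-01T00:00:15Z
        System.out.println(windowClose); // 2019-01-02T00:00:15Z
    }
}
```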

For your testing effort, I'd recommend trying:

.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))

In production, you'll want to select a grace period that balances the amount of event lateness you expect in your stream with the amount of emission promptness you wish to see from the suppression.

One final note, I noticed in your gist that you haven't changed the default caching or commit interval. As a result, you'll notice that the count operator itself will buffer updates for the default 30 seconds before passing them on to suppression. This is a good config for production so you don't create a bottleneck to your local disk or to the Kafka broker. But it might surprise you while you're testing.

Typically for tests (or interactively trying stuff out), I'll disable caching and set the commit interval short for maximum developer sanity:

properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

Sorry about the serde oversight. I hope we get KAFKA-7806 addressed soon.

I hope this helps!
