Kafka Streams - Explain the reason why KTable and its associated Store only get updated every 30 seconds


Problem Description

I have this simple KTable definition that generates a Store:

KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();

I publish messages into the ORDERS_TOPIC, but the store only actually gets updated every 30 seconds. Here is the log showing the commit that happens because the 30000 ms commit interval has elapsed:

2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher       : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask   : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager        : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache      : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl  : task [0_0] Flushing producer

I found that the property that controls this is commit.interval.ms:

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);

Why is it set to 30000ms by default (sounds like a long time) and what are the implications of changing it to 10ms?
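
For context, a minimal sketch (not the asker's actual code) of where that property sits in a full setup using the same pre-1.0 KStreamBuilder API as the snippets above; the application id, bootstrap servers, and topic/store names are placeholders, and the String/JsonNode serde configuration is omitted:

import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class OrdersTableApp {

    static final String ORDERS_TOPIC = "orders";       // placeholder topic name
    static final String ORDERS_STORE = "orders-store"; // placeholder store name

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-service-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Commit (and flush the KTable cache) every 10 ms instead of the 30 s default
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
        // String/JsonNode serdes omitted here; configure them as in the real application

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KTable<String, JsonNode> table =
                kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
        table.print();

        KafkaStreams streams = new KafkaStreams(kStreamBuilder, props);
        streams.start();
    }
}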

If instead of a KTable I work with a KStream...

KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();

...I can see the messages right away, without having to wait those 30000ms. Why the difference?

Solution

It's related to memory management, in particular the KTable caches: http://docs.confluent.io/current/streams/developer-guide.html#memory-management

The KTable is actually updated all the time, and if you use "Interactive Queries" to access the underlying state store, you can see each update immediately. However, the KTable cache buffers the updates to reduce downstream load, and each time a commit is triggered this cache is flushed downstream to avoid data loss in case of failure. If your cache size is small, you might also see downstream records earlier, whenever a key gets evicted from the cache.
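
For reference, a minimal sketch of such an interactive query, assuming a started KafkaStreams instance and the ORDERS_STORE name from the question (the helper method and lookup key are only illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Illustrative helper: reads the store that backs the KTable directly.
// May throw InvalidStateStoreException while the store is still (re)initializing.
static JsonNode lookupOrder(KafkaStreams streams, String orderId) {
    ReadOnlyKeyValueStore<String, JsonNode> ordersStore =
            streams.store(ORDERS_STORE, QueryableStoreTypes.<String, JsonNode>keyValueStore());
    // The read returns the latest value immediately; the record cache only delays
    // what is forwarded downstream (e.g. to print()) until the next flush/commit.
    return ordersStore.get(orderId);
}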

About the commit interval: in general it is set to a relatively large value to reduce the commit load on the brokers.
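
Put differently, how quickly KTable updates show up downstream is governed by both the commit interval and the cache size. A hedged example of tuning the two knobs (the values are only illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Lower interval: downstream operators (e.g. print()) see KTable updates sooner,
// at the price of more frequent offset commits (broker load) and cache flushes.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Total record-cache size across all stream threads; 0 disables caching entirely,
// so every single KTable update is forwarded downstream immediately.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);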
