Kafka Streams - Explain the reason why KTable and its associated Store only get updated every 30 seconds


Question

I have this simple KTable definition that generates a Store:

KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();

I publish messages into the ORDERS_TOPIC, but the store isn't really updated until every 30 seconds. This is the log where there is a message about committing because the 30000ms commit interval has elapsed:

2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher       : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask   : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager        : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache      : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl  : task [0_0] Flushing producer

I found that the property that controls this is commit.interval.ms:

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
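For context, this goes into the same Properties object passed to KafkaStreams — a minimal configuration sketch (the application id and bootstrap address are placeholders, not taken from my actual setup):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-service-streams"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
// Commit (and flush the KTable cache downstream) every 10 ms
// instead of the default 30000 ms:
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
```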

Why is it set to 30000ms by default (which sounds like a long time), and what are the implications of changing it to 10ms?

If instead of a KTable I work with a KStream...

KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();

...I can see the messages right away, without having to wait those 30000ms. Why the difference?

Answer

It's related to memory management, in particular the KTable caches: http://docs.confluent.io/current/streams/developer-guide.html#memory-management

The KTable is actually updated all the time, and if you use "Interactive Queries" to access the underlying state store, you can get each update immediately. However, the KTable cache buffers the updates to reduce downstream load, and each time a commit is triggered, this cache is flushed downstream to avoid data loss in case of failure. If your cache size is small, you might also see downstream records sooner, whenever a key gets evicted from the cache.
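If the goal is to see every update downstream immediately, an alternative to shrinking the commit interval is to disable the record cache altogether — a sketch, assuming a Kafka Streams version where `cache.max.bytes.buffering` is available (0.10.1+):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// A 0-byte cache disables KTable record caching entirely, so every
// update is forwarded downstream (e.g. to table.print()) right away.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```

Note the trade-off: with caching disabled, every single update flows downstream, which increases the load on downstream processors and on the changelog topic.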

About the commit interval: in general, it is set to a relatively large value to reduce the commit load on the brokers.
