Apache Kafka Streams: Materializing KTables to a topic seems slow
Question
I'm using Kafka Streams and I'm trying to materialize a KTable into a topic.
It works, but the updates only seem to reach the topic every 30 seconds or so.
How and when does Kafka Streams decide to materialize the current state of a KTable into a topic?
Is there any way to shorten this interval and make it more "real-time"?
Here is the actual code I'm using:
// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream<Integer, Integer> kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);
// grouping by key
KGroupedStream<Integer, Integer> byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());
// same behaviour with or without the TimeWindow
KTable<Windowed<Integer>, Long> count = byKey.count(TimeWindows.of(1000L),"total");
// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
Recommended answer
This is controlled by commit.interval.ms, which defaults to 30 seconds. More details here: http://docs.confluent.io/current/streams/developer-guide.html
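To get updates out faster, the usual approach is to lower commit.interval.ms and, if every intermediate update should reach the topic, disable the record cache by setting cache.max.bytes.buffering to 0. The sketch below only builds the Properties you would pass to the KafkaStreams constructor. It uses plain string keys (the same strings as StreamsConfig.COMMIT_INTERVAL_MS_CONFIG and StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) so it compiles without kafka-streams on the classpath; the class name, the 100 ms value, the application id "random-count-app", and the broker address "localhost:9092" are illustrative assumptions, not values from the question.

```java
import java.util.Properties;

// Sketch: Kafka Streams settings that make KTable updates reach the output topic sooner.
// Keys are the literal config names, so no Kafka dependency is needed to compile this.
public class LowLatencyStreamsConfig {

    public static Properties lowLatencyProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // required by Kafka Streams
        props.put("bootstrap.servers", bootstrapServers); // required by Kafka Streams
        // Commit, and flush the caches, every 100 ms instead of the 30 s default.
        props.put("commit.interval.ms", "100");
        // A zero-byte record cache means every update is forwarded downstream immediately.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = lowLatencyProps("random-count-app", "localhost:9092");
        System.out.println(props.getProperty("commit.interval.ms"));        // 100
        System.out.println(props.getProperty("cache.max.bytes.buffering")); // 0
        // In the real application, pass props to the KafkaStreams constructor and call start().
    }
}
```

The trade-off: a shorter commit interval means more frequent commits, and a disabled cache means every intermediate count is written downstream instead of only the latest value per key. Newer Kafka releases (3.4 and later) deprecate cache.max.bytes.buffering in favor of statestore.cache.max.bytes.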
The caching semantics are that data is flushed to the state store and forwarded to the next downstream processor node as soon as either commit.interval.ms elapses or the cache fills up to cache.max.bytes.buffering (cache pressure), whichever happens first.
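To make the "whichever happens first" rule concrete, here is a toy Java model of it. It is not Kafka's record-cache implementation: the class ToyRecordCache, its entry-count limit (a hypothetical stand-in for the byte-based cache.max.bytes.buffering), and the 60-update workload (one update every 500 ms, as in the question) are all illustrative assumptions. It also models one property of the real cache: repeated updates to the same key overwrite each other while buffered, so a flush forwards only the latest value per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Kafka's implementation) of the record-cache flush rule:
// buffered updates are forwarded when the commit interval elapses or the cache is full.
public class ToyRecordCache {
    private final long commitIntervalMs; // stands in for commit.interval.ms
    private final int maxEntries;        // hypothetical stand-in for cache.max.bytes.buffering
    private final Map<Integer, Long> buffer = new LinkedHashMap<>();
    private final List<String> forwarded = new ArrayList<>();
    private long lastCommitMs = 0L;

    public ToyRecordCache(long commitIntervalMs, int maxEntries) {
        this.commitIntervalMs = commitIntervalMs;
        this.maxEntries = maxEntries;
    }

    // Buffer an updated count for a key observed at time nowMs.
    public void put(int key, long count, long nowMs) {
        buffer.put(key, count); // a newer value for the same key replaces the older one
        if (buffer.size() >= maxEntries) {
            flush(); // cache pressure
        }
        if (nowMs - lastCommitMs >= commitIntervalMs) {
            flush(); // commit interval elapsed
            lastCommitMs = nowMs;
        }
    }

    // Forward everything buffered (in first-insertion order) and empty the cache.
    private void flush() {
        for (Map.Entry<Integer, Long> e : buffer.entrySet()) {
            forwarded.add(e.getKey() + "=" + e.getValue());
        }
        buffer.clear();
    }

    // Feed 60 count updates, one every 500 ms, cycling over keys 2, 3, 1.
    public static List<String> simulate(long commitIntervalMs, int maxEntries) {
        ToyRecordCache cache = new ToyRecordCache(commitIntervalMs, maxEntries);
        Map<Integer, Long> counts = new HashMap<>();
        for (int i = 1; i <= 60; i++) {
            long nowMs = 500L * i;
            int key = (i % 3) + 1;
            long count = counts.merge(key, 1L, Long::sum);
            cache.put(key, count, nowMs);
        }
        return cache.forwarded;
    }

    public static void main(String[] args) {
        // 30 s interval, roomy cache: nothing is forwarded until t = 30 s.
        System.out.println(simulate(30_000L, 1_000));    // [2=20, 3=20, 1=20]
        // Zero-size cache: each of the 60 updates is forwarded as it arrives.
        System.out.println(simulate(30_000L, 0).size()); // 60
    }
}
```

With the 30 s default interval, downstream sees nothing for 30 seconds and then a single record per key carrying its final count, which matches the delay described in the question; with a zero-size cache, every update is forwarded immediately.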