kafka KStream - topology to take n-second counts

Question
I have a stream of JSON objects that I'm keying on a hash of a few values. I'm hoping to count by key in n-second (10? 60?) intervals and use these values for some pattern analysis.
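The keying-plus-counting idea can be sketched in plain Java, independent of Kafka Streams. This is a minimal illustration under assumptions: the field names `host` and `metric` are hypothetical stand-ins for "a few values" from the JSON objects, and a parsed record is modeled as a `Map` rather than a real JSON type:

```java
import java.util.*;

public class KeyedCounter {
    // Build a composite key from the fields we care about
    // (the field names here are hypothetical placeholders).
    static String keyOf(Map<String, String> record) {
        return String.join("|",
                record.getOrDefault("host", ""),
                record.getOrDefault("metric", ""));
    }

    // Count records per key; one such map would be kept per n-second window.
    static Map<String, Long> countByKey(List<Map<String, String>> records) {
        Map<String, Long> counts = new HashMap<>();
        for (Map<String, String> r : records) {
            counts.merge(keyOf(r), 1L, Long::sum);
        }
        return counts;
    }
}
```

In the real topology the aggregation step plays the role of `countByKey`, maintained incrementally per window instead of over a batch.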
My topology: K -> aggregateByKey(n seconds) -> process()
In the processor's init() step I've called ProcessorContext.schedule(60 * 1000L) in hopes of having .punctuate() get called. From there I would loop through the values in an internal hash and act accordingly.
I'm seeing values come through the aggregation step and hit the process() function, but .punctuate() never gets called.
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
        new AggregateInit(),
        new OpxAggregate(),
        TimeWindows.of("opx_aggregate", 60000));

ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
    @Override
    public Processor<Windowed<String>, String> get() {
        return new AggProcessor();
    }
});

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
AggregateInit() returns null.
I guess I could do the .punctuate() equivalent with a simple timer, but I'd like to know why this code isn't working the way I'd hoped.
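The timer fallback mentioned above can be sketched with a ScheduledExecutorService. This is a minimal illustration, not the Kafka Streams API: the class and method names are made up, process() stands in for the topology's per-record step, and flush() stands in for what punctuate() would do. It also uses wall-clock time, whereas Kafka Streams punctuation in this API version is tied to stream time:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class TimerPunctuator {
    private final ConcurrentMap<String, Long> counts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Mirrors what process() does in the topology: bump the per-key count.
    public void process(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Mirrors punctuate(): drain the accumulated counts for analysis.
    public Map<String, Long> flush() {
        Map<String, Long> snapshot = new HashMap<>(counts);
        counts.clear();
        return snapshot;
    }

    // Wall-clock stand-in for ProcessorContext.schedule(60 * 1000L).
    public void start(long periodMs, Consumer<Map<String, Long>> onWindow) {
        scheduler.scheduleAtFixedRate(() -> onWindow.accept(flush()),
                periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```

A caller would invoke start(60_000L, window -> analyze(window)) once and feed keys to process() as records arrive.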
Accepted answer
I think this was related to an improper setup of the Kafka cluster. After raising the file descriptor limit to a much higher value than the default (1024 -> 65535), this seems to be working per spec.