kafka KStream - topology to take n-second counts

Problem Description

I have a stream of JSON objects that I'm keying on a hash of a few of their values. I'm hoping to count by key in n-second (10? 60?) intervals and use those counts to do some pattern analysis.

My topology: K -> aggregateByKey(n seconds) -> process()

In the processor's init() step I've called ProcessorContext.schedule(60 * 1000L), hoping to have .punctuate() called. From there I would loop through the values in an internal hash and act accordingly.

I'm seeing values come through the aggregation step and hit the process() function, but .punctuate() is never getting called.

// Build the topology: consume the topic, re-key via ReMapper, aggregate per key
// in 60-second windows, then hand the windowed aggregates to a custom processor.
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);

KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
        new AggregateInit(),
        new OpxAggregate(),
        TimeWindows.of("opx_aggregate", 60000));

// AggProcessor is where init() schedules the punctuate() callback.
ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
    @Override
    public Processor<Windowed<String>, String> get() {
        return new AggProcessor();
    }
});

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

AggregateInit() returns null.
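
For reference, a minimal sketch of what the AggProcessor described above might look like under the old 0.10.x Processor API. The class isn't shown in the question, so the internal hash and what punctuate() does with it are assumptions based on the description:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class AggProcessor implements Processor<Windowed<String>, String> {

    private ProcessorContext context;
    // Internal hash of the latest windowed aggregate seen per key.
    private final Map<String, String> latest = new HashMap<>();

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // Request that punctuate() be invoked roughly every 60 seconds.
        context.schedule(60 * 1000L);
    }

    @Override
    public void process(Windowed<String> key, String value) {
        // Remember the most recent aggregate for this key.
        latest.put(key.key(), value);
    }

    @Override
    public void punctuate(long timestamp) {
        // Walk the internal hash, act on the accumulated values, then reset.
        for (Map.Entry<String, String> entry : latest.entrySet()) {
            System.out.printf("%d %s -> %s%n", timestamp, entry.getKey(), entry.getValue());
        }
        latest.clear();
    }

    @Override
    public void close() {
    }
}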

I guess I could do the .punctuate() equivalent with a simple timer, but I'd like to know why this code isn't working the way I'd hoped.
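
A minimal sketch of that timer-based fallback, assuming a small helper class (here called TimerFlush, not part of the original code) that flushes an in-memory map on a fixed schedule with a ScheduledExecutorService:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimerFlush {

    // Per-key counts accumulated between flushes; record() would be called from process().
    private final Map<String, Long> counts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Flush every 60 seconds, regardless of whether new records arrive.
        scheduler.scheduleAtFixedRate(this::flush, 60, 60, TimeUnit.SECONDS);
    }

    public void record(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    private void flush() {
        // Act on the accumulated counts, then reset for the next interval.
        counts.forEach((k, v) -> System.out.printf("%s -> %d%n", k, v));
        counts.clear();
    }

    public void stop() {
        scheduler.shutdown();
    }
}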

Recommended Answer

I think this is related to an improper setup of the Kafka cluster. After changing the file descriptor count to a much higher value than the default (1024 -> 65535), this seems to be working per spec.
