Kafka Streams App-计数和总计 [英] Kafka Streams App - count and sum aggregate

查看:63
本文介绍了Kafka Streams App-计数和总计的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从KGroupedStream创建一个KTable,以存储每个键的值之和.

I'm trying to create a KTable from a KGroupedStream to store the sum of the value for each key.

 final StreamsBuilder builder = new StreamsBuilder();
 final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Materialized.<String, Long, KeyValueStore<Byte, byte[]>>as("counts-store"));

但出现错误:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Materialized<String,Long,KeyValueStore<Byte,byte[]>>)

我看到的所有示例都将Serde作为第三个参数传递,但是我已经尝试过并得到了非常类似的错误(我认为这可能来自较旧的版本,因为它与当前版本的签名不匹配实施?):

All the examples I've seen pass in a Serde as the third argument but I have tried this and get a very similar error (I think this might be from an older version as it does not match with the signature of the current implementation?):

final StreamsBuilder builder = new StreamsBuilder();
    final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Serdes.Long());

错误:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Serde<Long>)

我在做什么错了?

使用Kafka版本:2.1.0

Using Kafka version: 2.1.0

推荐答案

您的代码中几乎没有问题:

There are few issues in your code:

  1. 对于 Materialized.as 而不是 java.lang.Byte ,您应该传递 org.apache.kafka.common.utils.Bytes
  2. 您不应修改 final 变量: aggregate + = value;
  3. 您必须将键和值的类型添加到 StreamsBuilder :: stream 调用( builder.< String,Long> stream("streams-plaintext-input"))
  1. For Materialized.as instead java.lang.Byte you should pass org.apache.kafka.common.utils.Bytes
  2. You shouldn't modify final variable: aggregate += value;
  3. You have to add types of key and value to StreamsBuilder::stream call (builder.<String, Long>stream("streams-plaintext-input"))

修改后,其外观应大致如下:

After modification it should looks more or less as follow:

KTable<String, Long> sum = builder.<String, Long>stream("streams-plaintext-input")
        .groupByKey()
        .aggregate(new Initializer<Long>() {
            @Override
            public Long apply() {
                return Long.MIN_VALUE;
            }
        }, new Aggregator<String, Long, Long>() {
            @Override
            public Long apply(final String key, final Long value,final Long aggregate) {
                return aggregate + value;
            }
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

这篇关于Kafka Streams App-计数和总计的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆