Kafka: compare consecutive values for a key

Question

We are building an application that collects data from sensors. The data is streamed to Kafka, and consumers publish it from there to different data stores. Each data point has multiple attributes representing the state of the sensor.

In one of the consumers, we want to publish data to the data store only when the value has changed. For example, if a temperature sensor is polled every 10 seconds, we expect to receive data like this:

----------------------------------------------------------------------
Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:50", temperature: 11}

In the above case, only the first and third records should be published.

For this we need some way to compare the current value for a key with the previous value for the same key. I believe this should be possible with a KTable or KStream, but I have been unable to find examples.

Any help would be great!

Recommended answer

Here is an example of how to solve this with KStream#transformValues().

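import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// stateStoreName, INPUT_TOPIC, OUTPUT_TOPIC, YourValueType and YourValueTypeSerde()
// are placeholders to be supplied by your application.
StreamsBuilder builder = new StreamsBuilder();

// State store that holds the last stored record for each key.
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.String(),
                                YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);

// The key serde must match the String keys used by the store and the transformer.
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), YourValueTypeSerde()))
    .transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
        private KeyValueStore<String, YourValueType> state;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);
        }

        @Override
        public YourValueType transform(final String key, final YourValueType value) {
            final YourValueType prevValue = state.get(key);
            if (prevValue == null) {
                // First record for this key: remember it, emit nothing yet.
                state.put(key, value);
                return null;
            }
            if (prevValue.temperature() != value.temperature()) {
                // Temperature changed: store the current record, emit the stored one.
                state.put(key, value);
                return prevValue;
            }
            return null; // Temperature unchanged: discard the current record.
        }

        @Override
        public void close() {}
    }, stateStoreName)
    // transformValues() forwards null results as null-valued records, so drop them.
    .filter((key, value) -> value != null)
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), YourValueTypeSerde()));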

Each record is compared with the previous record held in the state store. If the temperature is different, you return the record from the state store and store the current record in its place. If the temperature is equal, you discard the current record. Note that transformValues() forwards a null return value as a record with a null value, so those records should be filtered out before writing to the output topic.
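With the approach described above, a stored record is only returned once a later record with a different temperature arrives, so the output lags behind the input. If a change should be published as soon as it is seen (the first and third records in the question's sample), the comparison could return the current record instead. Below is a minimal plain-Java sketch of that variant, not the answer's code: a HashMap stands in for the Kafka Streams state store, and Reading, ChangeFilter, onRecord and publishable are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical value type: one sensor reading.
final class Reading {
    final String timestamp;
    final int temperature;

    Reading(String timestamp, int temperature) {
        this.timestamp = timestamp;
        this.temperature = temperature;
    }
}

// Emits a reading only when its temperature differs from the last one stored for its key.
final class ChangeFilter {
    // Assumption: this map stands in for the Kafka Streams KeyValueStore.
    private final Map<String, Reading> lastByKey = new HashMap<>();

    // Returns the current reading if it is new or changed, otherwise null.
    Reading onRecord(String key, Reading current) {
        Reading previous = lastByKey.get(key);
        if (previous != null && previous.temperature == current.temperature) {
            return null; // unchanged: discard
        }
        lastByKey.put(key, current); // remember the latest published reading
        return current;
    }

    // Runs the filter over a sequence of readings for one key and drops the nulls.
    static List<Reading> publishable(String key, List<Reading> readings) {
        ChangeFilter filter = new ChangeFilter();
        List<Reading> out = new ArrayList<>();
        for (Reading r : readings) {
            Reading emitted = filter.onRecord(key, r);
            if (emitted != null) {
                out.add(emitted);
            }
        }
        return out;
    }
}
```

Applied to the three Sensor1 readings from the question, publishable keeps the 10:20:30 and 10:20:50 readings. In a Kafka Streams topology the same comparison would sit inside transform(), with the state store replacing the map and a filter step dropping the null values.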
