Kafka比较一个键的连续值 [英] Kafka compare consecutive values for a key

查看:18
本文介绍了Kafka比较一个键的连续值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在构建一个应用程序来从传感器获取数据.数据被流式传输到 Kafka,从那里消费者将其发布到不同的数据存储.每个数据点将具有表示传感器状态的多个属性.

We are building an application to get data from sensors. The data is streamed to Kafka from where consumers will publish it to different data stores. Each data point will have multiple attributes representing the state of the sensor.

在其中一个消费者中,我们希望仅在值发生更改时才将数据发布到数据存储.例如如果有每 10 秒轮询一次数据的温度传感器,我们希望收到像

In one of the consumers we want to publish the data to the data store only if the value has changed. for e.g. if there is temperature sensor which is polled for data every 10 secs we expect to receive data like

----------------------------------------------------------------------
Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1            {timestamp: "10-10-2019 10:20:50", temperature: 11}

在上述情况下,只应发布第一条记录和第三条记录.

In the above case only the first record and the third record should be published.

为此,我们需要某种方法来比较某个键的当前值与具有相同键的先前值.我相信 KTable 或 KStream 应该可以做到这一点,但无法找到示例.

For this we need some way to compare the current value for a key with the previous value with the same key. I believe this should be possible with KTable or KStream but unable to find examples.

任何帮助都会很棒!

推荐答案

以下是如何使用 KStream#transformValues().

Here is an example how to solve this with KStream#transformValues().

StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.String(),
                                YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), YourValueTypeSerde()))
    .transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
        private KeyValueStore<String, YourValueType> state;

        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);}

        @Override
        public YourValueType transform(final String key, final YourValueType value) {
            YourValueType prevValue = state.get(key);
            if (prevValue != null) {
                if (prevValue.temperature() != value.temperature()) {
                    return prevValue;
                }
            } else {
                state.put(key, value);
            }
            return null;
       }

       @Override
       public void close() {}
    }, stateStorName))
    .to(OUTPUT_TOPIC);

您将该记录与存储在状态存储中的先前记录进行比较.如果温度不同,则从状态存储返回记录并将当前记录存储在状态存储中.如果温度相等,则丢弃当前记录.

You compare the record with the previous record stored in the state store. If temperature is different you return the record from the state store and store the current record in the state store. If the temperature is equal you discard the current record.

这篇关于Kafka比较一个键的连续值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆