根据部分数据属性更新KTable [英] Update KTable based on partial data attributes

查看:27
本文介绍了根据部分数据属性更新KTable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用对象的部分数据更新 KTable.例如.用户对象是{"id":1, "name":"Joe", "age":28}对象被流式传输到一个主题中,并按密钥分组到 KTable 中.现在用户对象部分更新如下 {"id":1, "age":33} 并流式传输到表中.但更新后的表看起来如下 {"id":1, "name":null, "age":28}.预期输出为 {"id":1, "name":"Joe", "age":33}.如何使用 Kafka 流和 Spring Cloud 流来实现预期的输出.任何建议,将不胜感激.谢谢.

I am trying to update a KTable with partial data of an object. Eg. User object is {"id":1, "name":"Joe", "age":28} The object is being streamed into a topic and grouped by key into KTable. Now the user object is updated partially as follows {"id":1, "age":33} and streamed into table. But the updated table looks as follows {"id":1, "name":null, "age":28}. The expected output is {"id":1, "name":"Joe", "age":33}. How can I use Kafka streams and spring cloud streams to achieve the expected output. Any suggestions would be appreciated. Thanks.

这是代码

 @Bean
        public Function<KStream<String, User>, KStream<String, User>> process() {
            return input -> input.map((key, user) -> new KeyValue<String, User>(user.getId(), user))
                    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(User.class))).reduce((user1, user2) -> {
                        user1.merge(user2);
                        return user1;
                    }, Materialized.as("allusers")).toStream();
        }

并使用以下代码修改用户对象:

and modified the User object with below code:

    public void merge(Object newObject) {
        assert this.getClass().getName().equals(newObject.getClass().getName());
        for (Field field : this.getClass().getDeclaredFields()) {
            for (Field newField : newObject.getClass().getDeclaredFields()) {
                if (field.getName().equals(newField.getName())) {
                    try {
                        field.set(this, newField.get(newObject) == null ? field.get(this) : newField.get(newObject));
                    } catch (IllegalAccessException ignore) {
                    }
                }
            }
        }
    }

这是正确的方法还是 KStreams 中的任何其他方法?

Is this the right approach or any other approach in KStreams?

推荐答案

我已经测试了您的合并代码,它似乎按预期工作.但是由于您在 reduce 之后的结果是 {"id":1, "name":null, "age":28},我可以想到两件事:

I've tested your merge code, and it seems to be working as expected. But since your result after the reduce is {"id":1, "name":null, "age":28}, I can think of two things:

  • 您的状态根本没有更新,因为没有任何属性发生变化.
  • 也许你有序列化问题,因为 string 属性为 null,但其他 int 属性没问题.

我的猜测是,因为您正在改变原始对象并返回相同的值,所以 kafka 流不会将其检测为更改并且不会存储新状态.实际上,您不应该改变您的对象,因为它可能会导致不确定性,具体取决于您的管道.

My guess is that, because you are mutating the original object and return the same value, kafka streams doesn't detect that as a change and won't store the new state. Actually, you shouldn't mutate your object, since it could lead to non-determinism depending on your pipeline.

尝试更改您的 merge 函数以创建一个新的 User 对象,然后查看行为是否发生变化.

Try to change your merge function to create a new User object, and see if the behavior changes.

这篇关于根据部分数据属性更新KTable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆