Update KTable based on partial data attributes
Question
I am trying to update a KTable with partial data of an object.
For example, the User object is
{"id":1, "name":"Joe", "age":28}
The object is streamed into a topic and grouped by key into a KTable.
Now the User object is partially updated as {"id":1, "age":33} and streamed into the table. But the updated table looks like this: {"id":1, "name":null, "age":28}.
The expected output is {"id":1, "name":"Joe", "age":33}.
How can I use Kafka Streams and Spring Cloud Stream to achieve the expected output? Any suggestions would be appreciated. Thanks.
Here is the code:
@Bean
public Function<KStream<String, User>, KStream<String, User>> process() {
    return input -> input
            .map((key, user) -> new KeyValue<String, User>(user.getId(), user))
            .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(User.class)))
            .reduce((user1, user2) -> {
                user1.merge(user2);
                return user1;
            }, Materialized.as("allusers"))
            .toStream();
}
I also modified the User object with the code below:
public void merge(Object newObject) {
    assert this.getClass().getName().equals(newObject.getClass().getName());
    for (Field field : this.getClass().getDeclaredFields()) {
        for (Field newField : newObject.getClass().getDeclaredFields()) {
            if (field.getName().equals(newField.getName())) {
                try {
                    field.set(this, newField.get(newObject) == null ? field.get(this) : newField.get(newObject));
                } catch (IllegalAccessException ignore) {
                }
            }
        }
    }
}
Is this the right approach, or is there a better way to do this in Kafka Streams?
Recommended answer
I've tested your merge code, and it seems to work as expected. But since your result after the reduce is {"id":1, "name":null, "age":28}, I can think of two things:
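- Your state is not being updated at all, because none of the attributes changed.
- You may have a serialization problem, because the string attribute is null while the int attributes are fine.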
My guess is that, because you are mutating the original object and returning that same instance, Kafka Streams doesn't detect it as a change and won't store the new state. In any case, you shouldn't mutate your object, since it could lead to non-determinism depending on your pipeline.
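To make the concern about mutation concrete, here is a small plain-Java sketch with no Kafka involved. MutableUser, mergeInPlace and AliasingDemo are hypothetical names invented for this illustration. It only shows that a merge which mutates and returns its receiver leaves no separate "previous" value behind. Whether Kafka Streams relies on such a distinction is the guess stated above, not something this sketch confirms.

```java
// Hypothetical mutable value type, standing in for the question's User.
final class MutableUser {
    String name;
    Integer age;

    MutableUser(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    // Mutating merge in the style of the question: overwrites fields on this
    // with every non-null field of the update, then returns this.
    MutableUser mergeInPlace(MutableUser update) {
        if (update.name != null) name = update.name;
        if (update.age != null) age = update.age;
        return this;
    }
}

final class AliasingDemo {
    // Returns true when the "previous" and the "merged" value are the same
    // object, i.e. the old state has been overwritten rather than preserved.
    static boolean previousIsAliased() {
        MutableUser previous = new MutableUser("Joe", 28);
        MutableUser merged = previous.mergeInPlace(new MutableUser(null, 33));
        return previous == merged && previous.age == 33;
    }
}
```

Any code holding a reference to the previous value sees the updated fields too, which is one way mutation can produce surprising results.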
Try changing your merge function to create a new User object, and see if the behavior changes.
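A minimal sketch of what a non-mutating merge could look like follows. The question does not show the User class, so the field types here are assumptions: id as a String (it is used as the stream key) and age as a boxed Integer so that a missing value can be null. The class is written as immutable for illustration only.

```java
import java.util.Objects;

// Hypothetical User type for illustration; the real class is not shown in the question.
final class User {
    private final String id;
    private final String name;  // null means "not present in this update"
    private final Integer age;  // boxed so a missing value can be represented as null

    User(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    String getId() { return id; }
    String getName() { return name; }
    Integer getAge() { return age; }

    // Returns a new User that takes each non-null field from the update and
    // falls back to this object's value otherwise. Neither input is modified.
    User merge(User update) {
        Objects.requireNonNull(update, "update");
        return new User(
                update.id != null ? update.id : id,
                update.name != null ? update.name : name,
                update.age != null ? update.age : age);
    }
}
```

With a merge like this, the reducer from the question could be written as .reduce((stored, update) -> stored.merge(update), Materialized.as("allusers")). A real class would also need whatever constructor or annotations the JSON serde requires for deserialization, which this sketch leaves out.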