Spring Batch:一个 Reader,复合处理器(两个具有不同实体的类)和两个 kafkaItemWriter [英] Spring Batch : One Reader, composite processor (two classes with different entities) and two kafkaItemWriter
问题描述
ItemReader
正在从 DB2 读取数据并提供 java 对象 ClaimDto
.现在ClaimProcessor
接收ClaimDto
的对象并返回CompositeClaimRecord
对象,该对象由claimRecord1
和claimRecord2 组成
发送到两个不同的 Kafka 主题.如何将claimRecord1
和claimRecord2
分别写入topic1和topic2.
ItemReader
is reading data from DB2 and gave java object ClaimDto
. Now the ClaimProcessor
takes in the object of ClaimDto
and return CompositeClaimRecord
object which comprises of claimRecord1
and claimRecord2
which to be sent to two different Kafka topics. How to write claimRecord1
and claimRecord2
to topic1 and topic2 respectively.
推荐答案
只需编写一个自定义的 ItemWriter
即可.
Just write a custom ItemWriter
that does exactly that.
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
for (CompositeClaimRecord record : items) {
writer1.write(Collections.singletonList(record.claimRecord1));
writer2.write(Collections.singletonList(record.claimRecord2));
}
}
}
或者不是一次写入 1 个记录,而是将单个列表转换为 2 个列表并将其传递.但是这样的错误处理可能有点挑战.\
Or instead of writing 1 record at a time convert the single list into 2 lists and pass that along. But error handling might be a bit of a challenge that way. \
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());
writer1.write(record1List);
writer2.write(record2List);
}
}
这篇关于Spring Batch:一个 Reader,复合处理器(两个具有不同实体的类)和两个 kafkaItemWriter的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!