Spring Batch:一个 Reader,复合处理器(两个具有不同实体的类)和两个 kafkaItemWriter [英] Spring Batch : One Reader, composite processor (two classes with different entities) and two kafkaItemWriter

查看:49
本文介绍了Spring Batch:一个 Reader,复合处理器(两个具有不同实体的类)和两个 kafkaItemWriter的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

ItemReader 正在从 DB2 读取数据并提供 java 对象 ClaimDto.现在ClaimProcessor 接收ClaimDto 的对象并返回CompositeClaimRecord 对象,该对象由claimRecord1claimRecord2 组成 发送到两个不同的 Kafka 主题.如何将claimRecord1claimRecord2分别写入topic1和topic2.

ItemReader is reading data from DB2 and gave java object ClaimDto. Now the ClaimProcessor takes in the object of ClaimDto and return CompositeClaimRecord object which comprises of claimRecord1 and claimRecord2 which to be sent to two different Kafka topics. How to write claimRecord1 and claimRecord2 to topic1 and topic2 respectively.

推荐答案

只需编写一个自定义的 ItemWriter 即可.

Just write a custom ItemWriter that does exactly that.

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    for (CompositeClaimRecord record : items) {
       writer1.write(Collections.singletonList(record.claimRecord1));
       writer2.write(Collections.singletonList(record.claimRecord2));

    }
  }
}

或者不是一次写入 1 个记录,而是将单个列表转换为 2 个列表并将其传递.但是这样的错误处理可能有点挑战.\

Or instead of writing 1 record at a time convert the single list into 2 lists and pass that along. But error handling might be a bit of a challenge that way. \

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
    List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());

    writer1.write(record1List);
    writer2.write(record2List);


  }
}

这篇关于Spring Batch:一个 Reader,复合处理器(两个具有不同实体的类)和两个 kafkaItemWriter的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆