Spring Batch:一个阅读器,一个复合处理器(两个具有不同实体的类)和两个kafkaItemWriter [英] Spring Batch : One Reader, composite processor (two classes with different entities) and two kafkaItemWriter

查看:192
本文介绍了Spring Batch:一个阅读器,一个复合处理器(两个具有不同实体的类)和两个kafkaItemWriter的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

ItemReader正在从DB2读取数据,并提供了Java对象ClaimDto.现在,ClaimProcessor接受ClaimDto的对象,并返回CompositeClaimRecord对象,该对象由claimRecord1claimRecord2组成,该对象将被发送到两个不同的Kafka主题.如何分别将claimRecord1claimRecord2写入topic1和topic2.

ItemReader is reading data from DB2 and gave java object ClaimDto. Now the ClaimProcessor takes in the object of ClaimDto and return CompositeClaimRecord object which comprises of claimRecord1 and claimRecord2 which to be sent to two different Kafka topics. How to write claimRecord1 and claimRecord2 to topic1 and topic2 respectively.

推荐答案

只需编写一个自定义的ItemWriter即可.

Just write a custom ItemWriter that does exactly that.

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    for (CompositeClaimRecord record : items) {
       writer1.write(Collections.singletonList(record.claimRecord1));
       writer2.write(Collections.singletonList(record.claimRecord2));

    }
  }
}

或者不是一次写入1条记录,而是将单个列表转换为2个列表并将其传递.但是以这种方式,错误处理可能会有点挑战. \

Or instead of writing 1 record at a time convert the single list into 2 lists and pass that along. But error handling might be a bit of a challenge that way. \

public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {

  private final ItemWriter<Record1> writer1;
  private final ItemWriter<Record2> writer2;

  public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
    this.writer1=writer1;
    this.writer2=writer2;
}

  public void write(List<CompositeClaimRecord> items) throws Exception {

    List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
    List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());

    writer1.write(record1List);
    writer2.write(record2List);


  }
}

这篇关于Spring Batch:一个阅读器,一个复合处理器(两个具有不同实体的类)和两个kafkaItemWriter的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆