NPE in Flink KeyedCoProcessFunction


Problem description

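I'm using a KeyedCoProcessFunction on connected streams, and both streams are keyed by id. I use MapState and put a value of type List when the key doesn't exist. In processElement2 I also check that the key exists before reading it, so ideally there should be no chance of an NPE, but I still get one.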

val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
        .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
        .process(new EnrichJoinFunction());

public static class EnrichJoinFunction
  extends KeyedCoProcessFunction<Long, Row, Row, Row> {


final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

private MapState<Long, Row> map = null;
private MapState<Long, List<Row>> clickstreamState = null;

@Override
public void open(Configuration parameters) throws Exception {
  MapStateDescriptor<Long, Row> MapStateDescriptor =
      new MapStateDescriptor<Long, Row>(
          "state",
          TypeInformation.of(Long.class),
          TypeInformation.of(new TypeHint<Row>() {}));
  MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
  map = getRuntimeContext().getMapState(MapStateDescriptor);

  MapStateDescriptor<Long, List<Row>> clickstreamStateMapStateDescriptor =
      new MapStateDescriptor<Long, List<Row>>(
          "clickstreamState",
          TypeInformation.of(Long.class),
          TypeInformation.of(new TypeHint<List<Row>>() {}));
  clickstreamStateMapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(5)).build());
  clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
}

@Override
public void processElement1(
    Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
    throws Exception {
  log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
  Long id = lookupRow.<Long>getFieldAs("id");
  if (!map.contains(id)) {
    map.put(id, lookupRow);
  }

  // join immediately any matching click events, waiting for counterpart
  if (clickstreamState.contains(id)) {
    for (Row curRow : clickstreamState.get(id)) {
      // enrich join
      Row joinRow = join(curRow, lookupRow);
      out.collect(joinRow);
    }
    clickstreamState.remove(id);
  } 
}

@Override
public void processElement2(
    Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
    throws Exception {
  log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

  Long id = clickRow.<Long>getFieldAs("id");

  if (map.contains(id)) {
      // enrich join
      Row joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
  } else {
    if (clickstreamState.contains(id)) {
      List<Row> rows = clickstreamState.get(id);
      if (rows != null) {
        rows.add(clickRow);
      } else {
        throw new NullPointerException("This exception should never throw NPE");
      }
    } else {
      val clickList = new ArrayList<Row>();
      clickList.add(clickRow);
      clickstreamState.put(id, clickList);
    }
  }
}

public Row join(Row clickRow, Row lookupRow) throws ParseException {
  Row joinedRow = new Row(RowKind.INSERT, 13);
  // row setters for the join output go here
  return joinedRow;
}
}

Recommended answer

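Fundamentally, I think the implementation is flawed. By the time your processElement1 and processElement2 methods are called, the operator's state has already been scoped to the current key. So there is no need for the MapState<id, Row> and MapState<id, List<Row>> state; all you need is a ValueState<Row> and a ListState<Row>.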
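
To make the key scoping concrete, here is a minimal Flink-free model, offered as a sketch rather than a drop-in replacement. The nested ValueState and ListState interfaces are stand-ins that expose only value()/update() and get()/add()/clear(), which Flink's interfaces of the same names also have. The class name KeyedJoinSketch, the currentKey field, the backing HashMaps and the use of String in place of Row are all assumptions made for illustration. What it shows is that once every state access is resolved against the current key, neither method has to look anything up by id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of key-scoped state. All names here are illustrative.
class KeyedJoinSketch {

  // Stand-in exposing only value()/update(), as Flink's ValueState does.
  interface ValueState<T> {
    T value();
    void update(T value);
  }

  // Stand-in exposing only get()/add()/clear(), as Flink's ListState does.
  interface ListState<T> {
    Iterable<T> get();
    void add(T value);
    void clear();
  }

  // Hypothetical toy backend: every access is resolved against currentKey,
  // mimicking what the runtime does before it calls processElement1/2.
  private Long currentKey;
  private final Map<Long, String> lookupByKey = new HashMap<>();
  private final Map<Long, List<String>> clicksByKey = new HashMap<>();

  private final ValueState<String> lookup = new ValueState<String>() {
    public String value() { return lookupByKey.get(currentKey); }
    public void update(String value) { lookupByKey.put(currentKey, value); }
  };

  private final ListState<String> pendingClicks = new ListState<String>() {
    public Iterable<String> get() {
      return clicksByKey.getOrDefault(currentKey, new ArrayList<>());
    }
    public void add(String value) {
      clicksByKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }
    public void clear() { clicksByKey.remove(currentKey); }
  };

  // Collected join results (plays the role of the Collector).
  final List<String> out = new ArrayList<>();

  // Lookup side: remember the row, then flush clicks that were waiting for it.
  void processElement1(long key, String lookupRow) {
    currentKey = key;
    if (lookup.value() == null) {
      lookup.update(lookupRow);
    }
    for (String click : pendingClicks.get()) {
      out.add(join(click, lookupRow));
    }
    pendingClicks.clear();
  }

  // Click side: join at once if the lookup row is known, otherwise buffer.
  void processElement2(long key, String clickRow) {
    currentKey = key;
    String lookupRow = lookup.value();
    if (lookupRow != null) {
      out.add(join(clickRow, lookupRow));
    } else {
      pendingClicks.add(clickRow);
    }
  }

  // Placeholder join; rows are plain strings in this model.
  static String join(String clickRow, String lookupRow) {
    return clickRow + "+" + lookupRow;
  }

  public static void main(String[] args) {
    KeyedJoinSketch fn = new KeyedJoinSketch();
    fn.processElement2(1L, "click-a");   // buffered under key 1
    fn.processElement2(2L, "click-b");   // buffered under key 2
    fn.processElement1(1L, "lookup-1");  // flushes only key 1's buffer
    fn.processElement2(1L, "click-c");   // joins immediately
    System.out.println(fn.out);          // prints [click-a+lookup-1, click-c+lookup-1]
  }
}
```

In an actual job the two handles would presumably be obtained in open() through getRuntimeContext().getState(...) with a ValueStateDescriptor and getRuntimeContext().getListState(...) with a ListStateDescriptor, and the runtime, not the function, would set the current key.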
