NPE in Flink KeyedCoProcessFunction
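Question
I am using a KeyedCoProcessFunction on connected streams, and both streams are keyed by id. I use MapState and, when the key is not present, put a value of type List. I also check that the key exists in processElement2, so ideally there should be no chance of an NPE, yet I still get one.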
val joinStream =
    lookDataStream
        .keyBy(row -> row.<Long>getFieldAs("id"))
        .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
        .process(new EnrichJoinFunction());
public static class EnrichJoinFunction
    extends KeyedCoProcessFunction<Long, Row, Row, Row> {
  final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
  private MapState<Long, Row> map = null;
  private MapState<Long, List<Row>> clickstreamState = null;

  @Override
  public void open(Configuration parameters) throws Exception {
    // Lookup records, kept for 15 days
    MapStateDescriptor<Long, Row> mapStateDescriptor =
        new MapStateDescriptor<Long, Row>(
            "state",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    mapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
    map = getRuntimeContext().getMapState(mapStateDescriptor);

    // Click events waiting for their lookup record, kept for 5 minutes
    MapStateDescriptor<Long, List<Row>> clickstreamStateMapStateDescriptor =
        new MapStateDescriptor<Long, List<Row>>(
            "clickstreamState",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<List<Row>>() {}));
    clickstreamStateMapStateDescriptor.enableTimeToLive(
        StateTtlConfig.newBuilder(Time.minutes(5)).build());
    clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
  }

  @Override
  public void processElement1(
      Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
    Long id = lookupRow.<Long>getFieldAs("id");
    if (!map.contains(id)) {
      map.put(id, lookupRow);
    }
    // join immediately any matching click events, waiting for counterpart
    if (clickstreamState.contains(id)) {
      for (Row curRow : clickstreamState.get(id)) {
        // enrich join
        Row joinRow = join(curRow, lookupRow);
        out.collect(joinRow);
      }
      clickstreamState.remove(id);
    }
  }

  @Override
  public void processElement2(
      Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
    Long id = clickRow.<Long>getFieldAs("id");
    if (map.contains(id)) {
      // enrich join
      Row joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
    } else {
      if (clickstreamState.contains(id)) {
        List<Row> rows = clickstreamState.get(id);
        if (rows != null) {
          rows.add(clickRow);
        } else {
          throw new NullPointerException("This exception should never throw NPE");
        }
      } else {
        val clickList = new ArrayList<Row>();
        clickList.add(clickRow);
        clickstreamState.put(id, clickList);
      }
    }
  }

  public Row join(Row clickRow, Row lookupRow) throws ParseException {
    Row joinedRow = new Row(RowKind.INSERT, 13);
    // set the fields of the joined output row here
    return joinedRow;
  }
}
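Recommended answer
Fundamentally, I think the implementation is flawed. By the time your processElement1 and processElement2 methods are called, the operator's state has already been scoped to the current key. So there is no need for MapState<id, Row> or MapState<id, List<Row>> state; you only need ValueState<Row> and ListState<Row>.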
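To make the point about key scoping concrete, here is a minimal plain-Java sketch with no Flink dependency, so it can be run on its own. It is a model of the idea, not the answerer's code: the class PerKeyJoin and its methods onLookup and onClick are hypothetical names, String stands in for Row, and one PerKeyJoin instance represents the state Flink would hold for a single key. The lookup field plays the role of ValueState<Row> (read with value(), written with update()), and pendingClicks plays the role of ListState<Row> (add(), get(), clear()).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the state held for ONE key; not a Flink class. */
final class PerKeyJoin {
    // Stands in for ValueState<Row>: at most one lookup record for this key.
    private String lookup;
    // Stands in for ListState<Row>: clicks that arrived before the lookup record.
    private final List<String> pendingClicks = new ArrayList<>();

    /** Mirrors processElement1: remember the lookup record, then flush buffered clicks. */
    void onLookup(String lookupRow, List<String> out) {
        if (lookup == null) {
            lookup = lookupRow;
        }
        for (String click : pendingClicks) {
            out.add(join(click, lookupRow));
        }
        pendingClicks.clear();
    }

    /** Mirrors processElement2: join at once if the lookup is known, otherwise buffer. */
    void onClick(String clickRow, List<String> out) {
        if (lookup != null) {
            out.add(join(clickRow, lookup));
        } else {
            pendingClicks.add(clickRow);
        }
    }

    /** Placeholder join; the real one would build the enriched Row. */
    static String join(String clickRow, String lookupRow) {
        return clickRow + "+" + lookupRow;
    }

    public static void main(String[] args) {
        PerKeyJoin state = new PerKeyJoin();
        List<String> out = new ArrayList<>();
        state.onClick("click-1", out);   // buffered, no lookup record yet
        state.onLookup("lookup", out);   // flushes the buffered click
        state.onClick("click-2", out);   // joined immediately
        System.out.println(out);
    }
}
```

No id appears anywhere in the state logic, because the key is implicit. In the Flink version the two fields would be obtained in open() via getRuntimeContext().getState(...) and getRuntimeContext().getListState(...), and the nested contains/get/null checks from the question disappear.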