GCP Dataflow Apache Beam writing output error handling
Question
I need to apply error handling to my Dataflow pipeline for multiple inserts to Spanner with the same primary key. The reasoning is that an older message may be received after the current message, and I do not want to overwrite the saved values. Therefore I will create my mutation as an insert, so that an error is thrown when a duplicate insert is attempted.
I have seen several examples of try blocks within DoFns that write to a side output to log any errors. This is a very nice solution, but I need to apply error handling to the step that writes to Spanner, which does not contain a DoFn:
spannerBranchTuples2.get(spannerOutput2)
.apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))
.apply("Write Spanner Records", SpannerIO.write()
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.grouped());
I have not found any documentation that describes how to apply error handling to this step, nor a way to rewrite it as a DoFn. Any suggestions on how to apply error handling here? Thanks.
Recommended answer
There is an interesting pattern for this.
Basically, the idea is to have a DoFn before sending your results to your writing transforms. It would look something like this:
final TupleTag<Output> successTag = new TupleTag<Output>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      c.output(process(c.element()));
    } catch (Exception e) {
      // LOG is assumed here to be an SLF4J Logger.
      LOG.error("Failed to process input {} -- adding to dead letter file",
          c.element(), e);
      // Beam 2.x emits to an additional output by tag
      // (the older Dataflow 1.x SDK used c.sideOutput).
      c.output(deadLetterTag, c.element());
    }
  }
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
outputTuple.get(deadLetterTag)
  .apply(/* Write to a file or table or anything */);
outputTuple.get(successTag)
  .apply(/* Write to Spanner or any other sink */);
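To make the routing logic easier to see in isolation, here is a minimal plain-Java sketch of the same dead-letter idea with no Beam dependency. The names DeadLetterSketch, Routed and route are hypothetical and exist only for illustration; the two lists stand in for the success and dead-letter outputs, and Integer::parseInt stands in for whatever processing step might throw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of dead-letter routing; not Beam code.
final class DeadLetterSketch {

    /** Holds the two outputs, analogous to the success and dead-letter tags. */
    static final class Routed<I, O> {
        final List<O> successes = new ArrayList<>();
        final List<I> deadLetters = new ArrayList<>();
    }

    /** Applies fn to each input; inputs whose processing throws are kept unchanged in deadLetters. */
    static <I, O> Routed<I, O> route(List<I> inputs, Function<I, O> fn) {
        Routed<I, O> routed = new Routed<>();
        for (I input : inputs) {
            try {
                routed.successes.add(fn.apply(input));
            } catch (RuntimeException e) {
                // The failing element is set aside instead of failing the whole batch.
                routed.deadLetters.add(input);
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        // Hypothetical processing step: parse strings as integers.
        Routed<String, Integer> r =
            route(Arrays.asList("1", "two", "3"), Integer::parseInt);
        System.out.println("successes=" + r.successes + " deadLetters=" + r.deadLetters);
    }
}
```

As in the Beam version, only exceptions raised inside the processing step itself are captured; anything that fails later, in whatever consumes the success output, is outside the try block.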
Let me know if this helps!