GCP Dataflow Apache Beam writing output error handling

Problem description

I need to apply error handling to my Dataflow pipeline for multiple inserts to Spanner with the same primary key. The reasoning is that an older message may be received after the current one, and I do not want it to overwrite the saved values. Therefore I create my mutation as an insert, so that an error is thrown when a duplicate insert is attempted.
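
To make the insert-only reasoning concrete, here is a minimal plain-Java sketch that involves no Spanner client and assumes a table behaves like a map keyed by primary key. InsertOnlyStore is a hypothetical class: an insert for a key that already exists is rejected instead of overwriting the stored row. This mirrors the behaviour the question relies on, since a Cloud Spanner insert mutation fails with ALREADY_EXISTS when the row is present, whereas an insertOrUpdate mutation would overwrite it.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory stand-in for a table keyed by primary key, used only to
 * illustrate insert-only semantics: a second insert for the same key is rejected
 * rather than overwriting the row that is already stored.
 */
class InsertOnlyStore {
  private final Map<String, Long> rows = new HashMap<>();

  /** Inserts the row, or throws if the primary key is already present. */
  void insert(String key, long value) {
    if (rows.containsKey(key)) {
      // Analogous to Spanner rejecting a duplicate insert with ALREADY_EXISTS.
      throw new IllegalStateException("Row already exists for key " + key);
    }
    rows.put(key, value);
  }

  /** Returns the stored value for the key, or null if there is none. */
  Long get(String key) {
    return rows.get(key);
  }
}
```

For example, inserting key "order-1" with value 2 and then a late, older message for "order-1" with value 1 throws on the second call and leaves 2 stored. The catch, as the question points out, is that in the pipeline this failure surfaces inside the write step.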

I have seen several examples of try blocks within DoFns that write failing elements to a side output to log any errors. This is a very nice solution, but I need to apply error handling to the step that writes to Spanner, which does not contain a DoFn:

spannerBranchTuples2.get(spannerOutput2)
    .apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))                      
    .apply("Write Spanner Records", SpannerIO.write()
        .withInstanceId(options.getSpannerInstanceId())                  
        .withDatabaseId(options.getSpannerDatabaseId())
        .grouped());

I have not found any documentation describing how to apply error handling to this step, nor a way to rewrite it as a DoFn. Any suggestions on how to apply error handling here? Thanks.

Recommended answer

There's an interesting pattern for this, the dead-letter pattern.

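Basically, the idea is to put a DoFn in front of your write transform. It processes each element inside a try block and sends any element that fails to a dead-letter side output, so only successful results reach the sink. It would look something like this: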

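import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Input, Output and process() are placeholders; LOG is assumed to be an SLF4J Logger.
final TupleTag<Output> successTag = new TupleTag<Output>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      c.output(process(c.element()));
    } catch (Exception e) {
      // {} is filled with the element; the trailing exception is logged with its stack trace.
      LOG.error("Failed to process input {} -- adding to dead letter file",
          c.element(), e);
      // Route the failing input to the dead-letter output instead of failing the bundle.
      c.output(deadLetterTag, c.element());
    }
  }
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));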

outputTuple.get(deadLetterTag)
  .apply(/* Write to a file or table or anything */);

outputTuple.get(successTag)
  .apply(/* Write to Spanner or any other sink */);
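
The same routing logic can be sketched without Beam, which makes the control flow easy to test in isolation. DeadLetterRouter is a hypothetical helper, not a Beam API: each element is processed inside a try block, results are collected in successes, and inputs that throw are collected in deadLetters, mirroring successTag and deadLetterTag in the DoFn above. Because java.util.function.Function cannot throw checked exceptions, the sketch catches RuntimeException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical plain-Java illustration of the dead-letter pattern: successful
 * results and failing inputs are routed to two separate lists instead of
 * letting one bad element abort the whole run.
 */
final class DeadLetterRouter<I, O> {
  final List<O> successes = new ArrayList<>();
  final List<I> deadLetters = new ArrayList<>();
  private final Function<I, O> process;

  DeadLetterRouter(Function<I, O> process) {
    this.process = process;
  }

  /** Processes one element and routes it to the success or dead-letter list. */
  void accept(I element) {
    try {
      successes.add(process.apply(element));
    } catch (RuntimeException e) {
      // Keep the original input so it can be written to a file or table and inspected later.
      deadLetters.add(element);
    }
  }
}
```

Keep in mind that a DoFn placed before the sink only catches failures raised while building the output; an error thrown inside SpannerIO.write() itself, such as a rejected duplicate insert, is not visible to it. Depending on your Beam version, SpannerIO.Write may also offer withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES), in which case the returned SpannerWriteResult exposes the failed mutations through getFailedMutations(); check the SpannerIO Javadoc for your release.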

Let me know if this helps!