NACK not sent back to Google Cloud Pub/Sub from Dataflow when there is an error in a ParDo function


Question

How do I send a NACK to Pub/Sub when a Dataflow job is unable or unwilling to deal with a message?

Pipeline pipeline = Pipeline.create(options);

pipeline.apply("gcs2ZipExtractor-processor",
        PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pubSubSubscription))
        .apply(ParDo.of(new ProcessZipFileEventDoFn(appProps)));
logger.info("Started ZipFile Extractor");
pipeline.run().waitUntilFinish();

Above is the code snippet I am using to run the Apache Beam Dataflow pipeline job. If any failure happens in ProcessZipFileEventDoFn, I want to send a NACK to the Pub/Sub subscription so that the message is moved to the DeadletterTopic. At present, no NACK is sent by the Dataflow runner.

Answer

At this time, the Apache Beam SDK has no support for Pub/Sub's native dead-letter queue feature. However, you can write your own fairly easily. The following is from this blog post, adapted to your code. The trick is to use multiple outputs from a single ParDo: one output PCollection will have the "good" data that does not throw any exceptions, and the other output PCollection will contain all the "bad" data for which an exception was thrown. You can then write all the elements in the dead-letter PCollection to a sink, in your case a Pub/Sub topic.

PCollection<PubsubMessage> input =
    pipeline.apply("gcs2ZipExtractor-processor",
                   PubsubIO.readMessagesWithAttributes()
                       .fromSubscription(pubSubSubscription));

// Put this try-catch logic in your ProcessZipFileEventDoFn, and don't forget
// the "withOutputTags"!
final TupleTag<PubsubMessage> successTag = new TupleTag<PubsubMessage>() {};
final TupleTag<PubsubMessage> deadLetterTag = new TupleTag<PubsubMessage>() {};

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          // process(...) stands in for your zip-extraction logic.
          c.output(process(c.element()));
        } catch (Exception e) {
          // Any exception routes the raw element to the dead-letter output.
          c.output(deadLetterTag, c.element());
        }
      }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead letter inputs to Pub/Sub for later analysis
// (deadLetterTopic is a placeholder for your dead-letter topic path).
outputTuple.get(deadLetterTag).apply(PubsubIO.writeMessages().to(deadLetterTopic));

// Retrieve the successful elements...
PCollection<PubsubMessage> success = outputTuple.get(successTag);
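
Since the answer suggests putting the try-catch inside ProcessZipFileEventDoFn itself, here is a minimal sketch of what that named class might look like. It assumes PubsubMessage as both input and output type, and extractZip(...) is a hypothetical helper standing in for the actual zip-handling logic; the dead-letter tag is passed in through the constructor so the same tag instance can be reused in withOutputTags.

// A minimal sketch, not the asker's actual implementation: the DoFn receives
// the dead-letter tag via its constructor and emits failed elements to it.
static class ProcessZipFileEventDoFn extends DoFn<PubsubMessage, PubsubMessage> {
  private final TupleTag<PubsubMessage> deadLetterTag;

  ProcessZipFileEventDoFn(TupleTag<PubsubMessage> deadLetterTag) {
    this.deadLetterTag = deadLetterTag;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      c.output(extractZip(c.element()));    // success goes to the main output
    } catch (Exception e) {
      c.output(deadLetterTag, c.element()); // failure goes to the dead-letter output
    }
  }

  // Hypothetical helper standing in for the real extraction logic.
  private PubsubMessage extractZip(PubsubMessage msg) throws Exception {
    // ... your existing zip-file processing ...
    return msg;
  }
}

Note that c.output(value) implicitly targets the main output tag given as the first argument to withOutputTags, so only the dead-letter tag needs to be referenced explicitly inside the DoFn.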

