PubsubIO, msg exceeding max size, how to perform error handling


Question


We are running a pipeline in GCP Dataflow and have run into the maximum message size for a Pub/Sub message [1]. When this happens, the pipeline lag time starts to build up, eventually grinding to a halt...


This log message was produced in GCP Stackdriver under 'dataflow_step'.


My question is: is there a way to define error handling in the pipeline...

.apply(PubsubIO.writeMessages()
                        .to("topic")
                        .withTimestampAttribute(Instant.now().toString()));

Something like

.onError(...perform error handling ...)


in a similar fluent manner to the Java 8 Streams API, which would allow the pipeline to continue with outputs that are within the Pub/Sub limits.


Other solutions to deal with this situation are most welcome.


Thank you, Christophe Bouhier


[1] Could not commit request due to validation error: generic::invalid_argument: Pubsub publish requests are limited to 10MB, rejecting message over 7MB to avoid exceeding limit with byte64 request encoding.

Answer


For the particular case of PubsubIO on Dataflow, be aware that Dataflow overrides PubsubIO and handles reading and writing messages to Pub/Sub as part of its streaming implementation. I've seen the same error you're discussing show up in the logs under "shuffler" rather than "worker" because of this substitution.


I have worked around this same problem by implementing a custom transform before the PubsubIO.write() step. This LimitPayloadSize transform simply checks how many bytes are in the PubsubMessage and only allows through messages with a payload smaller than 7 MB.
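As an aside (not from the original answer), the 7 MB threshold relates to the 10 MB request limit via base64 overhead: base64 encodes every 3 input bytes as 4 output characters, so a payload near 7 MB still encodes to under 10 MB. A minimal illustrative helper, with class and method names of my own invention, sketches the check:

```java
// Illustrative only: LimitCheck, withinLimit and base64Size are hypothetical
// names, not part of Beam or the original answer's code.
public class LimitCheck {
    // The answer's transform rejects payloads of 7 MB or more.
    static final int MAX_PAYLOAD_BYTES = 7 * 1000 * 1000;

    // True if the payload is small enough to publish safely.
    static boolean withinLimit(byte[] payload) {
        return payload.length < MAX_PAYLOAD_BYTES;
    }

    // base64 emits 4 output bytes per 3 input bytes, rounded up. This is why
    // a ~7 MB payload stays under the 10 MB encoded-request limit.
    static long base64Size(long rawBytes) {
        return 4 * ((rawBytes + 2) / 3);
    }

    public static void main(String[] args) {
        System.out.println(withinLimit(new byte[1024]));   // small payload passes
        System.out.println(base64Size(MAX_PAYLOAD_BYTES)); // 9333336, under 10 MB
    }
}
```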


There is not currently a fluent API for error handling in transforms, although that's something that has been discussed. For now, the accepted pattern is to define a transform with multiple output collections and then write the collection of failing messages somewhere else (such as GCS via FileIO). You can implement this as a bare DoFn, or you could look at Partition:

PCollectionList<PubsubMessage> limitedPayloads = input
    .apply("Limit payload size",
        Partition.of(2, new PartitionFn<PubsubMessage>() {
          @Override
          public int partitionFor(PubsubMessage message, int numPartitions) {
            // getPayload() returns a byte[], so use .length (not .size)
            return message.getPayload().length < 7 * 1000 * 1000 ? 0 : 1;
          }
        }));
limitedPayloads.get(0).apply(PubsubIO.write()...);
limitedPayloads.get(1).apply(FileIO.write()...);
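For the "bare DoFn" alternative mentioned above, a sketch using Beam's multi-output tags might look like the following. This is an assumption-laden illustration, not code from the answer: the topic name is a placeholder, `input` is assumed to be a PCollection<PubsubMessage>, and the oversized branch is left to the reader.

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<PubsubMessage> publishable = new TupleTag<PubsubMessage>() {};
final TupleTag<PubsubMessage> oversized = new TupleTag<PubsubMessage>() {};

PCollectionTuple split = input.apply("Split by payload size",
    ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        if (message.getPayload().length < 7 * 1000 * 1000) {
          c.output(message);            // main output: safe to publish
        } else {
          c.output(oversized, message); // side output: route elsewhere
        }
      }
    }).withOutputTags(publishable, TupleTagList.of(oversized)));

split.get(publishable).apply(PubsubIO.writeMessages().to("topic"));
// split.get(oversized) could be written out via FileIO, as noted above.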

