PubsubIO, msg exceeding max size, how to perform error handling


Problem Description

We are running a pipeline in GCP Dataflow and have run into the maximum message size for a Pub/Sub message [1]. When this happens, the pipeline lag time starts to build up, eventually grinding to a halt...

This log message was produced in GCP Stackdriver under 'dataflow_step'.

My question: is there a way to define error handling in the pipeline...

.apply(PubsubIO.writeMessages()
        .to("topic")
        .withTimestampAttribute(Instant.now().toString()))
    .onError(...perform error handling...)

In a fluent manner similar to the Java 8 Streams API? That would allow the pipeline to continue with the outputs that are within the Pub/Sub limits.

Other solutions to deal with this situation are also most welcome.

Thank you, Christophe Bouhier

[1] Could not commit request due to validation error: generic::invalid_argument: Pubsub publish requests are limited to 10MB, rejecting message over 7MB to avoid exceeding limit with byte64 request encoding.

Recommended Answer

For the particular case of PubsubIO on Dataflow, be aware that Dataflow overrides PubsubIO and handles reading and writing messages to Pub/Sub as part of its streaming implementation. Because of this substitution, I've seen the same error you're describing show up in the logs under "shuffler" rather than "worker".

I have worked around this same problem by implementing a custom transform before the PubsubIO.write() step. This LimitPayloadSize transform simply checks how many bytes are in the PubsubMessage and only allows through messages with a payload smaller than 7 MB.
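
The answer does not show the LimitPayloadSize transform itself, so here is a minimal sketch of what such a transform might look like, assuming Beam's built-in Filter transform and the 7 MB cutoff mentioned above; the class name comes from the answer, everything else is an assumption, not the author's actual code:

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical sketch: drop messages whose payload would trip the Pubsub request limit.
public class LimitPayloadSize
    extends PTransform<PCollection<PubsubMessage>, PCollection<PubsubMessage>> {

  // Stay under the 7 MB cutoff reported in the Dataflow error message.
  private static final int MAX_PAYLOAD_BYTES = 7 * 1000 * 1000;

  @Override
  public PCollection<PubsubMessage> expand(PCollection<PubsubMessage> input) {
    // Keep only messages whose payload fits within the limit; oversized ones are dropped here.
    return input.apply("Filter oversized messages",
        Filter.by((PubsubMessage msg) -> msg.getPayload().length < MAX_PAYLOAD_BYTES));
  }
}

Note that this variant silently drops oversized messages; if you need to keep them, use a multi-output approach like the ones shown below.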

There is not currently a fluent API for error handling in transforms, although that is something that has been discussed. For now, the accepted pattern is to define a transform with multiple output collections and then write the collection of failing messages somewhere else (such as GCS via FileIO). You can implement this as a bare DoFn (a sketch of that variant follows the example below), or you can look at Partition:

PCollectionList<PubsubMessage> limitedPayloads = input
    .apply("Limit payload size",
        Partition.of(2, new PartitionFn<PubsubMessage>() {
          @Override
          public int partitionFor(PubsubMessage message, int numPartitions) {
            // Partition 0: small enough to publish; partition 1: oversized.
            return message.getPayload().length < 7 * 1000 * 1000 ? 0 : 1;
          }
        }));

limitedPayloads.get(0).apply(PubsubIO.writeMessages()...);
limitedPayloads.get(1).apply(FileIO.write()...);
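
For completeness, here is a hedged sketch of the "bare DoFn" alternative mentioned above, using tagged outputs so the oversized messages land in their own collection. The tag names and the GCS follow-up are assumptions for illustration, not part of the original answer:

// Hypothetical sketch of the multi-output DoFn approach; tag names are made up.
final TupleTag<PubsubMessage> withinLimitTag = new TupleTag<PubsubMessage>() {};
final TupleTag<PubsubMessage> oversizedTag = new TupleTag<PubsubMessage>() {};

PCollectionTuple routed = input.apply("Route by payload size",
    ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        if (message.getPayload().length < 7 * 1000 * 1000) {
          c.output(message);                 // main output: safe to publish to Pubsub
        } else {
          c.output(oversizedTag, message);   // side output: handle elsewhere
        }
      }
    }).withOutputTags(withinLimitTag, TupleTagList.of(oversizedTag)));

routed.get(withinLimitTag).apply(PubsubIO.writeMessages().to("topic"));
// routed.get(oversizedTag) could be written to GCS via FileIO, as suggested above.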
