Apache Beam Python SDK with Pub/Sub source stuck at runtime


Problem Description


I am writing a program in Apache Beam using Python SDK to read from Pub/Sub the contents of a JSON file, and do some processing on the received string. This is the part in the program where I pull contents from Pub/Sub and do the processing:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
    lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))

    lines_split = lines_decoded | beam.FlatMap(lambda x: x.split('\n'))

    def json_to_tuple(jsonStr):
        res = json.loads(jsonStr)
        ## printing return value
        print (res['id'], res['messageSize'])
        ##
        return (res['id'], res['messageSize'])

    tupled = lines_split | beam.Map(json_to_tuple)

    def printlines(line):
        print line

    result = tupled | beam.CombinePerKey(sum)
    result | beam.Map(printlines)

While running the program, the code gets stuck after the creation of the PCollection tupled (no line of code executes after that). The strange thing is that when I change the source from Pub/Sub to a local file which contains the exact same content (using ReadFromText()), the program works perfectly. What could be the reason for this behaviour?

Solution

According to the Pub/Sub I/O documentation (both the Apache Beam docs and Dataflow Pub/Sub I/O docs), by default, PubsubIO transforms work with unbounded PCollections.

PCollections can be either bounded or unbounded:

  • Bounded: the data comes from a fixed source, like a file.
  • Unbounded: the data comes from a source that is continuously updating, such as a Pub/Sub subscription.

Before you operate over an unbounded PCollection, you must use one of the following strategies:

  • Windowing: unbounded PCollections cannot be directly used on a grouping transform (such as the CombinePerKey you are using), so you should first set a non-global windowing function.
  • Triggers: you can set up a trigger for an unbounded PCollection in such a way that it provides periodic updates on an unbounded dataset, even if the data in the subscription is still flowing.

This may explain the behavior you are seeing, i.e. the same pipeline working when it reads from a local file (which is a bounded data source) but not working when it reads from a Pub/Sub subscription (which is an unbounded data source).

Therefore, in order to work with a Pub/Sub subscription, you should apply a windowing or triggering strategy so that the data in the PCollections can be properly processed in the following transforms.

EDIT: Also, as @Arjun found out, you may need to enable streaming on the pipeline by setting the corresponding option:

pipeline_options.view_as(StandardOptions).streaming = True
