Beam streaming pipeline does not write files to bucket

Problem description

I have a Python streaming pipeline on GCP Dataflow that reads thousands of messages from Pub/Sub, like this:

    with beam.Pipeline(options=pipeline_options) as p:
      lines = p | "read" >> ReadFromPubSub(topic=str(job_options.inputTopic))
      lines = lines | "decode" >> beam.Map(decode_message)
      lines = lines | "Parse" >> beam.Map(parse_json)
      lines = lines | beam.WindowInto(beam.window.FixedWindows(1*60))
      lines = lines | "Add device id key" >> beam.Map(lambda elem: (elem.get('id'), elem))
      lines = lines | "Group by key" >> beam.GroupByKey()
      lines = lines | "Abandon key" >> beam.Map(flatten)
      lines | "WriteToAvro" >> beam.io.WriteToAvro(job_options.outputLocation, schema=schema, file_name_suffix='.avro', mime_type='application/x-avro')

The pipeline runs just fine, except it never produces any output. Any ideas why?

Answer

It looks like there were a few problems with your code. First, there was some badly formatted data with regards to null/None (which you have already fixed) and ints/floats (called out in the comments). Finally, the WriteToAvro transform cannot write unbounded PCollections. There is a work-around in which you define a new sink and use it with the WriteToFiles transform, which is able to write unbounded PCollections.
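For the null/None and int/float problems, a small type-coercion step in front of the write usually does the trick. The following is only a sketch under assumed data: the field name 'temperature' and its target type are hypothetical, so adapt the coercion to whatever your Avro schema actually declares.

import apache_beam as beam

def coerce_to_schema(record):
    # Avro writers can reject values whose Python type does not match the
    # declared schema, e.g. an int/float mismatch, or a None for a field
    # that is not declared nullable.
    cleaned = dict(record)
    if cleaned.get('temperature') is not None:
        cleaned['temperature'] = float(cleaned['temperature'])  # hypothetical int -> double cast
    return cleaned

# Insert this right before the write step:
lines = lines | "Coerce types" >> beam.Map(coerce_to_schema)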

Note that as of the writing of this post (2020-06-18), this method does not work with Apache Beam Python SDK <= 2.23. This is because the Python pickler cannot deserialize a pickled Avro schema (see BEAM-6522), which forces the solution to use FastAvro instead. You can use Avro if you manually upgrade dill to >= 0.3.1.1 and Avro to >= 1.9.0, but be careful, as this is currently untested.

With the caveats out of the way, here is the work-around:

from apache_beam.io.fileio import FileSink
from apache_beam.io.fileio import WriteToFiles
import fastavro

class AvroFileSink(FileSink):
    def __init__(self, schema, codec='deflate'):
        self._schema = schema
        self._codec = codec

    def open(self, fh):
        # This is called on every new bundle.
        self.writer = fastavro.write.Writer(fh, self._schema, self._codec)

    def write(self, record):
        # This is called on every element.
        self.writer.write(record)

    def flush(self):
        self.writer.flush()

This new sink is used like the following:

import apache_beam as beam
import fastavro

# Replace the following with your schema.
schema = fastavro.schema.parse_schema({
    'name': 'row',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'a', 'type': 'int'},
    ],
})

# Create the sink. This will be used by the WriteToFiles transform to write
# individual elements to the Avro file.
sink = AvroFileSink(schema=schema)

with beam.Pipeline(...) as p:
    lines = p | beam.io.ReadFromPubSub(...)
    lines = ...

    # This is where your new sink gets used. The WriteToFiles transform takes
    # the sink and uses it to write to a directory defined by the path 
    # argument.
    lines | WriteToFiles(path=job_options.outputLocation, sink=sink)
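
One thing worth keeping in mind: because the input here is unbounded, WriteToFiles finalizes files per window, so the one-minute FixedWindows step from the original pipeline should stay upstream of this write (it belongs in the `lines = ...` placeholder above, together with the other transforms).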
