WriteToBigQuery with dynamic destinations


Problem Description

I am working on an Apache Beam pipeline that reads a bunch of events from Pub/Sub and then, based on event type, writes them into separate BigQuery tables.

I know that WriteToBigQuery supports dynamic destinations, but the problem in my case is that the destination is derived from the data read from the event. For example, an event looks like:

{
 "object_id": 123,
 ... some metadata,
 "object_data": {object related info}
}

The data that should be written to the BigQuery table is under the event's object_data key, but the table name is derived from other fields in the metadata. I tried to use the side input params, but the issue is that because each event can have a different destination, the side inputs don't update accordingly. The code is below:

from apache_beam import GroupByKey, Map, ParDo, PTransform, WindowInto
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.pvalue import AsDict
from apache_beam.transforms.window import FixedWindows


class DumpToBigQuery(PTransform):

    def _choose_table(self, element, table_names):
        # table_names = {"table_name": "project_name.dataset.table_name"}
        table_name = table_names["table_name"]
        return table_name

    def expand(self, pcoll):
        # WillTakeLatestEventForKey and _drop_unwanted_fields are defined elsewhere.
        events = (
            pcoll
            | "GroupByObjectType" >> Map(lambda e: (e["object_type"], e))
            | "Window"
            >> WindowInto(
                windowfn=FixedWindows(self.window_interval_seconds)
            )
            | "GroupByKey" >> GroupByKey()
            | "KeepLastEventOnly" >> ParDo(WillTakeLatestEventForKey())
        )

        # Side branch: derive the table name from the event key.
        table_name = events | Map(
            lambda e: ("table_name", f"{self.project}:{self.dataset}.{e[0]}")
        )
        table_names_dct = AsDict(table_name)

        events_to_write = events | Map(lambda e: e[1]) | Map(self._drop_unwanted_fields)

        return events_to_write | "toBQ" >> WriteToBigQuery(
            table=self._choose_table,
            table_side_inputs=(table_names_dct,),
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )

You can see that the side input is taken from the other branch of the pipeline, table_name, which basically extracts the table name from the event. This is then given as a side input to WriteToBigQuery. Unfortunately, this doesn't really work under load: the side input is not updated and some events end up at the wrong destinations.

What other approach can I use in this specific case? All the docs use static examples and don't really cover this dynamic approach.
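For reference, the table argument of WriteToBigQuery can also be a plain callable that is invoked once per element, which avoids side inputs entirely when the destination is computable from the element itself. The catch is that the routed element is also the row that gets written, so any routing field must be tolerated by the destination table. A minimal sketch under those assumptions (the project, dataset, subscription path, and the use of ignore_unknown_columns are illustrative, not taken from the question):

import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT = "my-project"   # hypothetical project id
DATASET = "my_dataset"   # hypothetical dataset name


def to_row(event):
    # Keep the payload plus the single metadata field needed for routing.
    row = dict(event["object_data"])
    row["object_type"] = event["object_type"]
    return row


with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (
        p
        | "Read" >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/events")
        | "Parse" >> beam.Map(json.loads)
        | "ToRow" >> beam.Map(to_row)
        | "toBQ" >> WriteToBigQuery(
            # Called once per element, so each row picks its own table.
            table=lambda row: f"{PROJECT}:{DATASET}.{row['object_type']}",
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
            # Lets BigQuery drop the extra routing column instead of
            # rejecting the row (available in newer Beam releases).
            ignore_unknown_columns=True,
        )
    )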

The other thing I tried is a custom DoFn that uses the HTTP BigQuery client and inserts the rows; the issue here is the speed of the pipeline, as it inserts only around 6-7 events per second.
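If the custom-DoFn route is revisited, throughput usually improves considerably with batching: buffer rows per destination during the bundle and flush them with one insert_rows_json call per table in finish_bundle, rather than one HTTP request per element. A rough sketch, assuming the google-cloud-bigquery client and a (table, row) element shape (the batch size is illustrative):

from collections import defaultdict

import apache_beam as beam
from google.cloud import bigquery


class BatchedBigQueryInsert(beam.DoFn):
    """Buffers rows per destination table and inserts them in batches."""

    def __init__(self, max_batch_size=500):
        self.max_batch_size = max_batch_size

    def setup(self):
        # One client per worker, reused across bundles.
        self.client = bigquery.Client()

    def start_bundle(self):
        self.buffers = defaultdict(list)

    def process(self, element):
        # element is assumed to be a ("project.dataset.table", row_dict) pair.
        table, row = element
        self.buffers[table].append(row)
        if len(self.buffers[table]) >= self.max_batch_size:
            self._flush(table)

    def finish_bundle(self):
        for table in list(self.buffers):
            self._flush(table)

    def _flush(self, table):
        rows = self.buffers.pop(table, [])
        if rows:
            errors = self.client.insert_rows_json(table, rows)
            if errors:
                raise RuntimeError(f"BigQuery insert errors: {errors}")

With a few hundred rows per request, the streaming-insert API goes far faster than the 6-7 single-row inserts per second described above.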

Recommended Answer

I had a similar problem, for which I have a workaround.

I see that you have create_disposition=BigQueryDisposition.CREATE_NEVER, so the list of tables is known before the code runs. Perhaps it is unwieldy, but it is known. I have a DoFn which yields many TaggedOutputs from its process method. Then my pipeline looks like:

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions

# MySplitFn, options, DATASET_ID, padl_shared, parser, and args
# are defined elsewhere in the project.
parser_outputs = ['my', 'list', 'of', 'tables']

with beam.Pipeline(options=PipelineOptions(args)) as p:
    pipe = (
        p
        | "Start" >> beam.Create(["example row"])
        | "Split" >> beam.ParDo(MySplitFn()).with_outputs(*parser_outputs)
    )

    # One static WriteToBigQuery sink per known table.
    for output in parser_outputs:
        pipe[output] | "write {}".format(output) >> beam.io.WriteToBigQuery(
            bigquery.TableReference(
                projectId=options.projectId, datasetId=DATASET_ID, tableId=output
            ),
            schema=padl_shared.getBQSchema(parser.getSchemaForDataflow(rowTypeName=output)),
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
# The with-block runs the pipeline and waits for it to finish on exit,
# so an explicit p.run().wait_until_finish() is not needed.
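The answer doesn't show MySplitFn, so here is a minimal sketch of what such a splitting DoFn might look like; the object_type routing field is an assumption carried over from the question:

import apache_beam as beam


class MySplitFn(beam.DoFn):
    """Routes each element to the tagged output named after its target table."""

    def process(self, element):
        # Assumption: this field matches one of the names in parser_outputs,
        # i.e. the tags passed to .with_outputs(*parser_outputs).
        yield beam.pvalue.TaggedOutput(element["object_type"], element)

Each tag then shows up as pipe[output] in the loop above, with its own WriteToBigQuery sink.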
