Monitoring WriteToBigQuery


Question


In my pipeline I use WriteToBigQuery something like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

This returns a Dict as described in the documentation as follows:

The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.

How do I print this dict and turn it into a PCollection, or how do I just print the FAILED_ROWS?

If I do: | "print" >> beam.Map(print)

Then I get: AttributeError: 'dict' object has no attribute 'pipeline'

I must have read a hundred pipelines but never have I seen anything after the WriteToBigQuery.

[edit] When I finish the pipeline and store the results in a variable I have the following:

{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}

But I do not know how to use this result in the pipeline like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)

Solution

Dead-letter outputs for handling invalid input are a common Beam/Dataflow pattern and work with both the Java and Python SDKs, but there are not many examples for the latter.

Imagine that we have some dummy input data with 10 good lines and a bad row that does not conform to the table schema:

schema = "index:INTEGER,event:STRING"

data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
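
The CsvToDictFn transform used below is only shown in the repository linked at the end of the answer; a minimal sketch that is consistent with the error output shown later (field names taken from the schema above) could look like this:

import apache_beam as beam

class CsvToDictFn(beam.DoFn):
    """Map a CSV line onto the schema's field names, producing a dict."""
    FIELDS = ['index', 'event']

    def process(self, element):
        # A well-formed line yields {'index': '1', 'event': 'good_line_1'}.
        # The bad row has no comma, so it only fills the first field and is
        # later rejected by BigQuery because it does not match the schema.
        yield dict(zip(self.FIELDS, element.split(',')))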

Then, what I do is name the write result (events in this case):

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
 )

and then access the FAILED_ROWS side output:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

This works well with the DirectRunner: the good lines are written to BigQuery and the bad one to a local file:

$ cat error_log.txt-00000-of-00001 
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})

If you run it with the DataflowRunner you'll need some additional flags. If you face the TypeError: 'PDone' object has no attribute '__getitem__' error you'll need to add --experiments=use_beam_bq_sink to use the new BigQuery sink.
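
For instance, the flag can be passed along with the usual Dataflow options (a sketch; the project, bucket and region values are placeholders):

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; only --experiments=use_beam_bq_sink is the flag discussed here.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=PROJECT_ID',
    '--temp_location=gs://BUCKET/tmp/',
    '--region=us-central1',
    '--experiments=use_beam_bq_sink',
])

and handed to the pipeline with beam.Pipeline(options=options).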

If you get a KeyError: 'FailedRows', it's because the new sink defaults to BigQuery load jobs for batch pipelines:

STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.

You can override the behavior by specifying method='STREAMING_INSERTS' in WriteToBigQuery:
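
The snippet for that is not included on this page; adapting the write step from above, it would be a one-argument change (a sketch):

| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
    "{0}:dataflow_test.good_lines".format(PROJECT),
    schema=schema,
    # Force streaming inserts so FAILED_ROWS is populated even in batch mode.
    method='STREAMING_INSERTS',
)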

Full code for both DirectRunner and DataflowRunner here.
