Monitoring WriteToBigQuery

Question

In my pipeline I use WriteToBigQuery something like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

This returns a dict, as described in the documentation:

The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.

How do I print this dict and turn it into a PCollection, or how do I just print the FAILED_ROWS?

If I do this: | "print" >> beam.Map(print)

then I get: AttributeError: 'dict' object has no attribute 'pipeline'

I must have read a hundred pipelines, but I have never seen anything after the WriteToBigQuery.

[edit] When I finish the pipeline and store the results in a variable I have the following:

{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}

But I do not know how to use this result in the pipeline like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)

Answer

Dead letters to handle invalid inputs are a common Beam/Dataflow use case and work with both the Java and Python SDKs, but there are not many examples for the latter.

Imagine that we have some dummy input data with 10 good lines and a bad row that does not conform to the table schema:

schema = "index:INTEGER,event:STRING"

data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')

Then, what I do is name the write result (events in this case):

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
 )
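The CsvToDictFn DoFn used above is not defined in this excerpt (it comes from the full example linked at the end). A minimal sketch that is consistent with the 'index,event' schema and the error output shown below could look like this:

import apache_beam as beam

class CsvToDictFn(beam.DoFn):
    """Parse an 'index,event' CSV line into a dict keyed by the schema fields."""
    def process(self, element):
        # Good rows yield {'index': '1', 'event': 'good_line_1'}; the bad row only
        # yields an 'index' key with a non-integer value, so BigQuery rejects it
        # and it shows up in FAILED_ROWS.
        yield dict(zip(("index", "event"), element.split(",")))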

and then access the FAILED_ROWS side output:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

This works well with the DirectRunner: the good lines are written to BigQuery and the bad row goes to a local file:

$ cat error_log.txt-00000-of-00001 
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})

If you run it with the DataflowRunner you'll need some additional flags. If you face the TypeError: 'PDone' object has no attribute '__getitem__' error, you'll need to add --experiments=use_beam_bq_sink to use the new BigQuery sink.
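For reference, a minimal sketch of setting that flag programmatically through PipelineOptions (the project, region and bucket values are placeholders, not from the original post):

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; substitute your own project, region and bucket.
options = PipelineOptions(
    runner="DataflowRunner",
    project="PROJECT_ID",
    region="us-central1",
    temp_location="gs://YOUR_BUCKET/tmp/",
    experiments=["use_beam_bq_sink"],  # enable the new BigQuery sink
)
# p = beam.Pipeline(options=options)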

If you get a KeyError: 'FailedRows', it's because the new sink defaults to BigQuery load jobs for batch pipelines:

STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.

You can override the behavior by specifying method='STREAMING_INSERTS' in WriteToBigQuery.
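A minimal sketch, reusing the pipeline from above with only the method argument added:

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
        method='STREAMING_INSERTS',  # force streaming inserts instead of load jobs
    )
 )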

Full code for both the DirectRunner and the DataflowRunner here.
