Monitoring WriteToBigQuery
Problem description
In my pipeline I use WriteToBigQuery something like this:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
This returns a Dict as described in the documentation as follows:
The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.
How do I print this dict and turn it into a PCollection, or how do I just print the FAILED_ROWS?
If I do: | "print" >> beam.Map(print)
then I get: AttributeError: 'dict' object has no attribute 'pipeline'
I must have read a hundred pipelines, but I have never seen anything after the WriteToBigQuery.
[edit] When I finish the pipeline and store the results in a variable I have the following:
{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}
But I do not know how to use this result in the pipeline like this:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)
Recommended answer
Dead letters to handle invalid inputs are a common Beam/Dataflow pattern; they work with both the Java and Python SDKs, but there are not many examples for the latter.
Imagine that we have some dummy input data with 10 good lines and a bad row that does not conform to the table schema:
schema = "index:INTEGER,event:STRING"
data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
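The CsvToDictFn used in the pipeline below is not defined in the answer; a minimal sketch of the parsing it presumably does (my assumption, chosen to match the failed-row dict shown further down) could be:

```python
# Hypothetical parsing helper: zip the positional CSV fields onto the
# schema's column names. A malformed row without a comma yields a
# single-key dict, which BigQuery then rejects. In the pipeline this
# logic would live inside a beam.DoFn (CsvToDictFn) whose process
# method yields this dict.
def csv_to_dict(line):
    return dict(zip(["index", "event"], line.split(",")))
```

For example, csv_to_dict('1,good_line_1') gives {'index': '1', 'event': 'good_line_1'}, while the bad row collapses to {'index': 'this is a bad row'}.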
Then, what I do is name the write result (events in this case):
events = (p
| "Create data" >> beam.Create(data)
| "CSV to dict" >> beam.ParDo(CsvToDictFn())
| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
"{0}:dataflow_test.good_lines".format(PROJECT),
schema=schema,
)
)
and then access the FAILED_ROWS side output:
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
This works well with the DirectRunner: the good lines are written to BigQuery and the bad row to a local file:
$ cat error_log.txt-00000-of-00001
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})
If you run it with the DataflowRunner you'll need some additional flags. If you face the TypeError: 'PDone' object has no attribute '__getitem__' error, you'll need to add --experiments=use_beam_bq_sink to use the new BigQuery sink.
If you get a KeyError: 'FailedRows', it's because the new sink defaults to BigQuery load jobs for batch pipelines:
STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on streaming pipelines and FILE_LOADS on batch pipelines.
You can override this behavior by specifying method='STREAMING_INSERTS' in WriteToBigQuery:
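For example, reusing the write step from the pipeline above (this fragment is my addition, not shown in the original answer at this point):

```python
| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
    "{0}:dataflow_test.good_lines".format(PROJECT),
    schema=schema,
    method='STREAMING_INSERTS',  # force streaming inserts so FAILED_ROWS is populated
)
```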
Full code for both the DirectRunner and DataflowRunner here.