Streaming from Pub/Sub to BigQuery

Question

I am trying to stream some data from Google Pub/Sub into BigQuery using a Python Dataflow pipeline. For testing purposes I have adapted the following code https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py into a streaming pipeline by setting

options.view_as(StandardOptions).streaming = True
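
For context, a minimal sketch of how this flag fits into the pipeline setup (written against the current apache_beam package layout; the alpha-era SDK used in the question may have slightly different import paths):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
# Switch the pipeline from batch to streaming mode
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)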

So then I changed the record_ids pipeline to read from Pub/Sub:

# ADDED THIS: read from Pub/Sub and apply 15-second fixed windows
# (window is apache_beam.transforms.window)
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15))
# CHANGED THIS: record_ids previously came from a static list:
# record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | 'Write' >> beam.io.Write(
    beam.io.BigQuerySink(
        OUTPUT,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
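
The snippet relies on split_fn and create_random_record, which are defined in the full code linked below. As a rough sketch of their shape only - the bodies here are hypothetical, not the author's actual definitions:

import random

def split_fn(line):
    # Hypothetical: break one Pub/Sub message into individual record ids
    return line.split(',')

def create_random_record(record_id):
    # Hypothetical: build a row dict whose keys match table_schema
    return {'record_id': record_id, 'value': random.random()}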

Note: I have been whitelisted by Google to run the code (in alpha).

When I try to run it now, I get the following error:

Workflow failed. Causes: (f215df7c8fcdbb00): Unknown streaming sink: bigquery

You can find the full code here: https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py

I think this has to do with the pipeline now being a streaming pipeline. Can anyone tell me how to write to BigQuery in a streaming pipeline?

Answer

Beam Python does not support writing to BigQuery from streaming pipelines. For now you will need to use Beam Java - you can use PubsubIO.readStrings() and BigQueryIO.writeTableRows() respectively.
