Streaming buffer - Google BigQuery


Question

I'm developing a Python program to be used as a Google Dataflow template.

What I'm doing is writing data from Pub/Sub into BigQuery:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    # pipeline_options and known_args are built from argparse earlier
    # in the script (omitted here).
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)

    (p
     # This is the source of the pipeline.
     | 'Read from PubSub' >> beam.io.ReadFromPubSub('projects/.../topics/...')
     # <Transformation code if needed>
     # Destination
     | 'String To BigQuery Row' >> beam.Map(lambda s: dict(Trama=s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             known_args.output,
             schema='Trama:STRING',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
     )
    p.run().wait_until_finish()

The code is running locally, not in Google Dataflow yet.

这有效"但不是我想要的方式,因为当前数据存储在 BigQuery Buffer Stream 中,我看不到它(即使等待一段时间后).

This "works" but not the way i want, because currently the data are stored in the BigQuery Buffer Stream and I can not see it (even after waiting some time).

When will the data be available in BigQuery? Why is it stored in the streaming buffer instead of the "normal" table?
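For background: rows delivered through BigQuery's streaming-insert path land in a streaming buffer first. They are queryable almost immediately, but they do not show up in the table preview or in copy/extract jobs until the buffer is flushed. A minimal sketch of how to check whether a table still has buffered rows, assuming the google-cloud-bigquery client library and a placeholder table name:

    from google.cloud import bigquery

    client = bigquery.Client()
    # Placeholder table name for illustration only.
    table = client.get_table('my_project.my_dataset.my_table')

    if table.streaming_buffer:
        # Estimated rows still in the buffer, and the oldest entry time.
        print('buffered rows:', table.streaming_buffer.estimated_rows)
        print('oldest entry :', table.streaming_buffer.oldest_entry_time)
    else:
        print('no active streaming buffer')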

Answer

This was the problem:

 beam.io.Write(beam.io.BigQuerySink

It should be:

 beam.io.WriteToBigQuery

The first one worked well while I was reading from a file; the second works when reading from Pub/Sub.
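For completeness, a minimal sketch of the question's pipeline with that one change applied. The project, topic, and table names are placeholders, and the decode step is an addition accounting for newer Beam versions delivering Pub/Sub payloads as bytes:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    # Placeholder names for illustration; the question builds these from argparse.
    topic = 'projects/my_project/topics/my_topic'
    output_table = 'my_project:my_dataset.my_table'

    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic)
         # Pub/Sub payloads arrive as bytes; decode before writing a STRING column.
         | 'String To BigQuery Row' >> beam.Map(lambda s: dict(Trama=s.decode('utf-8')))
         # WriteToBigQuery handles unbounded input and streams rows into the table.
         | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             output_table,
             schema='Trama:STRING',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

The streamed rows still pass through the streaming buffer, but they should be visible to queries right away.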
