Streaming pipelines with BigQuery sinks in Python

Problem description

I'm building an Apache Beam streaming pipeline whose source is Pub/Sub and whose sink is BigQuery. I've gotten the error message:

"Workflow failed. Causes: Unknown message code."

As cryptic as this message is, I now believe the cause to be that BigQuery is not supported as a sink for streaming pipelines; it says so here: Streaming from Pub/Sub to BigQuery

Am I correct that this is what's causing the problem? And if not, is it still unsupported in any case?

Can anyone hint at when this feature will be released? It's a shame; I was pretty excited to start using it.

Answer

Python streaming pipelines have been experimentally available since Beam 2.5.0, as documented in the Beam docs here.

Therefore, you will need to install apache-beam 2.5.0 and apache-beam[gcp]:

pip install apache-beam==2.5.0
pip install apache-beam[gcp]

I run the following command:

python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming

Using the code below, it works alright:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam


def parse_pubsub(line):
    # json is imported inside the function so it is available on Dataflow workers.
    import json
    record = json.loads(line)
    # Return a dict whose keys match the columns of the BQ table.
    return {
        'ride_id': record['ride_id'],
        'point_idx': record['point_idx'],
        'latitude': record['latitude'],
        'longitude': record['longitude'],
        'timestamp': record['timestamp'],
        'meter_increment': record['meter_increment'],
        'ride_status': record['ride_status'],
        'meter_reading': record['meter_reading'],
        'passenger_count': record['passenger_count'],
    }


def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', dest='input_topic', required=True,
      help='Input PubSub topic of the form "projects/<PROJECT>/topics/<TOPIC>".')
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:

    # Read raw messages from PubSub.
    lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
    # Adapt each JSON message into a row dict matching the BQ table schema.
    lines = lines | beam.Map(parse_pubsub)
    # Write the rows to a BQ table.
    lines | beam.io.WriteToBigQuery(
        table='<my-table>', dataset='<my-dataset>', project='<my-project>')


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
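
As used above (without a schema), the table must already exist. If you would rather let the sink create it, WriteToBigQuery also accepts a schema string and a create disposition. A minimal sketch (the column types are my assumption based on the message fields; adjust them to your actual data):

    lines | beam.io.WriteToBigQuery(
        table='<my-table>', dataset='<my-dataset>', project='<my-project>',
        # Assumed column types; verify against your data before relying on them.
        schema=('ride_id:STRING, point_idx:INTEGER, latitude:FLOAT, '
                'longitude:FLOAT, timestamp:TIMESTAMP, meter_increment:FLOAT, '
                'ride_status:STRING, meter_reading:FLOAT, passenger_count:INTEGER'),
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)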

This code uses the publicly available topic projects/pubsub-public-data/topics/taxirides-realtime and a BQ table that I have created with the right schema.
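
For reference, such a table can be created up front with the bq CLI; this is a sketch assuming the same column types as in the snippet above:

bq mk --table <my-project>:<my-dataset>.<my-table> ride_id:STRING,point_idx:INTEGER,latitude:FLOAT,longitude:FLOAT,timestamp:TIMESTAMP,meter_increment:FLOAT,ride_status:STRING,meter_reading:FLOAT,passenger_count:INTEGER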

If you use this example, be careful not to leave it running, or you will incur costs, as you will receive a lot of messages from this PubSub topic.
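
To stop the pipeline, cancel the Dataflow job, for example with the gcloud CLI (a sketch; <job-id> comes from the list command or the Dataflow console):

gcloud dataflow jobs list --status=active
gcloud dataflow jobs cancel <job-id>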
