Dataflow Streaming using Python SDK: Transform for PubSub Messages to BigQuery Output


Problem description


I am attempting to use Dataflow to read a Pub/Sub message and write it to BigQuery. I was given alpha access by the Google team and have gotten the provided examples working, but now I need to apply it to my scenario.


Pubsub payload:

Message {
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    attributes: {}
}
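
For reference, a minimal sketch of publishing a test message with this payload using the google-cloud-pubsub client library; the project and topic names are placeholders, not part of the original question:

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# 'my-project' and 'my-topic' are hypothetical placeholders.
topic_path = publisher.topic_path('my-project', 'my-topic')

payload = {'datetime': '2017-07-13T21:15:02Z',
           'mac': 'FC:FC:48:AE:F6:94',
           'status': 1}
# Pub/Sub message data must be bytes, so serialize the dict as JSON.
publisher.publish(topic_path, data=json.dumps(payload).encode('utf-8'))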

BigQuery schema:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',


My goal is simply to read the message payload and insert it into BigQuery. I am struggling to get my head around the transformations and how I should map the keys/values to the BigQuery schema.


I am very new to this so any help is appreciated.

Current code: https://codeshare.io/ayqX8w

Thanks!

Recommended answer


I was able to successfully parse the pubsub string by defining a function that loads it into a json object (see parse_pubsub()). One weird issue I encountered was that I was not able to import json at the global scope. I was receiving "NameError: global name 'json' is not defined" errors. I had to import json within the function.
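
A common explanation for that NameError is that module-level imports are not available on the workers unless the main session is pickled along with the pipeline. Importing json inside the function, as done here, avoids the problem; as an alternative (a sketch based on Beam's standard pipeline options, not something taken from the original answer), the main session can be saved explicitly:

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions(pipeline_args)
# Pickle the main session so top-level imports (such as json) are shipped to the workers.
options.view_as(SetupOptions).save_main_session = True

# The pipeline would then be built with these options:
# with beam.Pipeline(options=options) as p:
#     ...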


See my working code below:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window

def parse_pubsub(line):
    """Parse a Pub/Sub message string into a (mac, status, datetime) tuple."""
    # Lines look like this:
    # {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    # json is imported here rather than at module level; importing it globally
    # raised "NameError: global name 'json' is not defined" on the workers.
    import json
    record = json.loads(line)
    return record['mac'], record['status'], record['datetime']

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  parser.add_argument(
      '--output_table', required=True,
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read the pubsub topic into a PCollection.
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                # Turn the (mac, status, datetime) tuple into a row dict keyed by column name.
                | beam.Map(lambda fields: {'mac': fields[0], 'status': fields[1], 'datetime': fields[2]})
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
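
As a side note, ReadStringsFromPubSub was later deprecated in favor of beam.io.ReadFromPubSub, which emits message payloads as bytes rather than strings. A rough, untested sketch of the same pipeline against a newer Beam SDK (the function and variable names here are illustrative, not from the original answer) might look like:

import json
import apache_beam as beam

def parse_pubsub_row(message_bytes):
    """Decode a Pub/Sub message and return a row dict matching the BigQuery schema."""
    record = json.loads(message_bytes.decode('utf-8'))
    return {'mac': record['mac'],
            'status': record['status'],
            'datetime': record['datetime']}

def run_newer_sdk(input_topic, output_table, pipeline_args):
    # input_topic in the form 'projects/<PROJECT>/topics/<TOPIC>' for newer SDKs.
    with beam.Pipeline(argv=pipeline_args) as p:
        ( p
          | beam.io.ReadFromPubSub(topic=input_topic)  # yields payloads as bytes
          | beam.Map(parse_pubsub_row)
          | beam.io.WriteToBigQuery(
              output_table,
              schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

Returning a dict directly from the parse step also removes the need for the second Map that converts the tuple into a row dictionary.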

