Apache Beam: writing to a BigQuery table with the table and schema as parameters

Question

I'm using the Python SDK for Apache Beam. The values that determine the table name and the schema are in the PCollection. This is the message I read from Pub/Sub:

{"DEVICE":"rms005_m1","DATESTAMP":"2020-05-29 20:54:26.733 UTC","SINUMERIK__x_position":69.54199981689453,"SINUMERIK__y_position":104.31400299072266,"SINUMERIK__z_position":139.0850067138672}

I then want to write it to BigQuery using the values in the JSON message: a lambda function for the table name and the following function for the schema:

import apache_beam as beam

def set_schema(data):
    # Build a "NAME:TYPE,NAME:TYPE,..." schema string from the message's field names
    fields = []
    for name in data:
        if name == 'STATUS' or name == 'DEVICE':
            field_type = 'STRING'
        elif name == 'DATESTAMP':
            field_type = 'TIMESTAMP'
        else:
            field_type = 'FLOAT'
        fields.append(name + ':' + field_type)
    return ",".join(fields)

# p (the Pipeline), topic and json_parse are defined elsewhere in my code
data = (p
        | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=topic)
        | "Parse json" >> beam.Map(json_parse)
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table='project:dataset.{datatable}__opdata'.format(datatable=lambda element: element["DEVICE"]),
            schema=set_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
       )
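The question does not show json_parse. A minimal sketch, assuming it only has to turn each Pub/Sub payload into a dict: by default (without with_attributes=True) beam.io.ReadFromPubSub emits each message body as bytes, so this hypothetical version decodes the bytes and parses the JSON.

```python
import json


def json_parse(payload):
    """Hypothetical version of the question's json_parse helper.

    Assumes ReadFromPubSub is used without with_attributes=True, so each
    element is the raw message body as bytes.
    """
    return json.loads(payload.decode("utf-8"))


# The sample message from the question, as the bytes Pub/Sub would deliver.
message = (
    b'{"DEVICE":"rms005_m1","DATESTAMP":"2020-05-29 20:54:26.733 UTC",'
    b'"SINUMERIK__x_position":69.54199981689453,'
    b'"SINUMERIK__y_position":104.31400299072266,'
    b'"SINUMERIK__z_position":139.0850067138672}'
)
row = json_parse(message)
print(row["DEVICE"])  # rms005_m1
```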

When I run it, I get this error:

ValueError: Expected a table reference (PROJECT:DATASET.TABLE or DATASET.TABLE) instead of project:dataset.<function <lambda> at 0x7fa0dc378710>__opdata
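The error comes from plain Python, not from Beam: str.format runs once, while the pipeline is being constructed. It does not call the lambda; it inserts the function object's string representation into the table name, which is the "<function <lambda> at 0x...>" text in the message above. A minimal reproduction without Beam (device_of is a hypothetical helper name):

```python
template = 'project:dataset.{datatable}__opdata'

# str.format does not call the function; it formats the function object itself.
table = template.format(datatable=lambda element: element["DEVICE"])
print(table)  # project:dataset.<function <lambda> at 0x...>__opdata (address varies)


def device_of(element):
    """Hypothetical helper: pick the device name out of one parsed message."""
    return element["DEVICE"]


# Only calling the function on an element yields a valid table name.
print(template.format(datatable=device_of({"DEVICE": "rms005_m1"})))
# project:dataset.rms005_m1__opdata
```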

How can I use the values of the PCollection as variables in the PTransform?

Answer

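You have to pass a function as the table argument itself; Beam then calls it for each element to decide which table that element is written to. Try this instead: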

| "Write to BQ" >> beam.io.WriteToBigQuery(
            # Called once per element; must return PROJECT:DATASET.TABLE (note the dot)
            table=lambda element: 'project:dataset.{datatable}__opdata'.format(datatable=element["DEVICE"]),
            schema=set_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
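One caveat about schema=set_schema: according to the WriteToBigQuery documentation, when schema is a callable, Beam calls it with the destination returned by the table callable, not with the element. set_schema would therefore receive a table name instead of the message's fields. Since every message in the question carries the same fields, one option is to build the schema once from the known field names. The sketch below is plain Python so it runs without Beam; FIELDS, table_for and schema_for are hypothetical names, and the two functions would be passed as table=table_for and schema=schema_for.

```python
# Hypothetical field list, taken from the sample message in the question.
FIELDS = [
    "DEVICE",
    "DATESTAMP",
    "SINUMERIK__x_position",
    "SINUMERIK__y_position",
    "SINUMERIK__z_position",
]


def field_type(name):
    """Same type rules as the question's set_schema."""
    if name in ("STATUS", "DEVICE"):
        return "STRING"
    if name == "DATESTAMP":
        return "TIMESTAMP"
    return "FLOAT"


# Built once, in the "NAME:TYPE,NAME:TYPE,..." string form.
SCHEMA = ",".join(name + ":" + field_type(name) for name in FIELDS)


def table_for(element):
    """table= callable: receives one element, returns PROJECT:DATASET.TABLE."""
    return "project:dataset.{}__opdata".format(element["DEVICE"])


def schema_for(destination):
    """schema= callable: receives the destination, not the element.

    Every device table has the same columns here, so the destination is ignored.
    """
    return SCHEMA


destination = table_for({"DEVICE": "rms005_m1"})
print(destination)  # project:dataset.rms005_m1__opdata
print(schema_for(destination))
```

Because the schema is identical for every table in this case, passing schema=SCHEMA directly as a string should work just as well; the callable form matters only when the schema depends on the destination.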
