Google-cloud-dataflow: Failed to insert json data to bigquery through `WriteToBigQuery/BigQuerySink` with `BigQueryDisposition.WRITE_TRUNCATE`


Problem description

Given a dataset like the following:

{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}
...more json data with other type values here

I try to filter those json data and insert them into bigquery with the python sdk as follows:

import argparse
import json

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

ba_schema = 'slot:STRING,result:INTEGER,play_type:STRING,level:INTEGER'

class ParseJsonDoFn(beam.DoFn):
    B_TYPE = 'tag_B'

    def process(self, element):
        text_line = element.strip()
        data = json.loads(text_line)

        # Keep only records of type 'ba' and emit them on a tagged output,
        # mapping the fields onto the column names declared in ba_schema.
        if data['type'] == 'ba':
            ba = {'slot': data['slot'], 'result': data['result'], 'play_type': data['p_type'], 'level': data['level']}
            yield pvalue.TaggedOutput(self.B_TYPE, ba)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                      dest='input',
                      default='data/path/data',
                      help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
      '--runner=DirectRunner',
      '--project=project-id',
      '--job_name=data-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        multiple_lines = (
            lines
            | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                                      ParseJsonDoFn.B_TYPE)))

        b_line = multiple_lines.tag_B
        (b_line
            | "output_b" >> beam.io.WriteToBigQuery(
                                          'temp.ba',
                                          schema = ba_schema,
                                          write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                          create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                        ))

The debug logs show:

INFO:root:finish <DoOperation output_b/WriteToBigQuery output_tags=['out'], receivers=[ConsumerSet[output_b/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:root:Successfully wrote 2 rows.

It seems those two records with type:ba were inserted into the bigquery table temp.ba. However, when I run

select * from `temp.ba`  limit 100;

there is no data in the table temp.ba.

Is there anything wrong with my code, or is there something I am missing?

Update:

Thanks to @Eric Schmidt's answer, I understand there could be some lag for the initial data. However, 5 minutes after running the above script, there is still no data in the table.

However, when I tried BigQuerySink instead (with write_disposition removed):

    | "output_b" >> beam.io.Write(
                      beam.io.BigQuerySink(
                          table = 'ba',
                          dataset = 'temp',
                          project = 'project-id',
                          schema = ba_schema,
                          #write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                          create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )
                    ))

Those two records could be found immediately.

(The original post included a screenshot of the table information here.)

Maybe I have not yet understood the meaning of the initial data availability lag. Could someone give me more information?

Recommended answer

Two things to consider:

1) The Direct (local) runner uses streaming inserts. There is an initial data availability lag; see this post.

2) Make sure you fully qualify the project you are streaming into. With BigQuerySink(): project="foo", dataset="bar", table="biz".
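For example, here is a minimal sketch of a fully qualified destination for the WriteToBigQuery step from the question (only an assumption of how your IDs map; 'project-id' is a placeholder for your real project):

(b_line
    | "output_b" >> beam.io.WriteToBigQuery(
        'project-id:temp.ba',  # fully qualified as PROJECT:DATASET.TABLE
        schema = ba_schema,
        write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED))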

I suspect your issue is #1.
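If you want to confirm that the rows have arrived but are still sitting in the streaming buffer, one rough way to check (a sketch only, assuming the google-cloud-bigquery client library is installed and that 'project-id', 'temp' and 'ba' match your setup) is:

from google.cloud import bigquery

client = bigquery.Client(project='project-id')
table = client.get_table('project-id.temp.ba')
# Rows delivered by streaming inserts can appear in the streaming buffer
# before they are counted in committed storage.
print('committed rows:', table.num_rows)
print('streaming buffer:', table.streaming_buffer)

If streaming_buffer is not None, the data has been received and should become queryable once the buffer is flushed, which matches the initial availability lag described in point #1.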
