How to write dictionaries to Bigquery in Dataflow using python

Problem description

I am trying to read a CSV from GCP Storage, convert it into dictionaries, and then write to a BigQuery table as follows:

p | ReadFromText("gs://bucket/file.csv") 
  | (beam.ParDo(BuildAdsRecordFn()))
  | WriteToBigQuery('ads_table',dataset='dds',project='doubleclick-2',schema=ads_schema)

where 'doubleclick-2' and 'dds' are an existing project and dataset, and ads_schema is defined as follows:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

BuildAdsRecordFn() is defined as follows:

class AdsRecord:
  dict = {}

  def __init__(self, line):
    record = line.split(",")
    self.dict['Advertiser_ID'] = record[0]
    self.dict['Campaign_ID'] = record[1]
    self.dict['Ad_ID'] = record[2]
    self.dict['Ad_Name'] = record[3]
    self.dict['Click_through_URL'] = record[4]
    self.dict['Ad_Type'] = record[5]


class BuildAdsRecordFn(beam.DoFn):
  def __init__(self):
    super(BuildAdsRecordFn, self).__init__()

  def process(self, element):
    text_line = element.strip()
    ads_record = AdsRecord(text_line).dict
    return ads_record

However, when I run the pipeline, I get the following error:

"dataflow_job_18146703755411620105-B" failed., (6c011965a92e74fa): BigQuery job "dataflow_job_18146703755411620105-B" in project "doubleclick-2" finished with error(s): errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0: Value encountered without start of object

Here is the sample testing data I used:

100001,1000011,10000111,ut,https://bloomberg.com/aliquam/lacus/morbi.xml,Brand-neutral
100001,1000011,10000112,eu,http://weebly.com/sed/vel/enim/sit.jsp,Dynamic Click

I'm new to both Dataflow and Python, so I could not figure out what could be wrong in the above code. Greatly appreciate any help!

Answer

I just implemented your code and it didn't work either, but I got a different error message (something like "you can't return a dict as the result of a ParDo").
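
The root cause is worth spelling out: Beam treats whatever process() returns as an iterable of output elements. A dict is iterable, but iterating it yields only its keys, so the pipeline emits bare strings instead of row objects, and BigQuery then fails with "Value encountered without start of object". A minimal illustration in plain Python, outside Beam:

# Iterating a dict yields its keys; Beam does exactly this with the
# return value of process(), so the row values never reach BigQuery.
row = {'Advertiser_ID': 100001, 'Campaign_ID': 1000011}
print(list(row))  # ['Advertiser_ID', 'Campaign_ID'] -- bare strings
print([row])      # wrapping the dict in a list emits it as one element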

This code worked normally for me. Notice that I'm no longer using the class attribute dict, and that a list is now returned:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

class BuildAdsRecordFn(beam.DoFn):
    def __init__(self):
      super(BuildAdsRecordFn, self).__init__()

    def process(self, element):
      text_line = element.strip()
      ads_record = self.process_row(text_line)  # pass the stripped line
      return ads_record

    def process_row(self, row):
        dict_ = {}

        record = row.split(",")
        dict_['Advertiser_ID'] = int(record[0]) if record[0] else None
        dict_['Campaign_ID'] = int(record[1]) if record[1] else None
        dict_['Ad_ID'] = int(record[2]) if record[2] else None
        dict_['Ad_Name'] = record[3]
        dict_['Click_through_URL'] = record[4]
        dict_['Ad_Type'] = record[5]
        return [dict_]

with beam.Pipeline() as p:

    (p | ReadFromText("gs://bucket/file.csv")
       | beam.Filter(lambda x: x[0] != 'A')
       | (beam.ParDo(BuildAdsRecordFn()))
       | WriteToBigQuery('ads_table', dataset='dds',
           project='doubleclick-2', schema=ads_schema))
      #| WriteToText('test.csv'))
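
Before pointing the pipeline at GCS and BigQuery, it can help to sanity-check the DoFn locally. Here is a minimal sketch (my addition, not part of the original answer): beam.Create and beam.Map(print) stand in for the real source and sink, and BuildAdsRecordFn is the class defined above:

import apache_beam as beam

rows = [
    'Advertiser_ID,Campaign_ID,Ad_ID,Ad_Name,Click_through_URL,Ad_Type',
    '1,1,1,name of ad,www.url.com,sales',
]

# Runs on the local DirectRunner; each parsed row dict is printed.
with beam.Pipeline() as p:
    (p | beam.Create(rows)
       | beam.Filter(lambda x: x[0] != 'A')   # drop the header line
       | beam.ParDo(BuildAdsRecordFn())
       | beam.Map(print))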

Here is the data I simulated:

Advertiser_ID,Campaign_ID,Ad_ID,Ad_Name,Click_through_URL,Ad_Type
1,1,1,name of ad,www.url.com,sales
1,1,2,name of ad2,www.url2.com,sales with sales

I also filtered out the header line that I created in my file (in the Filter operation); if you don't have a header, this is not necessary.
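
One caveat with that filter (my note, not the original answer's): x[0] != 'A' drops every line whose first character is 'A', which would also discard a data row that happens to start with 'A'. If the header text is known, matching it explicitly is safer:

import apache_beam as beam

# Hypothetical safer variant: match the known header prefix instead of
# testing only the first character of each line.
drop_header = beam.Filter(lambda line: not line.startswith('Advertiser_ID'))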
