Dataflow GCS to BigQuery - How to output multiple rows per input?

Question

I am currently using the Google-provided gcs-text-to-bigquery template and feeding in a transform function to transform my JSONL file. The JSONL is heavily nested, and I want to output multiple rows for each line of the newline-delimited JSON by applying some transforms.

For example:

{'state': 'FL', 'metropolitan_counties': [{'name': 'miami dade', 'population': 100000}, {'name': 'county2', 'population': 100000}, …], 'rural_counties': [{'name': 'county1', 'population': 100000}, {'name': 'county2', 'population': 100000}, …], 'total_state_pop': 10000000, …}

There will obviously be more than two counties, and each state will have one of these lines. The output my boss wants is one row per county rather than one row per state.

When I run the gcs-to-bq text transform, I end up with only one row per state (so I get Miami Dade county from FL, and then whatever the first county is in my transform for the next state). From what I have read, I think this is because the mapping in the template expects one output per JSON line. It seems I could use a ParDo (with a DoFn? I am not sure what that is), or there is a similar option with beam.Map in Python. The transforms contain some business logic (currently about 25 lines of code, since the JSON has more columns than I showed, but those are pretty simple).

Any suggestions on this? Data is coming in tonight/tomorrow, and there will be hundreds of thousands of rows in a BQ table.

The template I am using is currently in Java, but I can translate it to Python fairly easily, as there are many Python examples online. I know Python better, I think it copes more easily with the differing types (sometimes a field can be null), and the examples I have seen look simpler and less daunting. However, I am open to either.

Recommended answer

Solving this in Python is fairly straightforward. Here's one possibility (not fully tested):

from __future__ import absolute_import

import ast
import os

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

# Credentials used by the Google Cloud client libraries.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service_account.json'

pipeline_args = [
    '--job_name=test'
]

pipeline_options = PipelineOptions(pipeline_args)


def jsonify(element):
    # The sample lines use single quotes, so they are parsed as Python
    # literals; strict JSON (double quotes, null) would need json.loads.
    return ast.literal_eval(element)


def unnest(element):
    """Yield one output row per county in a state record."""
    state = element.get('state')
    state_pop = element.get('total_state_pop')
    if state is None or state_pop is None:
        return  # skip records missing the state-level fields
    for type_ in ['metropolitan_counties', 'rural_counties']:
        county_type = (
            'Metropolitan' if type_ == 'metropolitan_counties' else 'Rural'
        )
        for e in element.get(type_, []):
            name = e.get('name')
            pop = e.get('population')
            if name is None or pop is None:
                continue  # skip incomplete county entries
            yield {
                'State': state,
                'County_Type': county_type,
                'County_Name': name,
                'County_Pop': pop,
                'State_Pop': state_pop
            }


with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')

    schema = (
        'State:STRING,County_Type:STRING,County_Name:STRING,'
        'County_Pop:INTEGER,State_Pop:INTEGER'
    )

    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        # FlatMap emits every yielded row as its own element.
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
    )
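
The step that turns one input line into several rows is beam.FlatMap. beam.Map emits exactly one output element per input, whereas beam.FlatMap iterates over whatever the function returns or yields and emits each item as its own element. The sketch below imitates that difference in plain Python so it can be run without Beam. Note that emulate_map, emulate_flat_map, and county_rows are hypothetical helpers written for illustration, not Beam APIs, and the sample record is a shortened version of the one in the question.

```python
import ast


def county_rows(record):
    """Yield one (state, county name) pair per metropolitan county."""
    for county in record.get('metropolitan_counties', []):
        yield record['state'], county['name']


def emulate_map(fn, elements):
    """Imitate beam.Map: one output element per input element."""
    return [fn(e) for e in elements]


def emulate_flat_map(fn, elements):
    """Imitate beam.FlatMap: each item produced by fn is its own element."""
    return [item for e in elements for item in fn(e)]


# One input line, shaped like the sample in the question (shortened).
line = ("{'state': 'FL', 'metropolitan_counties': ["
        "{'name': 'miami dade', 'population': 100000}, "
        "{'name': 'county2', 'population': 100000}]}")
records = [ast.literal_eval(line)]

mapped = emulate_map(lambda r: list(county_rows(r)), records)
flat_mapped = emulate_flat_map(county_rows, records)

print(len(mapped))       # one element: a list holding both counties
print(len(flat_mapped))  # two elements: one per county
```

If the template applies a one-to-one mapping, that would be consistent with the one-row-per-state result described in the question. A ParDo with a DoFn whose process method yields several rows would behave like FlatMap here.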

This will only succeed if you are working with batch data. If you have streaming data, change beam.io.Write(beam.io.BigQuerySink(...)) to beam.io.WriteToBigQuery. In recent Beam releases BigQuerySink is deprecated in favor of WriteToBigQuery, which handles batch writes as well.
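
A minimal sketch of that swap, assuming the same schema and dispositions as the batch example. Here write_rows is a hypothetical helper, and the table name is the placeholder from the answer. Beam is imported inside the function only so the snippet can be loaded without Beam installed; in a real pipeline module the import would sit at the top.

```python
SCHEMA = (
    'State:STRING,County_Type:STRING,County_Name:STRING,'
    'County_Pop:INTEGER,State_Pop:INTEGER'
)


def write_rows(rows, table='project_id:dataset_id.table_name'):
    """Append a PCollection of row dicts to BigQuery via WriteToBigQuery."""
    import apache_beam as beam  # lazy import, see the note above

    return rows | 'Write to BQ' >> beam.io.WriteToBigQuery(
        table,
        schema=SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
```

Inside the pipeline's with block, the 'Write to BQ' step would then be replaced by a call such as write_rows(unnested_rows), where unnested_rows stands for the output of the 'Unnest' step.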
