Dataflow GCS to BigQuery - How to output multiple rows per input?
Problem description
Currently I am using the Google-provided gcs-text-to-bigquery template and feeding in a transform function to transform my JSONL file. The JSONL is heavily nested, and I want to output multiple rows per line of the newline-delimited JSON by applying some transforms.
For example:
{'state': 'FL', 'metropolitan_counties':[{'name': 'miami dade', 'population':100000}, {'name': 'county2', 'population':100000}…], 'rural_counties':[{'name': 'county1', 'population':100000}, {'name': 'county2', 'population':100000}…], 'total_state_pop':10000000,….}
There will obviously be more than 2 counties, and each state will have one of these lines. The output my boss wants is:
When I run the GCS-to-BQ text template, I end up with only one row per state (so I get Miami-Dade County for FL, and then whichever county comes first in my transform for the next state). From what I've read, I think this is because the mapping in the template expects exactly one output per JSON line. It seems I could use a ParDo (DoFn?), though I'm not sure what that is, or there is a similar option with beam.Map in Python. There is some business logic in the transforms (right now about 25 lines of code, since the JSON has more columns than I've shown, but those are fairly simple).
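The one-row-per-state symptom matches how Beam's element-wise transforms differ: `beam.Map` emits exactly one output per input, whereas `beam.FlatMap` (or `beam.ParDo` with a `DoFn` whose `process` method yields) emits zero or more outputs per input. The sketch below mimics the two shapes in plain Python with `map` and `itertools.chain.from_iterable`; it is an analogy rather than Beam code, and the record is a shortened, hypothetical version of the example above.

```python
from itertools import chain

# Shortened, hypothetical record shaped like the example in the question.
record = {
    'state': 'FL',
    'metropolitan_counties': [
        {'name': 'miami dade', 'population': 100000},
        {'name': 'county2', 'population': 100000},
    ],
    'total_state_pop': 10000000,
}


def one_row(rec):
    """Map-style: exactly one output per input, so only the first county survives."""
    county = rec['metropolitan_counties'][0]
    return (rec['state'], county['name'])


def many_rows(rec):
    """FlatMap-style: a generator that yields one output per county."""
    for county in rec['metropolitan_counties']:
        yield (rec['state'], county['name'])


# map() keeps one result per input; chain.from_iterable flattens each yielded sequence.
mapped = list(map(one_row, [record]))
flat_mapped = list(chain.from_iterable(map(many_rows, [record])))

print(mapped)       # [('FL', 'miami dade')]
print(flat_mapped)  # [('FL', 'miami dade'), ('FL', 'county2')]
```

In a Beam pipeline, the same `many_rows` generator would be passed to `beam.FlatMap(many_rows)`, or its loop would live inside the `process` method of a `DoFn` applied with `beam.ParDo`.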
Any suggestions? The data is coming in tonight or tomorrow, and there will be hundreds of thousands of rows in a BQ table.
The template I am using is currently in Java, but I can translate it to Python fairly easily, since there are many Python examples online. I know Python better, and I think it's easier given the varying types (sometimes a field can be null). It also seems less daunting, since the examples I've seen look simpler. However, I'm open to either.
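Since some fields can be null, two Python details are worth checking before porting the transform. First, JSON `null` parses with `json.loads` (becoming `None`) but not with `ast.literal_eval`, which only accepts Python-literal syntax such as the single-quoted sample above. Second, `dict.get(key, default)` returns the default only when the key is missing, not when it is present with a null value. A minimal sketch with a made-up line; `county_list` is a hypothetical helper, not part of any library.

```python
import ast
import json

# Hypothetical JSONL line: metropolitan_counties is null, rural_counties is absent.
line = '{"state": "FL", "metropolitan_counties": null, "total_state_pop": 10000000}'

# json.loads maps null -> None.
record = json.loads(line)

# ast.literal_eval rejects the bare name `null` with a ValueError.
try:
    ast.literal_eval(line)
    literal_eval_ok = True
except ValueError:
    literal_eval_ok = False


def county_list(rec, key):
    """Return the list under `key`, treating both a missing key and null as empty."""
    return rec.get(key) or []


# The default [] is NOT used here because the key exists; iterating None raises TypeError.
naive = record.get('metropolitan_counties', [])

print(literal_eval_ok)                               # False
print(naive)                                         # None
print(county_list(record, 'metropolitan_counties'))  # []
print(county_list(record, 'rural_counties'))         # []
```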
Recommended answer
Solving that in Python is somewhat straightforward. Here's one possibility (not fully tested):
```python
import json
import os

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service_account.json'

# When running on Dataflow, options such as --runner=DataflowRunner, --project,
# --region and --temp_location also go in this list.
pipeline_args = [
    '--job_name=test'
]
pipeline_options = PipelineOptions(pipeline_args)


def jsonify(element):
    # Real JSONL uses double quotes and may contain null, which json.loads handles;
    # ast.literal_eval only accepts Python-literal syntax like the single-quoted sample.
    return json.loads(element)


def unnest(element):
    # A generator: used with beam.FlatMap, every yielded dict becomes its own row.
    state = element.get('state')
    state_pop = element.get('total_state_pop')
    if state is None or state_pop is None:
        return
    for type_ in ['metropolitan_counties', 'rural_counties']:
        # `or []` also covers a key that is present but null.
        for e in element.get(type_) or []:
            name = e.get('name')
            pop = e.get('population')
            county_type = (
                'Metropolitan' if type_ == 'metropolitan_counties' else 'Rural'
            )
            if name is None or pop is None:
                continue
            yield {
                'State': state,
                'County_Type': county_type,
                'County_Name': name,
                'County_Pop': pop,
                'State_Pop': state_pop
            }


with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')
    schema = 'State:STRING,County_Type:STRING,County_Name:STRING,County_Pop:INTEGER,State_Pop:INTEGER'
    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
    )
```
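Because the rows yielded by `unnest` have to line up with the `schema` string used in the pipeline, one cheap local check before launching a job is to compare a sample row's keys and integer columns against that string. A minimal sketch; `schema_columns` and `row_matches_schema` are hypothetical helpers (not Beam or BigQuery APIs), and the sample rows are made up.

```python
# Same schema string as in the pipeline.
SCHEMA = ('State:STRING,County_Type:STRING,County_Name:STRING,'
          'County_Pop:INTEGER,State_Pop:INTEGER')


def schema_columns(schema):
    """Split a 'name:TYPE,...' schema string into a {name: TYPE} dict."""
    columns = {}
    for field in schema.split(','):
        name, type_ = field.split(':')
        columns[name.strip()] = type_.strip()
    return columns


def row_matches_schema(row, schema):
    """True if the row has exactly the schema's columns and INTEGER columns hold ints or None."""
    columns = schema_columns(schema)
    if set(row) != set(columns):
        return False
    for name, type_ in columns.items():
        value = row[name]
        if type_ == 'INTEGER' and value is not None and not isinstance(value, int):
            return False
    return True


row = {'State': 'FL', 'County_Type': 'Metropolitan', 'County_Name': 'miami dade',
       'County_Pop': 100000, 'State_Pop': 10000000}
bad_row = dict(row, County_Pop='100000')  # population stored as a string

print(row_matches_schema(row, SCHEMA))      # True
print(row_matches_schema(bad_row, SCHEMA))  # False
```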
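This will only succeed if you are working with batch data. If you have streaming data, just change `beam.io.Write(beam.io.BigQuerySink(...))` to `beam.io.WriteToBigQuery`. `WriteToBigQuery` also works for batch pipelines, and `BigQuerySink` is deprecated in newer Beam releases, so `WriteToBigQuery` is the safer choice for new code either way.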