Load multiple JSON zip files from GCS to BigQuery using a Dataflow pipeline (Python)


Problem Description


I am completely new to Dataflow and a naïve programmer. I am looking for help in designing a Dataflow pipeline, written in Python, that reads multi-part compressed JSON files stored on GCS and loads them into BigQuery. The source couldn't provide us with the schema of the file/table, so I am looking for an autodetect option, something like the following:

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
)

I don't require any transformations; I just want to load the JSON into BQ.

I couldn't find any sample template on Google that reads a json.zip file with autodetect and writes to BQ. Can someone help me with a template or syntax for the above requirement, or with tips and points that I need to consider?

Solution

Here is sample executable Python Beam code, along with sample raw data.


#------------Import Lib-----------------------#
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os, sys, time
import argparse
import json
import logging
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxxxxxxxx'
input='gs://FILE-Path'
#------------Splitting Of Records-----------------------#

class Transaction_ECOM(beam.DoFn):
    def process(self, element):
        logging.info(element)

        result = json.loads(element)
        data_bkt = result.get('_bkt','null')
        data_cd=result.get('_cd','null')
        data_indextime=result.get('_indextime','0')
        data_kv=result.get('_kv','null')
        data_raw=result['_raw']
        data_raw1=data_raw.replace("\n", "")
        data_serial=result.get('_serial','null')
        data_si = str(result.get('_si','null'))
        data_sourcetype =result.get('_sourcetype','null')
        data_subsecond = result.get('_subsecond','null')
        data_time=result.get('_time','null')
        data_host=result.get('host','null')
        data_index=result.get('index','null')
        data_linecount=result.get('linecount','null')
        data_source=result.get('source','null')
        data_sourcetype1=result.get('sourcetype','null')
        data_splunk_server=result.get('splunk_server','null')

        return [{"datetime_indextime": time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(int(data_indextime))), "_bkt": data_bkt, "_cd": data_cd,  "_indextime": data_indextime,  "_kv": data_kv,  "_raw": data_raw1,  "_serial": data_serial,  "_si": data_si, "_sourcetype": data_sourcetype, "_subsecond": data_subsecond, "_time": data_time, "host": data_host, "index": data_index, "linecount": data_linecount, "source": data_source, "sourcetype": data_sourcetype1, "splunk_server": data_splunk_server}]



def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()

    known_args, pipeline_args = parser.parse_known_args(argv)


    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)



    data_loading = (
        p1
        | 'Read from File' >> beam.io.ReadFromText(input, skip_header_lines=0)
    )


    project_id = "xxxxxxxxxxx"
    dataset_id = 'test123'
    table_schema_ECOM = ('datetime_indextime:DATETIME, _bkt:STRING, _cd:STRING, _indextime:STRING, _kv:STRING, _raw:STRING, _serial:STRING, _si:STRING, _sourcetype:STRING, _subsecond:STRING, _time:STRING, host:STRING, index:STRING, linecount:STRING, source:STRING, sourcetype:STRING, splunk_server:STRING')

    # Persist to BigQuery
    # WriteToBigQuery accepts the data as a list of JSON objects

#---------------------Index = ITF----------------------------------------------------------------------------------------------------------------------
    result = (
    data_loading
        | 'Clean-ITF' >> beam.ParDo(Transaction_ECOM())
        | 'Write-ITF' >> beam.io.WriteToBigQuery(
                                                    table='CFF_ABC',
                                                    dataset=dataset_id,
                                                    project=project_id,
                                                    schema=table_schema_ECOM,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                    ))

    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  path_service_account = '/home/vibhg/Splunk/CFF/xxxxxxxxxxx-abcder125.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()


It imports a few additional libraries that aren't actually used, so you can ignore them.
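
One note on the compressed input: the pipeline above reads plain newline-delimited JSON with ReadFromText. As far as I know, ReadFromText can also decompress gzip/bzip2 files directly through its compression_type argument, but it cannot open true .zip archives, which would need to be unpacked first. Below is a minimal sketch under that assumption; the bucket path is a placeholder:

# A minimal sketch (assumption: files are gzip-compressed .gz, not .zip archives).
# Reads gzip-compressed newline-delimited JSON from GCS and parses each line.
import json
import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

with beam.Pipeline() as p:
    rows = (
        p
        | 'Read gzipped JSON' >> beam.io.ReadFromText(
            'gs://your-bucket/path/*.json.gz',          # placeholder pattern
            compression_type=CompressionTypes.GZIP)     # or CompressionTypes.AUTO (infers from .gz)
        | 'Parse JSON' >> beam.Map(json.loads)
    )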

Sample data that can be stored on GCS is given below:

{"_bkt": "A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsSalesOrderCreated\", Locality=\"NA\", Success=\"True\", BsExecutionTime=\"00:00:00.005\", OrderNo=\"374941817\", Locality=\"NA\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsSalesOrderCreated\"], [userId=\"s-oitp-u-global\"], [userIdRegion=\"NA\"], [msgId=\"aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc\"], [msgIdSeq=\"2\"], [originator=\"ISOM\"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
{"_bkt": "itf~412~2EE5428B-7CEA-4C49-A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsSalesOrderCreated\", Locality=\"NA\", Success=\"True\", BsExecutionTime=\"00:00:00.005\", OrderNo=\"374941817\", Locality=\"NA\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsSalesOrderCreated\"], [userId=\"s-oitp-u-global\"], [userIdRegion=\"NA\"], [msgId=\"aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc\"], [msgIdSeq=\"2\"], [originator=\"ISOM\"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
{"_bkt": "9-A1E8-A5370FECA146", "_cd": "412:140787671", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:58,659 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsCreateOrderV2\", BsExecutionTime=\"00:00:01.568\", OrderNo=\"374942155\", CountryCode=\"US\", ClientSystem=\"owfe-webapp\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsCreateOrderV2\"], [userId=\"s-salja1-u-irssemal\"], [userIdRegion=\"NA\"], [msgId=\"6652311fece28966\"], [msgIdSeq=\"25\"], [originator=\"SellingApi\"] ", "_serial": "1", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".659", "_time": "2021-01-25 14:28:58.659 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}

You can execute the script with the following command:

python script.py --region europe-west1 --project xxxxxxx --temp_location gs://temp/temp --runner DataflowRunner --job_name name

I hope this helps.
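
If you really just want the schema autodetect behaviour from your question and no transformations at all, another option is to skip Beam and run a plain BigQuery load job with the google-cloud-bigquery client. This is only a sketch with placeholder project/dataset/table/URI values; to my knowledge BigQuery can load gzip-compressed newline-delimited JSON directly, but not .zip archives:

# A minimal sketch: BigQuery load job with schema autodetect.
# Project, dataset, table and GCS URI below are placeholders.
from google.cloud import bigquery

client = bigquery.Client(project='your-project-id')

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

load_job = client.load_table_from_uri(
    'gs://your-bucket/path/*.json.gz',              # gzip-compressed NDJSON is accepted; .zip is not
    'your-project-id.your_dataset.your_table',
    job_config=job_config,
)
load_job.result()  # wait for the load job to complete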
