AWS Glue Job - Writing into a single Parquet file


Question

I'm collecting JSON-formatted data in an S3 bucket, partitioned by year, month, and day.

Example:

 s3://bucket/app-events/year=2019/month=9/day=30/0001.json
 s3://bucket/app-events/year=2019/month=9/day=30/0002.json
 s3://bucket/app-events/year=2019/month=9/day=30/0003.json

A crawler runs over s3://bucket/app-events/ and creates a table.

I want to transform these JSON files into a single Parquet file, but my job creates a new Parquet file for each JSON file.

Here is my job script in Python:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

print("-------------- Execute Script --------------\n")

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

print("-------------- JOB_NAME: " + args['JOB_NAME'] + " --------------\n")

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("-------------- Execute Finding Sources --------------\n")

## @type: DataSource
## @args: [database = "my-db", table_name = "year_2019", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my-db", table_name = "year_2019", transformation_ctx = "datasource0")

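# Alternative source (commented out below): read the JSON files directly from S3
# and group many small input files together. 'groupFiles' and 'groupSize' are
# documented Glue read options that make Glue create fewer, larger partitions
# when the source consists of many small files: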
# datasource0 = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://bucket/app-events"], 'recurse':True, 'groupFiles': 'inPartition', 'groupSize': '1048576'}, format="json")
# Print out the count of found datasources
print("-------------- Sources Found: " + str(datasource0.count()) + "--------------\n")

## @type: ApplyMapping
## @args: [mapping = [("debug", "boolean", "debug", "boolean"), ("_id", "string", "_id", "string"), ("os", "string", "os", "string"), ("`data.language`", "string", "`data.language`", "string"), ("`data.ad_id`", "string", "`data.ad_id`", "string"), ("`data.ad_contenttype`", "string", "`data.ad_contenttype`", "string"), ("`data.ad_name`", "string", "`data.ad_name`", "string"), ("`data.shop_name`", "string", "`data.shop_name`", "string"), ("`data.shop_id`", "string", "`data.shop_id`", "string"), ("device_id", "string", "device_id", "string"), ("session_id", "string", "session_id", "string"), ("os_version", "string", "os_version", "string"), ("distinct_id", "string", "distinct_id", "string"), ("shop_id", "string", "shop_id", "string"), ("page", "string", "page", "string"), ("name", "string", "name", "string"), ("start_timestamp", "string", "start_timestamp", "string"), ("id", "string", "id", "string"), ("ip_address", "string", "ip_address", "string"), ("location", "string", "location", "string"), ("city", "string", "city", "string"), ("country", "string", "country", "string"), ("start_timestamp_unix", "int", "start_timestamp_unix", "int"), ("postal", "string", "postal", "string"), ("region", "string", "region", "string"), ("`data.entity_order`", "string", "`data.entity_order`", "string"), ("`data.entity_id`", "string", "`data.entity_id`", "string"), ("`data.entity_type`", "string", "`data.entity_type`", "string"), ("`data.entity_name`", "string", "`data.entity_name`", "string"), ("`data.entity_image`", "string", "`data.entity_image`", "string"), ("`data.feedbackform_id`", "string", "`data.feedbackform_id`", "string"), ("`data.feedbackform_question_count`", "string", "`data.feedbackform_question_count`", "string"), ("`data.feedbackform_name`", "string", "`data.feedbackform_name`", "string"), ("`data.shop_pincode`", "string", "`data.shop_pincode`", "string"), ("`data.entity_quantity`", "string", "`data.entity_quantity`", "string"), ("`data.entity_choice`", "string", "`data.entity_choice`", "string"), ("`data.entity_comment`", "string", "`data.entity_comment`", "string"), ("`data.entity_price`", "string", "`data.entity_price`", "string"), ("`data.old_language`", "string", "`data.old_language`", "string"), ("app", "string", "app", "string"), ("event", "string", "event", "string"), ("shop", "string", "shop", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string"), ("minute", "string", "minute", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("debug", "boolean", "debug", "boolean"), ("_id", "string", "_id", "string"), ("os", "string", "os", "string"), ("`data.language`", "string", "`data.language`", "string"), ("`data.ad_id`", "string", "`data.ad_id`", "string"), ("`data.ad_contenttype`", "string", "`data.ad_contenttype`", "string"), ("`data.ad_name`", "string", "`data.ad_name`", "string"), ("`data.shop_name`", "string", "`data.shop_name`", "string"), ("`data.shop_id`", "string", "`data.shop_id`", "string"), ("device_id", "string", "device_id", "string"), ("session_id", "string", "session_id", "string"), ("os_version", "string", "os_version", "string"), ("distinct_id", "string", "distinct_id", "string"), ("shop_id", "string", "shop_id", "string"), ("page", "string", "page", "string"), ("name", "string", "name", "string"), ("start_timestamp", "string", "start_timestamp", "string"), ("id", "string", "id", "string"), ("ip_address", "string", "ip_address", "string"), ("location", "string", "location", "string"), ("city", "string", "city", "string"), ("country", "string", "country", "string"), ("start_timestamp_unix", "int", "start_timestamp_unix", "int"), ("postal", "string", "postal", "string"), ("region", "string", "region", "string"), ("`data.entity_order`", "string", "`data.entity_order`", "string"), ("`data.entity_id`", "string", "`data.entity_id`", "string"), ("`data.entity_type`", "string", "`data.entity_type`", "string"), ("`data.entity_name`", "string", "`data.entity_name`", "string"), ("`data.entity_image`", "string", "`data.entity_image`", "string"), ("`data.feedbackform_id`", "string", "`data.feedbackform_id`", "string"), ("`data.feedbackform_question_count`", "string", "`data.feedbackform_question_count`", "string"), ("`data.feedbackform_name`", "string", "`data.feedbackform_name`", "string"), ("`data.shop_pincode`", "string", "`data.shop_pincode`", "string"), ("`data.entity_quantity`", "string", "`data.entity_quantity`", "string"), ("`data.entity_choice`", "string", "`data.entity_choice`", "string"), ("`data.entity_comment`", "string", "`data.entity_comment`", "string"), ("`data.entity_price`", "string", "`data.entity_price`", "string"), ("`data.old_language`", "string", "`data.old_language`", "string"), ("app", "string", "app", "string"), ("event", "string", "event", "string"), ("shop", "string", "shop", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string"), ("minute", "string", "minute", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://bucket/app-events-processed"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucket/app-events-processed/singles"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

How can I achieve this?

Solution

AWS Glue is based on Apache Spark, which partitions data across multiple nodes to achieve high throughput. When writing data to a file-based sink like Amazon S3, Glue will write a separate file for each partition. To change the number of partitions in a DynamicFrame, you can first convert it into a DataFrame and then leverage Apache Spark's partitioning capabilities.

# Convert to a dataframe and partition based on "partition_col"
partitioned_dataframe = datasource0.toDF().repartition(1)

# Convert back to a DynamicFrame for further processing.
partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "partitioned_df")
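
To wire this into the job script above, the frame can be repartitioned just before the sink. A minimal sketch (the names single_partition_df and single_partition_frame are illustrative, not from the original script):

from awsglue.dynamicframe import DynamicFrame

# Collapse the data into one Spark partition so the sink writes a single file
single_partition_df = dropnullfields3.toDF().repartition(1)
single_partition_frame = DynamicFrame.fromDF(single_partition_df, glueContext, "single_partition_frame")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = single_partition_frame, connection_type = "s3", connection_options = {"path": "s3://bucket/app-events-processed/singles"}, format = "parquet", transformation_ctx = "datasink4")

Note that repartition(1) forces a full shuffle onto a single task; coalesce(1) avoids the shuffle, but the write still runs on a single executor either way, so producing one output file inherently limits write parallelism.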

Reference: AWS Glue: How It Works
