__HIVE_DEFAULT_PARTITION__ as partition value in Glue ETL job


Question

I have CSV data that is crawled via a Glue crawler and ends up in one table.

I'm trying to run an ETL job to re-partition the data on disk by components of the date column, and to convert the CSV to Parquet in the process.

That is, I have a column named "date" in my data, and I want to partition the data into year/month/day partitions on S3.

I am able to convert to Parquet and get it partitioned correctly on the serial-number value (a different column), but it writes the value "__HIVE_DEFAULT_PARTITION__" for all of the year, month, and day values in the date-related partitions.

I am able to partition on other columns (like serial-number), but year/month/day are not in the original data set, so my approach has been to create them from the date column as new columns in the data set and tell the write_dynamic_frame function to partition by those columns. That isn't working.

I'm new to Spark/PySpark and Glue in general, so there is a very real possibility that I'm missing something simple.

Thanks to anyone who offers assistance.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

to_spark_df4 = dropnullfields3.toDF()

with_file_name_df5 = (to_spark_df4
    .withColumn("input_file_name", F.input_file_name())
    .withColumn("year", F.year(F.col("date").cast("date")))
    .withColumn("month", F.month(F.col("date").cast("date")))
    .withColumn("day", F.dayofmonth(F.col("date").cast("date"))))

back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")


datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = back_to_glue_df8,
    connection_type = "s3",
    connection_options = {"path": "s3://output/path",
                          "partitionKeys": ["serial-number", "year", "month", "day"]},
    format = "parquet",
    transformation_ctx = "datasink4")
job.commit()

The result is that my keys in S3 end up looking like this:

serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet
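For reference, Hive writes __HIVE_DEFAULT_PARTITION__ when a partition column is null for a row, so one quick check is to count nulls in the derived columns before writing. A minimal sketch against the with_file_name_df5 DataFrame from the job above:

# Count rows whose derived partition columns are null
# (null partition values are what Hive writes as __HIVE_DEFAULT_PARTITION__).
with_file_name_df5.select(
    F.sum(F.col("year").isNull().cast("int")).alias("null_year"),
    F.sum(F.col("month").isNull().cast("int")).alias("null_month"),
    F.sum(F.col("day").isNull().cast("int")).alias("null_day")
).show()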

Update: fixed formatting

Answer

I run a job very similar to yours. I hope you've managed to solve it by now, but anyway, here's the solution to your predicament:

from pyspark.sql.functions import year, month, dayofmonth

###### rest of your code until ApplyMapping included ######

# add year, month & day columns, non zero-padded
df = applymapping1.toDF()  # the ApplyMapping output from your job, converted to a Spark DataFrame
df = df.withColumn('year', year(df.date))\
       .withColumn('month', month(df.date))\
       .withColumn('day', dayofmonth(df.date))
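
From there the write is the same as in the question; a minimal sketch that mirrors the write_dynamic_frame call from your job (output path and partition keys are the ones you already use):

# Convert back to a DynamicFrame and write partitioned Parquet,
# mirroring the write call from the question.
dyf = DynamicFrame.fromDF(df, glueContext, "dyf")
glueContext.write_dynamic_frame.from_options(
    frame = dyf,
    connection_type = "s3",
    connection_options = {"path": "s3://output/path",
                          "partitionKeys": ["serial-number", "year", "month", "day"]},
    format = "parquet")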

Additional note:

If you need to run queries on Athena where you want to select a range of dates, I would suggest avoiding nested partitioning (year -> month -> day) and instead using a flat partitioning schema. The reason is that queries become much simpler to write. Here's the Python code to get the flat schema:

from pyspark.sql.functions import date_format

###### rest of your code until ApplyMapping included ######

df = applymapping1.toDF()  # as above, start from the ApplyMapping output
df = df.withColumn('date_2', date_format(df.date, 'yyyy-MM-dd'))

# date_2 is because column "date" already exists,
# but we want the partitioning one to be in a string format.
# You can later drop the original column if you wish.
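
With that column in place, the write uses a single date partition key; a minimal sketch, reusing the serial-number key and output path from the question:

# Write with the flat partitioning schema: serial-number plus one
# string-typed date partition column.
dyf_flat = DynamicFrame.fromDF(df, glueContext, "dyf_flat")
glueContext.write_dynamic_frame.from_options(
    frame = dyf_flat,
    connection_type = "s3",
    connection_options = {"path": "s3://output/path",
                          "partitionKeys": ["serial-number", "date_2"]},
    format = "parquet")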

Let's say you now want to query your data from the 15th of March to the 3rd of April 2020.

Here are the SQL queries, depending on which partitioning schema you choose.

Nested schema

SELECT item_1, item_2
  FROM my_table
 WHERE year = 2020
   AND (
          (month = 3 AND day >= 15)
       OR (month = 4 AND day <= 3)
       )

Flat schema

SELECT item_1, item_2
  FROM my_table
 WHERE date BETWEEN '2020-03-15' AND '2020-04-03'

Also, given that your 'date' column is stored as a string, you'll be able to run queries using the LIKE operator.

For example, if you want to query all data from each April in your database, you can do:

SELECT item_1, item_2
  FROM my_table
 WHERE date LIKE '%-04-%'
