Spark:保存按“虚拟"分区的DataFrame.柱子 [英] Spark: save DataFrame partitioned by "virtual" column

查看:90
本文介绍了Spark:保存按“虚拟"分区的DataFrame.柱子的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用PySpark进行经典的ETL工作(加载数据集,对其进行处理,保存),并希望将我的数据框另存为虚拟"列所分区的文件/目录;我所说的虚拟"是指我有一个Timestamp列,它是一个包含ISO 8601编码日期的字符串,我想按年/月/日划分;但是我实际上在DataFrame中没有Year,Month或Day列;我有一个可以从中派生这些列的时间戳,但是我不希望我的resultat项将这些列之一序列化.

I'm using PySpark to do classic ETL job (load dataset, process it, save it) and want to save my Dataframe as files/directory partitioned by a "virtual" column; what I mean by "virtual" is that I have a column Timestamp which is a string containing an ISO 8601 encoded date, and I'd want to partition by Year / Month / Day; but I don't actually have either a Year, Month or Day column in the DataFrame; I have this Timestamp from which I can derive these columns though, but I don't want my resultat items to have one of these columns serialized.

将DataFrame保存到磁盘后生成的文件结构应类似于:

File structure resulting from saving the DataFrame to disk should look like:

/ 
    year=2016/
        month=01/
            day=01/
                part-****.gz

是否可以使用Spark/Pyspark来做我想做的事?

Is there a way to do what I want with Spark / Pyspark ?

推荐答案

用于分区的列不包含在序列化数据本身中.例如,如果您这样创建DataFrame:

Columns which are used for partitioning are not included in the serialized data itself. For example if you create DataFrame like this:

df = sc.parallelize([
    (1, "foo", 2.0, "2016-02-16"),
    (2, "bar", 3.0, "2016-02-16")
]).toDF(["id", "x", "y", "date"])

并编写如下:

import tempfile
from pyspark.sql.functions import col, dayofmonth, month, year
outdir = tempfile.mktemp()

dt = col("date").cast("date")
fname = [(year, "year"), (month, "month"), (dayofmonth, "day")]
exprs = [col("*")] + [f(dt).alias(name) for f, name in fname]

(df
    .select(*exprs)
    .write
    .partitionBy(*(name for _, name in fname))
    .format("json")
    .save(outdir))

单个文件将不包含分区列:

individual files won't contain partition columns:

import os

(sqlContext.read
    .json(os.path.join(outdir, "year=2016/month=2/day=16/"))
    .printSchema())

## root
##  |-- date: string (nullable = true)
##  |-- id: long (nullable = true)
##  |-- x: string (nullable = true)
##  |-- y: double (nullable = true)

分区数据仅存储在目录结构中,而不在序列化文件中重复.仅当您读取完整或部分目录树时,它才会被附加:

Partitioning data is stored only in a directory structure and not duplicated in serialized files. It will be attached only when your read complete or partial directory tree:

sqlContext.read.json(outdir).printSchema()

## root
##  |-- date: string (nullable = true)
##  |-- id: long (nullable = true)
##  |-- x: string (nullable = true)
##  |-- y: double (nullable = true)
##  |-- year: integer (nullable = true)
##  |-- month: integer (nullable = true)
##  |-- day: integer (nullable = true)

sqlContext.read.json(os.path.join(outdir, "year=2016/month=2/")).printSchema()

## root
##  |-- date: string (nullable = true)
##  |-- id: long (nullable = true)
##  |-- x: string (nullable = true)
##  |-- y: double (nullable = true)
##  |-- day: integer (nullable = true)

这篇关于Spark:保存按“虚拟"分区的DataFrame.柱子的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆