Spark copying dataframe columns best practice in Python/PySpark?


Question

This is for Python/PySpark using Spark 2.3.2. I am looking for a best-practice approach for copying the columns of one data frame to another data frame using Python/PySpark, for a very large data set of 10+ billion rows (partitioned evenly by year/month/day). Each row has 120 columns to transform/copy. The output data frame will be written, date partitioned, to another set of parquet files.

The example schema is: input DFinput (colA, colB, colC) and output DFoutput (X, Y, Z).

I want to copy DFinput to DFoutput as follows (colA => Z, colB => X, colC => Y).

What is the best practice to do this in Python Spark 2.3+? Should I use the DF.withColumn() method for each column to copy source into destination columns? Will this perform well given billions of rows, each with 110+ columns to copy?
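
For reference, the per-column approach asked about above would look roughly like the sketch below, assuming DFinput already exists with the example schema (only the three example columns are shown, not the full 120):

from pyspark.sql.functions import col

# Sketch of the withColumn-per-column copy described in the question;
# DFinput/DFoutput and the column names come from the example schema above.
DFoutput = (DFinput
    .withColumn("Z", col("colA"))
    .withColumn("X", col("colB"))
    .withColumn("Y", col("colC"))
    .drop("colA", "colB", "colC"))

As the answer below shows, a single select with aliases expresses the same mapping in one projection instead of a chain of 110+ withColumn calls.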

Thanks

Answer

Another way of handling column mapping in PySpark is via a dictionary. A dictionary lets you map the columns of the initial dataframe to the columns of the final dataframe using a key/value structure, as shown below:

from pyspark.sql.functions import col

# Sample input dataframe with columns A, B, C (assumes an active SparkSession `spark`)
df = spark.createDataFrame([
  [1, "John", "2019-12-01 10:00:00"],
  [2, "Michael", "2019-12-01 11:00:00"],
  [2, "Michael", "2019-12-01 11:01:00"],
  [3, "Tom", "2019-11-13 20:00:00"],
  [3, "Tom", "2019-11-14 00:00:00"],
  [4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])


col_map = {"A":"Z", "B":"X", "C":"Y"}

df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()

# +---+-------+-------------------+
# |  Z|      X|                  Y|
# +---+-------+-------------------+
# |  1|   John|2019-12-01 10:00:00|
# |  2|Michael|2019-12-01 11:00:00|
# |  2|Michael|2019-12-01 11:01:00|
# |  3|    Tom|2019-11-13 20:00:00|
# |  3|    Tom|2019-11-14 00:00:00|
# |  4|   Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+

Here we map A, B, C to Z, X, Y respectively.

And if you want a modular solution, you can also put everything inside a function:

def transform_cols(mappings, df):
  return df.select(*[col(k).alias(mappings[k]) for k in mappings])
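
For example, calling it with the df and col_map defined above produces the same renamed output:

# Equivalent to the inline select shown earlier
transform_cols(col_map, df).show()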

Or, to make it even more modular, use monkey patching to extend the existing functionality of the DataFrame class. Place the following code on top of your PySpark code (you can also create a mini library and include it in your code when needed):

from pyspark.sql import DataFrame

def transform_cols(self, mappings):
  return self.select(*[col(k).alias(mappings[k]) for k in mappings])

# Monkey patch: attach the helper as a method on every DataFrame
DataFrame.transform = transform_cols

And then call it:

df.transform(col_map).show()
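
The question also mentions writing the output date partitioned to parquet. With the toy dataframe above, that could look roughly like the sketch below; the derived dt column, the output path, and the single-column partitioning are illustrative assumptions rather than part of the answer:

from pyspark.sql.functions import to_date

# Write the mapped frame partitioned by a date column derived from Y
# (the path and partition scheme are assumptions for illustration).
(df.transform(col_map)
   .withColumn("dt", to_date("Y"))
   .write
   .partitionBy("dt")
   .mode("overwrite")
   .parquet("/tmp/dfoutput_parquet"))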

PS: This might be a convenient way of extending the DataFrame functionality by creating your own library and exposing it via DataFrame monkey patching (extension methods, for those familiar with C#).
