PySpark: dynamic union of DataFrames with different columns
Question
Consider the arrays shown here. I have three sets of arrays:
Array 1:
C1 C2 C3
1 2 3
9 5 6
Array 2:
C2 C3 C4
11 12 13
10 15 16
Array 3:
C1 C4
111 112
110 115
I need the output shown below. Each input may contain only a subset of the columns C1, ..., C4, but after combining them every row must carry the correct values, and any missing value should be zero.
Expected output:
C1 C2 C3 C4
1 2 3 0
9 5 6 0
0 11 12 13
0 10 15 16
111 0 0 112
110 0 0 115
I have written PySpark code, but I have hardcoded the new columns and their values. I need to convert the code below to use method overloading, so that the script works automatically for any input. I must use only Python/PySpark, not pandas.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = spark.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = spark.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

# The missing columns are hardcoded here -- this is what needs to be generalized
df01_add = df01.withColumn("C4", lit(0)).select("C1", "C2", "C3", "C4")
df02_add = df02.withColumn("C1", lit(0)).select("C1", "C2", "C3", "C4")
df03_add = df03.withColumn("C2", lit(0)).withColumn("C3", lit(0)).select("C1", "C2", "C3", "C4")

df_uni = df01_add.union(df02_add).union(df03_add)
df_uni.show()
Method Overloading Example:
class Student:
    def __init__(self, m1, m2):
        self.m1 = m1
        self.m2 = m2

    # Python has no true method overloading; default arguments emulate it
    def sum(self, c1=None, c2=None, c3=None, c4=None):
        s = 0
        if c1 is not None and c2 is not None and c3 is not None:
            s = c1 + c2 + c3
        elif c1 is not None and c2 is not None:
            s = c1 + c2
        else:
            s = c1
        return s

s1 = Student(1, 2)
print(s1.sum(55, 65, 23))
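Since Python has no signature-based method overloading, a variadic helper is the usual substitute for "any number of inputs with any columns". A minimal sketch of such a helper (the name union_all is my own, not from the answer below):

from functools import reduce
from pyspark.sql.functions import lit

def union_all(*dfs):
    # Collect the full, sorted set of column names across all inputs
    cols = sorted({c for df in dfs for c in df.columns})
    # Pad each dataframe with lit(0) for its missing columns, in one select
    padded = [
        df.select([(df[c] if c in df.columns else lit(0)).alias(c) for c in cols])
        for df in dfs
    ]
    # Fold the padded frames together with union
    return reduce(lambda a, b: a.union(b), padded)

union_all(df01, df02, df03).show()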
There are probably plenty of better ways to do it, but maybe the below is useful to anyone in the future.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("DynamicFrame").getOrCreate()

df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = spark.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = spark.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

dataframes = [df01, df02, df03]

# Create a list of all the column names and sort them
cols = set()
for df in dataframes:
    for x in df.columns:
        cols.add(x)
cols = sorted(cols)

# Create a dictionary with all the dataframes
dfs = {}
for i, d in enumerate(dataframes):
    new_name = 'df' + str(i)  # New name for the key; the dataframe is the value
    dfs[new_name] = d
    # Loop through all column names. Add the missing columns to the dataframe (with value 0)
    for x in cols:
        if x not in d.columns:
            dfs[new_name] = dfs[new_name].withColumn(x, lit(0))
    dfs[new_name] = dfs[new_name].select(cols)  # Use 'select' to get the columns sorted

# Now put it all together with a loop (union)
result = dfs['df0']  # Take the first dataframe, add the others to it
dfs_to_add = list(dfs.keys())  # In Python 3, keys() returns a view, so make it a list
dfs_to_add.remove('df0')  # Remove the first one, because it is already in the result
for x in dfs_to_add:
    result = result.union(dfs[x])
result.show()
Output:
+---+---+---+---+
| C1| C2| C3| C4|
+---+---+---+---+
| 1| 2| 3| 0|
| 9| 5| 6| 0|
| 0| 11| 12| 13|
| 0| 10| 15| 16|
|111| 0| 0|112|
|110| 0| 0|115|
+---+---+---+---+
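For completeness: on Spark 3.1 and later the same result can be had without any manual column bookkeeping. unionByName with allowMissingColumns=True aligns the schemas and fills the gaps with null, and fillna(0) turns those nulls into zeros. A minimal sketch, reusing df01, df02 and df03 from above:

result = (df01
          .unionByName(df02, allowMissingColumns=True)  # Spark 3.1+ only
          .unionByName(df03, allowMissingColumns=True)
          .fillna(0))                                   # replace the nulls with 0
result.show()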