PySpark: dynamic union of DataFrames with different columns


Problem Description


Consider the arrays shown here. I have 3 sets of arrays:

Array 1:

C1  C2  C3
1   2   3
9   5   6

Array 2:

C2 C3 C4
11 12 13
10 15 16

Array 3:

C1   C4
111  112
110  115

I need the output as follows. The input can contain any of the values for C1, ..., C4, but when joining I need to get the correct values, and if a value is not present it should be zero.

Expected output:

C1  C2  C3  C4
1   2   3   0
9   5   6   0
0   11  12  13
0   10  15  16
111 0   0   112
110 0   0   115

I have written PySpark code, but I have hardcoded the values for the new columns and their rows. I need to convert the code below into something like method overloading, so that I can use this script automatically. I need to use only Python/PySpark, not pandas.

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lit

sqlContext = SQLContext(SparkContext())

df01 = sqlContext.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = sqlContext.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = sqlContext.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

df01_add = df01.withColumn("C4", lit(0)).select("c1", "c2", "c3", "c4")
df02_add = df02.withColumn("C1", lit(0)).select("c1", "c2", "c3", "c4")
df03_add = df03.withColumn("C2", lit(0)).withColumn("C3", lit(0)).select("c1", "c2", "c3", "c4")

df_uni = df01_add.union(df02_add).union(df03_add)
df_uni.show()

Method Overloading Example:

class Student:
    def __init__(self, m1, m2):
        self.m1 = m1
        self.m2 = m2

    def sum(self, c1=None, c2=None, c3=None, c4=None):
        s = 0
        if c1 is not None and c2 is not None and c3 is not None:
            s = c1 + c2 + c3
        elif c1 is not None and c2 is not None:
            s = c1 + c2
        else:
            s = c1
        return s

s1 = Student(0, 0)  # instantiate before calling (constructor arguments are placeholders)
print(s1.sum(55, 65, 23))
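As an aside, Python has no method overloading in the Java/C++ sense; optional parameters (as in the class above) or a variable-length `*args` parameter are the usual substitutes. A minimal sketch of the `*args` style, reusing the numbers from the example (the function name `total` is just for illustration):

def total(*values):
    # One function handles any number of arguments, so separate "overloads" are unnecessary
    return sum(v for v in values if v is not None)

print(total(55, 65, 23))  # 143
print(total(55, 65))      # 120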

Solution

There are probably plenty of better ways to do it, but maybe the below is useful to someone in the future.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder \
    .appName("DynamicFrame") \
    .getOrCreate()

df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = spark.createDataFrame([(11,12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = spark.createDataFrame([(111,112), (110, 115)], ("C1", "C4"))

dataframes = [df01, df02, df03]

# Create a list of all the column names and sort them
cols = set()
for df in dataframes:
    for x in df.columns:
        cols.add(x)
cols = sorted(cols)

# Create a dictionary with all the dataframes
dfs = {}
for i, d in enumerate(dataframes):
    new_name = 'df' + str(i)  # New name for the key, the dataframe is the value
    dfs[new_name] = d
    # Loop through all column names. Add the missing columns to the dataframe (with value 0)
    for x in cols:
        if x not in d.columns:
            dfs[new_name] = dfs[new_name].withColumn(x, lit(0))
    dfs[new_name] = dfs[new_name].select(cols)  # Use 'select' to get the columns sorted

# Now put it all together with a loop (union)
result = dfs['df0']      # Take the first dataframe, add the others to it
dfs_to_add = list(dfs.keys())  # List of all the dataframes in the dictionary
dfs_to_add.remove('df0')       # Remove the first one, because it is already in the result
for x in dfs_to_add:
    result = result.union(dfs[x])
result.show()

Output:

+---+---+---+---+
| C1| C2| C3| C4|
+---+---+---+---+
|  1|  2|  3|  0|
|  9|  5|  6|  0|
|  0| 11| 12| 13|
|  0| 10| 15| 16|
|111|  0|  0|112|
|110|  0|  0|115|
+---+---+---+---+
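Since the question asks for a routine that works without hardcoding columns, here is a minimal sketch that packages the same logic into a reusable function. The function name `union_with_missing_columns` and the `fill_value` parameter are my own; the column handling mirrors the answer above.

from functools import reduce

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import lit

def union_with_missing_columns(dataframes, fill_value=0):
    """Union any number of DataFrames, filling columns a frame lacks with fill_value."""
    # Collect and sort the full set of column names across all inputs
    all_cols = sorted({c for df in dataframes for c in df.columns})

    aligned = []
    for df in dataframes:
        # Add every missing column with the fill value, then select in a fixed order
        for c in all_cols:
            if c not in df.columns:
                df = df.withColumn(c, lit(fill_value))
        aligned.append(df.select(all_cols))

    # Chain the unions; column order is identical thanks to the select above
    return reduce(DataFrame.union, aligned)

spark = SparkSession.builder.appName("DynamicFrame").getOrCreate()
df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = spark.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = spark.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

union_with_missing_columns([df01, df02, df03]).show()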

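If your cluster runs Spark 3.1 or later, `unionByName` can align the columns for you: with `allowMissingColumns=True` it fills absent columns with null, and a final `fillna(0)` turns those nulls into the zeros the question asks for. A short sketch, assuming the same `df01`, `df02`, `df03` as above:

from functools import reduce

# Assumes Spark 3.1+ (for allowMissingColumns) and df01, df02, df03 defined as above
result = reduce(
    lambda left, right: left.unionByName(right, allowMissingColumns=True),
    [df01, df02, df03],
)
result.fillna(0).show()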