Pyspark Multiple JOINS Column <> Row values: Reducing Actions

Problem Description

I have a master table 'Table 1' with 3 columns(Shown below). Tables 2.1, 3.1 & 4.1 are for 3 unique dates present in Table 1 and need to be populated in column 'Points 1'. Similarly, Tables 2.2, 3.2 & 4.2 are for same 3 unique dates present in Table 1 and need to be populated in column 'Points 2'.

Current approach:

from pyspark.sql.functions import col, lit, when

df1 = spark.table("Table1")
df2_1 = spark.table("table2.1")
df2_1 = df2_1.withColumn("Date", lit(3312019))

# Left-join on ID and Date, then pick the column named by Category.
df3 = df1.join(df2_1, (df1.ID == df2_1.ID) & (df1.Date == df2_1.Date), 'left')
df4 = df3.withColumn('Points', when(df3.Category == 'A', col('A'))
                              .when(df3.Category == 'B', col('B'))
                              .when(df3.Category == 'C', col('C'))
                              .when(df3.Category == 'D', col('D'))
                              .otherwise(lit(None)))

The current approach makes my code lengthy if implemented for all 6 tables; any suggestions to shorten it and reduce the number of actions?

Solution

I don't know if this is much shorter or "cleaner" than your version, but since you asked for help on this, I will post this as an answer. Please note that my answer is in regular Spark (Scala) - not PySpark, but it shouldn't be too difficult to port it to PySpark, if you find the answer useful :)

Here goes:

First, a helper function:

// Expand one wide row (ID, Date, A, B, C, D) into four
// (ID, Category, Date, Points) tuples, replacing null points with 0.
def columns2rows(row: Row) = {
  val id = row.getInt(0)
  val date = row.getInt(1)
  val cols = Seq("A", "B", "C", "D")
  cols.indices.map(index => (id, cols(index), date, if (row.isNullAt(index+2)) 0 else row.getInt(index+2)))
}
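
In PySpark, the same unpivot can be expressed without a row-level helper, for example with the SQL stack() function. A minimal sketch, assuming the point columns are A-D as above (the helper name columns_to_rows is mine, not from the original answer):

from pyspark.sql import functions as F

# Unpivot columns A-D into (Category, Points) rows via stack(),
# defaulting null points to 0 like the Scala helper does.
def columns_to_rows(df, points_col):
    return (df
        .select("ID", "Date",
                F.expr("stack(4, 'A', A, 'B', B, 'C', C, 'D', D) as (Category, Points)"))
        .select("ID", "Category", "Date",
                F.coalesce(F.col("Points"), F.lit(0)).alias(points_col)))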

Then union together the tables needed to populate "Points1"

val df1 = table21.withColumn("Date", lit(3312019))
  .unionByName(table31.withColumn("Date", lit(12312019)))
  .unionByName(table41.withColumn("Date", lit(5302020)))
  .select($"ID", $"Date", $"A", $"B", $"C", $"D")
  .flatMap(row => columns2rows(row))
  .toDF("ID", "Category", "Date", "Points1")

Then union together the tables needed to populate "Points2"

val df2 = table22.withColumn("Date", lit(3312019))
  .unionByName(table32.withColumn("Date", lit(12312019)))
  .unionByName(table42.withColumn("Date", lit(5302020)))
  .select($"ID", $"Date", $"A", $"B", $"C", $"D")
  .flatMap(row => columns2rows(row))
  .toDF("ID", "Category", "Date", "Points2")

Join them together and finally with the original table:

val joiningTable = df1.join(df2, Seq("ID", "Category", "Date"))

val res = table1.join(joiningTable, Seq("ID", "Category", "Date"))

...and voila - printing the final result:

res.show()

+---+--------+--------+-------+-------+
| ID|Category|    Date|Points1|Points2|
+---+--------+--------+-------+-------+
|123|       A| 3312019|     40|     20|
|123|       B| 5302020|     10|     90|
|123|       D| 5302020|      0|     80|
|123|       A|12312019|     20|     10|
|123|       B|12312019|      0|     10|
|123|       B| 3312019|     60|     60|
+---+--------+--------+-------+-------+
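
Ported to PySpark, the final joins would look roughly like this (a sketch building on the snippets above; points2 is assumed to be built from tables 2.2, 3.2 and 4.2 exactly as points1 was):

# Inner-join the two unpivoted point tables on the shared keys,
# then join the result onto the master table.
joining_table = points1.join(points2, ["ID", "Category", "Date"])
res = spark.table("Table1").join(joining_table, ["ID", "Category", "Date"])
res.show()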
