Pyspark Multiple JOINS Column <> Row values: Reducing Actions


Problem description

I have a master table 'Table 1' with 3 columns (shown below). Tables 2.1, 3.1 & 4.1 are for the 3 unique dates present in Table 1 and need to be populated into the 'Points 1' column. Similarly, Tables 2.2, 3.2 & 4.2 are for the same 3 unique dates present in Table 1 and need to be populated into the 'Points 2' column.

Current approach:

from pyspark.sql.functions import col, lit, when

df1 = spark.table("Table1")
df2_1 = spark.table("table2.1")
df2_1 = df2_1.withColumn("Date", lit(3312019))

df3 = df1.join(df2_1, (df1.ID == df2_1.ID) & (df1.Date == df2_1.Date), 'left')
df4 = df3.withColumn('Points', when(df3.Category == 'A', col('A'))
                              .when(df3.Category == 'B', col('B'))
                              .when(df3.Category == 'C', col('C'))
                              .when(df3.Category == 'D', col('D'))
                              .otherwise(lit(None)))

The current approach makes my code lengthy if implemented for all 6 tables. Any suggestions to shorten it and reduce multiple actions?

Recommended answer

I don't know if this is much shorter or "cleaner" than your version, but since you asked for help on this, I will post it as an answer. Please note that my answer is in regular Spark (Scala), not PySpark, but it shouldn't be too difficult to port to PySpark if you find the answer useful :)

Here it is:

First, a small helper function:

import org.apache.spark.sql.Row

// Expand one wide row (ID, Date, A, B, C, D) into four (ID, Category, Date, Points)
// tuples, one per category column, treating a null points value as 0.
def columns2rows(row: Row) = {
  val id = row.getInt(0)
  val date = row.getInt(1)
  val cols = Seq("A", "B", "C", "D")
  cols.indices.map(index => (id, cols(index), date, if (row.isNullAt(index+2)) 0 else row.getInt(index+2)))
}
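
(For a PySpark port of this step, which the answer leaves to the reader, one option is Spark SQL's stack expression instead of a flatMap. The helper name columns_to_rows and the points_col parameter below are illustrative; the column names ID, Date, A-D and the null-to-0 behaviour mirror the Scala function above.)

from pyspark.sql import functions as F

def columns_to_rows(df, points_col):
    # Unpivot columns A-D into (Category, points) rows; null points become 0,
    # mirroring the Scala helper.
    return (df
        .select("ID", "Date",
                F.expr("stack(4, 'A', A, 'B', B, 'C', C, 'D', D) as (Category, Points)"))
        .select("ID", "Category", "Date",
                F.coalesce(F.col("Points"), F.lit(0)).alias(points_col)))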

Then union together the tables needed to populate "Points1":

val df1 = table21.withColumn("Date", lit(3312019))
  .unionByName(table31.withColumn("Date", lit(12312019)))
  .unionByName(table41.withColumn("Date", lit(5302020)))
  .select($"ID", $"Date", $"A", $"B", $"C", $"D")
  .flatMap(row => columns2rows(row))
  .toDF("ID", "Category", "Date", "Points1")

Then union together the tables needed to populate "Points2":

val df2 = table22.withColumn("Date", lit(3312019))
  .unionByName(table32.withColumn("Date", lit(12312019)))
  .unionByName(table42.withColumn("Date", lit(5302020)))
  .select($"ID", $"Date", $"A", $"B", $"C", $"D")
  .flatMap(row => columns2rows(row))
  .toDF("ID", "Category", "Date", "Points2")
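
(In PySpark, the two union-and-unpivot steps would look roughly like this, reusing the columns_to_rows sketch above. The names table21 through table42 are stand-ins for however the six source tables are loaded, e.g. via spark.table as in the question.)

from pyspark.sql import functions as F

# Illustrative DataFrame names; load the six source tables however you already do.
points1 = columns_to_rows(
    table21.withColumn("Date", F.lit(3312019))
           .unionByName(table31.withColumn("Date", F.lit(12312019)))
           .unionByName(table41.withColumn("Date", F.lit(5302020))),
    "Points1")

points2 = columns_to_rows(
    table22.withColumn("Date", F.lit(3312019))
           .unionByName(table32.withColumn("Date", F.lit(12312019)))
           .unionByName(table42.withColumn("Date", F.lit(5302020))),
    "Points2")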

Join them together, and finally with the original table:

val joiningTable = df1.join(df2, Seq("ID", "Category", "Date"))

val res = table1.join(joiningTable, Seq("ID", "Category", "Date"))
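
(The PySpark equivalent of these two joins uses column-name lists, which also avoids duplicate join columns; points1 and points2 come from the sketch above, and table1 is the master table.)

joining_table = points1.join(points2, ["ID", "Category", "Date"])
res = table1.join(joining_table, ["ID", "Category", "Date"])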

...and voila - printing the final result:

res.show()

+---+--------+--------+-------+-------+
| ID|Category|    Date|Points1|Points2|
+---+--------+--------+-------+-------+
|123|       A| 3312019|     40|     20|
|123|       B| 5302020|     10|     90|
|123|       D| 5302020|      0|     80|
|123|       A|12312019|     20|     10|
|123|       B|12312019|      0|     10|
|123|       B| 3312019|     60|     60|
+---+--------+--------+-------+-------+
