Pyspark Multiple JOINS Column <> Row values: Reducing Actions
Question
I have a master table 'Table 1' with 3 columns (shown below). Tables 2.1, 3.1 & 4.1 correspond to the 3 unique dates present in Table 1 and need to be populated into the 'Points 1' column. Similarly, Tables 2.2, 3.2 & 4.2 correspond to the same 3 unique dates in Table 1 and need to be populated into the 'Points 2' column.
Current approach:
from pyspark.sql.functions import col, lit, when

df1 = spark.table("Table1")
df2_1 = spark.table("table2.1")
df2_1 = df2_1.withColumn("Date", lit(3312019))
df3 = df1.join(df2_1, (df1.ID == df2_1.ID) & (df1.Date == df2_1.Date), 'left')
df4 = df3.withColumn('Points', when(df3.Category == 'A', col('A'))
                               .when(df3.Category == 'B', col('B'))
                               .when(df3.Category == 'C', col('C'))
                               .when(df3.Category == 'D', col('D'))
                               .otherwise(lit(None)))
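The chained when() above is effectively a lookup of the column whose name matches Category. A plain-Python sketch of that idea (illustrative only, no Spark required; the function name is made up):

```python
# Illustrative, Spark-free sketch: the chained when()/otherwise(None)
# simply picks the value of the column whose name equals Category.
def pick_points(row, category):
    # row is a dict of column name -> value; an unknown category yields
    # None, mirroring .otherwise(lit(None)).
    return row.get(category)

print(pick_points({"A": 40, "B": 60, "C": None, "D": 0}, "B"))  # prints 60
```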
The current approach makes my code lengthy if implemented for all 6 tables; any suggestions to shorten it and reduce the number of actions?
Answer
I don't know if this is much shorter or "cleaner" than your version, but since you asked for help on this, I will post this as an answer. Please note that my answer is in regular Spark (Scala), not PySpark, but it shouldn't be too difficult to port it to PySpark if you find the answer useful :)
Here it is:
First, a small helper function:
import org.apache.spark.sql.Row

// Turn one wide row (ID, Date, A, B, C, D) into four narrow rows,
// one per category, replacing nulls with 0.
def columns2rows(row: Row) = {
  val id = row.getInt(0)
  val date = row.getInt(1)
  val cols = Seq("A", "B", "C", "D")
  cols.indices.map(index => (id, cols(index), date, if (row.isNullAt(index+2)) 0 else row.getInt(index+2)))
}
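Since the answer suggests porting to PySpark, the melt step can also be sketched in plain Python (illustrative only, no Spark needed) to show exactly what columns2rows emits per input row:

```python
# Plain-Python analogue of the Scala helper: one wide row
# (ID, Date, A, B, C, D) becomes four narrow (ID, Category, Date, Points)
# rows, with nulls replaced by 0.
def columns2rows(row):
    id_, date = row[0], row[1]
    cols = ["A", "B", "C", "D"]
    return [(id_, c, date, row[i + 2] if row[i + 2] is not None else 0)
            for i, c in enumerate(cols)]

print(columns2rows((123, 3312019, 40, 60, None, None)))
# [(123, 'A', 3312019, 40), (123, 'B', 3312019, 60),
#  (123, 'C', 3312019, 0), (123, 'D', 3312019, 0)]
```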
Then union together the tables needed to populate "Points1":
val df1 = table21.withColumn("Date", lit(3312019))
.unionByName(table31.withColumn("Date", lit(12312019)))
.unionByName(table41.withColumn("Date", lit(5302020)))
.select($"ID", $"Date", $"A", $"B", $"C", $"D")
.flatMap(row => columns2rows(row))
.toDF("ID", "Category", "Date", "Points1")
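The union step can likewise be mimicked without Spark: each table's rows get that table's hard-coded Date inserted, then everything is concatenated. A rough, illustrative stand-in for the withColumn("Date", lit(...)) plus unionByName chain (the data and function name are made up):

```python
from functools import reduce

# Rough, Spark-free stand-in for tagging each table with its Date and
# unioning them: insert the date after the ID, then concatenate all tables.
def tag_and_union(tables_with_dates):
    # tables_with_dates: list of (rows, date); each row is (ID, A, B, C, D).
    tagged = [[(row[0], date) + row[1:] for row in rows]
              for rows, date in tables_with_dates]
    return reduce(lambda a, b: a + b, tagged, [])

print(tag_and_union([([(123, 40, 60, None, None)], 3312019),
                     ([(123, 20, 0, None, None)], 12312019)]))
# [(123, 3312019, 40, 60, None, None), (123, 12312019, 20, 0, None, None)]
```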
Then union together the tables needed to populate "Points2":
val df2 = table22.withColumn("Date", lit(3312019))
.unionByName(table32.withColumn("Date", lit(12312019)))
.unionByName(table42.withColumn("Date", lit(5302020)))
.select($"ID", $"Date", $"A", $"B", $"C", $"D")
.flatMap(row => columns2rows(row))
.toDF("ID", "Category", "Date", "Points2")
Join them together and finally with the original table:
val joiningTable = df1.join(df2, Seq("ID", "Category", "Date"))
val res = table1.join(joiningTable, Seq("ID", "Category", "Date"))
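Both joins here are plain inner joins on the composite key (ID, Category, Date). A minimal Python sketch of that key-based join, with illustrative data matching the sample output below:

```python
# Minimal sketch of an inner join on the composite key (ID, Category, Date),
# as done by df1.join(df2, Seq("ID", "Category", "Date")).
def join_on_key(left, right):
    # left: (ID, Category, Date, Points1) rows;
    # right: (ID, Category, Date, Points2) rows.
    index = {(i, c, d): p for i, c, d, p in right}
    return [(i, c, d, p1, index[(i, c, d)])
            for i, c, d, p1 in left if (i, c, d) in index]

left = [(123, "A", 3312019, 40), (123, "B", 3312019, 60)]
right = [(123, "A", 3312019, 20), (123, "B", 3312019, 60), (123, "C", 5302020, 5)]
print(join_on_key(left, right))
# [(123, 'A', 3312019, 40, 20), (123, 'B', 3312019, 60, 60)]
```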
...and voila - printing the final result:
res.show()
+---+--------+--------+-------+-------+
| ID|Category| Date|Points1|Points2|
+---+--------+--------+-------+-------+
|123| A| 3312019| 40| 20|
|123| B| 5302020| 10| 90|
|123| D| 5302020| 0| 80|
|123| A|12312019| 20| 10|
|123| B|12312019| 0| 10|
|123| B| 3312019| 60| 60|
+---+--------+--------+-------+-------+