没有聚合的火花枢轴 [英] spark pivot without aggregation
问题描述
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html 很好地解释了数据透视表如何为 spark 工作.
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html explain nicely how a pivot works for spark.
在我的python代码中,我使用没有聚合的pandas但重置了索引和连接:
In my python code, I use pandas without an aggregation but reset the index and join:
pd.pivot_table(data=dfCountries, index=['A'], columns=['B'])
countryToMerge.index.name = 'ISO'
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')
这在 Spark 中是如何工作的?
How does this work in spark?
我尝试手动分组和加入,例如:
I tried to group and join manually like:
val grouped = countryKPI.groupBy("A").pivot("B")
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show
但这不起作用.reset_index 将如何融入 spark/如何以 spark 原生方式实现?
but that does not work. How would the reset_index fit into spark / How would it be implemented in a spark native way?
python 代码的最小示例:
a minimal example of the python code:
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"])
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO'])
dates['ISO'] = isos.ISO
dates['ISO'] = dates['ISO'].astype("category")
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'],
'indicator_id':['a','a','b','b'],
'value':[7,8,9,7]})
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id'])
countryToMerge.index.name = 'ISO'
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner'))
dates ISO a b
0 2016-01-01 ABC 7 9
1 2016-01-03 ABC 7 9
2 2016-01-05 ABC 7 9
3 2016-01-07 ABC 7 9
4 2016-01-09 ABC 7 9
5 2016-01-02 POL 8 7
6 2016-01-04 POL 8 7
7 2016-01-06 POL 8 7
8 2016-01-08 POL 8 7
9 2016-01-10 POL 8 7
在 scala/spark 中跟进
to follow along in scala / spark
val dates = Seq(("2016-01-01", "ABC"),
("2016-01-02", "ABC"),
("2016-01-03", "POL"),
("2016-01-04", "ABC"),
("2016-01-05", "POL"),
("2016-01-06", "ABC"),
("2016-01-07", "POL"),
("2016-01-08", "ABC"),
("2016-01-09", "POL"),
("2016-01-10", "ABC")
).toDF("dates", "ISO")
.withColumn("dates", 'dates.cast("Date"))
dates.show
dates.printSchema
val countryKPI = Seq(("ABC", "a", 7),
("ABC", "b", 8),
("POL", "a", 9),
("POL", "b", 7)
).toDF("country_id3", "indicator_id", "value")
countryKPI.show
countryKPI.printSchema
val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id")
推荐答案
以下代码段似乎有效 - 但我不确定 avg 的聚合是否正确 - 即使拟合数字"是输出.
The following snippet seems to work - but I am not sure if an aggregation by avg is correct -even though "fitting numbers" are the output.
countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show
与仅重用这些值(因为我不想聚合)相比,我不确定这对于大量数据 (avg) 是否低效".
I'm not sure if this is "inefficient" for a bigger amount of data (avg) compared to just reusing the values (as I do not want to aggregate).
这篇关于没有聚合的火花枢轴的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!