Spark pivot without aggregation


Problem description

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html explains nicely how a pivot works in Spark.

In my Python code, I use pandas without an aggregation, but reset the index and join:

countryToMerge = pd.pivot_table(data=dfCountries, index=['A'], columns=['B'])
countryToMerge.index.name = 'ISO'
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')

How does this work in Spark?

I tried to group and join manually, like:

val grouped = countryKPI.groupBy("A").pivot("B")
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show

but that does not work. How would the reset_index fit into Spark, or how would this be implemented in a Spark-native way?

A minimal example of the Python code:

import pandas as pd
from datetime import datetime, timedelta
import numpy as np
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"])
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO'])
dates['ISO'] = isos.ISO
dates['ISO'] = dates['ISO'].astype("category")
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'],
                       'indicator_id':['a','a','b','b'],
                       'value':[7,8,9,7]})
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id'])
countryToMerge.index.name = 'ISO'
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner'))

        dates  ISO  a  b
0  2016-01-01  ABC  7  9
1  2016-01-03  ABC  7  9
2  2016-01-05  ABC  7  9
3  2016-01-07  ABC  7  9
4  2016-01-09  ABC  7  9
5  2016-01-02  POL  8  7
6  2016-01-04  POL  8  7
7  2016-01-06  POL  8  7
8  2016-01-08  POL  8  7
9  2016-01-10  POL  8  7

To follow along in Scala / Spark:

import spark.implicits._  // assuming a SparkSession named spark; needed for toDF and the 'dates syntax

val dates = Seq(("2016-01-01", "ABC"),
    ("2016-01-02", "ABC"),
    ("2016-01-03", "POL"),
    ("2016-01-04", "ABC"),
    ("2016-01-05", "POL"),
    ("2016-01-06", "ABC"),
    ("2016-01-07", "POL"),
    ("2016-01-08", "ABC"),
    ("2016-01-09", "POL"),
    ("2016-01-10", "ABC")
  ).toDF("dates", "ISO")
    .withColumn("dates", 'dates.cast("Date"))

  dates.show
  dates.printSchema

  val countryKPI = Seq(("ABC", "a", 7),
    ("ABC", "b", 8),
    ("POL", "a", 9),
    ("POL", "b", 7)
  ).toDF("country_id3", "indicator_id", "value")

  countryKPI.show
  countryKPI.printSchema

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id")

Recommended answer

The following snippet seems to work, although I am not sure whether aggregating by avg is correct, even if the output numbers do fit.

countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show
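
To mirror the reset_index plus merge step from the pandas version, the pivoted DataFrame can be joined back to dates on the country column; the groupBy key remains an ordinary column after the pivot, so there is no reset_index equivalent needed. A minimal sketch, assuming the dates and countryKPI DataFrames defined above:

val pivoted = countryKPI
  .groupBy("country_id3")
  .pivot("indicator_id")
  .avg("value")

// join back to the dates DataFrame, analogous to pandas' merge(..., on='ISO')
dates.join(pivoted, dates("ISO") === pivoted("country_id3"), "inner")
  .drop("country_id3")
  .show()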

I'm not sure whether using avg is "inefficient" for a larger amount of data compared to just reusing the values (since I do not actually want to aggregate).
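
If the worry is that avg implies real arithmetic over the values, another option is to aggregate with first, which simply passes the single existing value through the pivot when each (country_id3, indicator_id) pair occurs only once. A sketch, not benchmarked:

import org.apache.spark.sql.functions.first

// first("value") reuses the existing value instead of averaging,
// assuming one row per (country_id3, indicator_id) combination
countryKPI
  .groupBy("country_id3")
  .pivot("indicator_id")
  .agg(first("value"))
  .show()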
