How to unstack dataset (using pivot)?


Question

I tried the new "pivot" function of Spark 1.6 on a large stacked dataset. It has 5,656,458 rows, and the IndicatorCode column has 1344 distinct codes.

The idea was to use pivot to "unstack" (in pandas terms) this data set and have a column for each IndicatorCode.
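
To make "unstack" concrete, here is a minimal toy illustration of what DataFrame pivot does (hypothetical values, not rows from the actual dataset):

# Toy illustration of pivot as "unstack" (hypothetical data, not the WDI set).
toy = sqlContext.createDataFrame(
    [("DE", 2000, "A", 1.0), ("DE", 2000, "B", 2.0), ("FR", 2000, "A", 3.0)],
    ["CountryCode", "Year", "IndicatorCode", "Value"])

toy.groupBy("Year", "CountryCode").pivot("IndicatorCode").max("Value").show()
# +----+-----------+---+----+
# |Year|CountryCode|  A|   B|
# +----+-----------+---+----+
# |2000|         DE|1.0| 2.0|
# |2000|         FR|3.0|null|
# +----+-----------+---+----+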

from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType, DoubleType)

schema = StructType([
    StructField("CountryName", StringType(), True),
    StructField("CountryCode", StringType(), True),
    StructField("IndicatorName", StringType(), True),
    StructField("IndicatorCode", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Value", DoubleType(), True)
])

from pyspark.sql.functions import regexp_replace

data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
                            format='com.databricks.spark.csv',
                            header='true',
                            schema=schema)

# Replace the dots in the indicator codes so they can be used as column names.
data2 = data.withColumn("IndicatorCode2", regexp_replace("IndicatorCode", r"\.", "_"))\
            .select(["CountryCode", "IndicatorCode2", "Year", "Value"])

# Collect the distinct indicator codes once so they can be passed to pivot
# explicitly, which spares Spark an extra pass over the data to infer them.
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]

data3 = data2.groupBy(["Year", "CountryCode"])\
             .pivot("IndicatorCode2", columns)\
             .max("Value")

While this returned successfully, data3.first() never returned a result (I interrupted it after 10 minutes on my standalone setup with 3 cores).

My approach using RDDs and aggregateByKey worked well (roughly along the lines of the sketch below), so I'm not looking for a solution for how to do it, but for whether pivot with DataFrames can also do the trick.
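
For context, a minimal sketch of what such an RDD/aggregateByKey unstack could look like; this is a hypothetical reconstruction, not the asker's actual code, and it reuses data2 and columns from above:

# Hypothetical reconstruction of an RDD-based unstack, reusing `data2`
# and `columns` from above; not the asker's actual code.
col_index = {code: i for i, code in enumerate(columns)}

def seq_op(acc, row):
    # Put each value into the slot reserved for its indicator code.
    acc[col_index[row.IndicatorCode2]] = row.Value
    return acc

def comb_op(a, b):
    # Merge two partial rows, preferring filled slots.
    return [x if x is not None else y for x, y in zip(a, b)]

unstacked = (data2.rdd
    .map(lambda row: ((row.Year, row.CountryCode), row))
    .aggregateByKey([None] * len(columns), seq_op, comb_op))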

Answer

Well, pivoting is not a very efficient operation in general, and there is not much you can do about it through the DataFrame API. One thing you can try, though, is to repartition your data:

(data2
  .repartition("Year", "CountryCode")
  .groupBy("Year", "CountryCode")
  .pivot("IndicatorCode2", columns)
  .max("Value"))

or even aggregate first:

from pyspark.sql.functions import max

# Pre-aggregate to a single row per (Year, CountryCode, IndicatorCode2)
# before pivoting.
(data2
    .groupBy("Year", "CountryCode", "IndicatorCode2")
    .agg(max("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode2", columns)
    .max("Value"))

before applying pivot. The idea behind both solutions is the same: instead of moving large expanded Rows, move narrow, dense data and expand locally.
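
One hedged way to see the difference is to compare the physical plans of the direct and pre-aggregated variants (reusing data2 and columns from the question; exact plans vary by Spark version):

from pyspark.sql.functions import max as max_

direct = (data2
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode2", columns)
    .max("Value"))

pre_aggregated = (data2
    .groupBy("Year", "CountryCode", "IndicatorCode2")
    .agg(max_("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode2", columns)
    .max("Value"))

# Compare the physical plans: the pre-aggregated version shuffles narrow
# (Year, CountryCode, IndicatorCode2, Value) rows and widens afterwards.
direct.explain()
pre_aggregated.explain()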
