Spark 1.6 DataFrame pivot function


Problem description

I tried the new pivot function of 1.6 on a larger stacked dataset (https://www.kaggle.com/worldbank/world-development-indicators/downloads/world-development-indicators-release-2016-01-28-06-31-53.zip). It has 5,656,458 rows, and the IndicatorCode column has 1,344 distinct codes. The idea was to use pivot to "unstack" (in pandas terms) this data set and have one column per IndicatorCode.

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType)

schema = StructType([
    StructField("CountryName", StringType(), True),
    StructField("CountryCode", StringType(), True),
    StructField("IndicatorName", StringType(), True),
    StructField("IndicatorCode", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Value", DoubleType(), True)
])

# Read the CSV with an explicit schema via the spark-csv package
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
                            format='com.databricks.spark.csv',
                            header='true',
                            schema=schema)

# Replace dots in the indicator codes so they are safe to use as column names
data2 = data.withColumn("IndicatorCode2", regexp_replace("IndicatorCode", r"\.", "_"))\
            .select(["CountryCode", "IndicatorCode2", "Year", "Value"])

# Collect the distinct codes up front so pivot does not have to compute them
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]

data3 = data2.groupBy(["Year", "CountryCode"])\
             .pivot("IndicatorCode2", columns)\
             .max("Value")

While this returned successfully, data3.first() never returned a result (I interrupted it after 10 minutes on my standalone setup using 3 cores).

My approach using RDD and aggregateByKey worked well, so I'm not looking for a solution for how to do this, but for whether a pivot with DataFrames can also do the trick.
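
For context, here is a minimal sketch of the kind of RDD + aggregateByKey approach referred to above. This is an assumed reconstruction, not the asker's actual code; the helper names and the max-per-cell semantics are illustrative only.

# Assumed sketch: build one dict of IndicatorCode2 -> Value per (Year, CountryCode) key
def merge_value(acc, code_value):
    code, value = code_value
    if value is not None and (code not in acc or value > acc[code]):
        acc[code] = value
    return acc

def merge_combiners(left, right):
    for code, value in right.items():
        if code not in left or value > left[code]:
            left[code] = value
    return left

unstacked = (data2.rdd
             .map(lambda row: ((row.Year, row.CountryCode), (row.IndicatorCode2, row.Value)))
             .aggregateByKey({}, merge_value, merge_combiners))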

Recommended answer

Well, pivoting is not a very efficient operation in general, and there is not much you can do about it using the DataFrame API. One thing you can try, though, is to repartition your data:

# Shuffle the narrow, pre-pivot rows by the grouping keys, then expand with pivot
(data2
  .repartition("Year", "CountryCode")
  .groupBy("Year", "CountryCode")
  .pivot("IndicatorCode2", columns)
  .max("Value"))

or even aggregate:

from pyspark.sql.functions import max

# Reduce to one row per (Year, CountryCode, IndicatorCode2) first,
# then pivot the already-aggregated data
(data2
    .groupBy("Year", "CountryCode", "IndicatorCode2")
    .agg(max("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode2", columns)
    .max("Value"))

before applying pivot. The idea behind both solutions is the same: instead of moving large, expanded rows, move the narrow, dense data and expand it locally.
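
As an optional sanity check (assuming the pivoted result is still the data3 DataFrame from the question), the query plan can be printed without triggering a job to see where the shuffle and the pivot aggregation sit:

# explain() only inspects the logical/physical plans; it does not start the aggregation
data3.explain(True)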
