如何旋转 Spark DataFrame? [英] How to pivot Spark DataFrame?

查看:35
本文介绍了如何旋转 Spark DataFrame?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我开始使用 Spark DataFrames,我需要能够旋转数据以从具有多行的 1 列中创建多列.Scalding 有内置的功能,我相信 Python 中的 Pandas,但我找不到新的 Spark Dataframe 的任何内容.

I am starting to use Spark DataFrames and I need to be able to pivot the data to create multiple columns out of 1 column with multiple rows. There is built in functionality for that in Scalding and I believe in Pandas in Python, but I can't find anything for the new Spark Dataframe.

我假设我可以编写某种自定义函数来执行此操作,但我什至不确定如何开始,尤其是因为我是 Spark 的新手.我任何人都知道如何使用内置功能或有关如何在 Scala 中编写某些内容的建议来做到这一点,非常感谢.

I assume I can write custom function of some sort that will do this but I'm not even sure how to start, especially since I am a novice with Spark. I anyone knows how to do this with built in functionality or suggestions for how to write something in Scala, it is greatly appreciated.

推荐答案

如前所述 by David Anderson Spark 从 1.6 版开始提供 pivot 功能.一般语法如下所示:

As mentioned by David Anderson Spark provides pivot function since version 1.6. General syntax looks as follows:

df
  .groupBy(grouping_columns)
  .pivot(pivot_column, [values]) 
  .agg(aggregate_expressions)

使用 nycflights13csv 的使用示例格式:

Usage examples using nycflights13 and csv format:

Python:

from pyspark.sql.functions import avg

flights = (sqlContext
    .read
    .format("csv")
    .options(inferSchema="true", header="true")
    .load("flights.csv")
    .na.drop())

flights.registerTempTable("flights")
sqlContext.cacheTable("flights")

gexprs = ("origin", "dest", "carrier")
aggexpr = avg("arr_delay")

flights.count()
## 336776

%timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
## 10 loops, best of 3: 1.03 s per loop

Scala:

val flights = sqlContext
  .read
  .format("csv")
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .load("flights.csv")

flights
  .groupBy($"origin", $"dest", $"carrier")
  .pivot("hour")
  .agg(avg($"arr_delay"))

Java:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;

Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("flights.csv");

df.groupBy(col("origin"), col("dest"), col("carrier"))
        .pivot("hour")
        .agg(avg(col("arr_delay")));

R/SparkR:

library(magrittr)

flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

flights %>% 
  groupBy("origin", "dest", "carrier") %>% 
  pivot("hour") %>% 
  agg(avg(column("arr_delay")))

R/sparklyr

library(dplyr)

flights <- spark_read_csv(sc, "flights", "flights.csv")

avg.arr.delay <- function(gdf) {
   expr <- invoke_static(
      sc,
      "org.apache.spark.sql.functions",
      "avg",
      "arr_delay"
    )
    gdf %>% invoke("agg", expr, list())
}

flights %>% 
  sdf_pivot(origin + dest + carrier ~  hour, fun.aggregate=avg.arr.delay)

SQL:

请注意,Spark SQL 中的 PIVOT 关键字从 2.4 版本开始支持.

Note that PIVOT keyword in Spark SQL is supported starting from version 2.4.

CREATE TEMPORARY VIEW flights 
USING csv 
OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

 SELECT * FROM (
   SELECT origin, dest, carrier, arr_delay, hour FROM flights
 ) PIVOT (
   avg(arr_delay)
   FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
                13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
 );

示例数据:

"year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00

性能注意事项:

一般来说,旋转是一项昂贵的操作.

Generally speaking pivoting is an expensive operation.

  • 如果可以,请尝试提供 values 列表,因为这样可以避免计算唯一值的额外命中:

  • if you can, try to provide values list, as this avoids an extra hit to compute the uniques:

vs = list(range(25))
%timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
## 10 loops, best of 3: 392 ms per loop

  • 在某些情况下证明是有益的(可能不再值得在2.0 或更高版本) 以重新分区和/或预聚合数据

  • in some cases it proved to be beneficial (likely no longer worth the effort in 2.0 or later) to repartition and / or pre-aggregate the data

    仅用于重塑,您可以使用first:Pyspark Dataframe 上的透视字符串列

    for reshaping only, you can use first: Pivot String column on Pyspark Dataframe

    相关问题:

    这篇关于如何旋转 Spark DataFrame?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

  • 查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆