How to define partitioning of DataFrame?


Question


I've started using Spark SQL and DataFrames in Spark 1.4.0. I'm wanting to define a custom partitioner on DataFrames, in Scala, but not seeing how to do this.

One of the data tables I'm working with contains a list of transactions, by account, similar to the following example.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

At least initially, most of the calculations will occur between the transactions within an account. So I would want to have the data partitioned so that all of the transactions for an account are in the same Spark partition.

But I'm not seeing a way to define this. The DataFrame class has a method called 'repartition(Int)', where you can specify the number of partitions to create. But I'm not seeing any method available to define a custom partitioner for a DataFrame, such as can be specified for an RDD.

The source data is stored in Parquet. I did see that when writing a DataFrame to Parquet, you can specify a column to partition by, so presumably I could tell Parquet to partition its data by the 'Account' column. But there could be millions of accounts, and if I'm understanding Parquet correctly, it would create a distinct directory for each Account, so that didn't sound like a reasonable solution.

Is there a way to get Spark to partition this DataFrame so that all data for an Account is in the same partition?

Solution

Spark >= 1.6

In Spark >= 1.6 it is possible to use partitioning by column for queries and caching. See SPARK-11410 and SPARK-4849, both using the repartition method:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Unlike RDDs, Spark Datasets (including Dataset[Row], a.k.a. DataFrame) cannot use a custom partitioner as of now. You can typically address that by creating an artificial partitioning column, but it won't give you the same flexibility.
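
A minimal sketch of that workaround, assuming Spark >= 1.6 (the bucket column name and the numBuckets value are illustrative, not part of the answer above): derive a stable bucket id from the key and repartition on it, so that all rows sharing a key land in the same partition.

import org.apache.spark.sql.functions.{crc32, lit, pmod}

// Illustrative artificial partitioning column: hash the string key into a
// fixed number of buckets and shuffle on that column.
val numBuckets = 50
val bucketed = df
  .withColumn("bucket", pmod(crc32($"k"), lit(numBuckets)))
  .repartition(numBuckets, $"bucket")

Unlike a real Partitioner, this only fixes the number of partitions and co-locates equal keys; it gives you no control over which partition a particular key is assigned to.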

Spark < 1.6.0:

One thing you can do is to pre-partition the input data before you create a DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Since DataFrame creation from an RDD requires only a simple map phase, the existing partition layout should be preserved*:

assert(df.rdd.partitions == partitioned.partitions)

In the same way you can repartition an existing DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

So it looks like it is not impossible. The question remains whether it makes sense at all. I will argue that most of the time it doesn't:

  1. Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. On the other hand, the number of operations which can benefit from pre-partitioned data is relatively small, and it is further limited if the internal API is not designed to leverage this property.

    • joins in some scenarios, but it would require internal support,
    • window function calls with a matching partitioner (see the sketch after this list). Same as above, limited to a single window definition. The window is already partitioned internally though, so pre-partitioning may be redundant,
    • simple aggregations with GROUP BY - it is possible to reduce the memory footprint of the temporary buffers**, but the overall cost is much higher. More or less equivalent to groupByKey.mapValues(_.reduce) (current behavior) vs reduceByKey (pre-partitioning). Unlikely to be useful in practice.
    • data compression with SqlContext.cacheTable. Since it looks like it is using run-length encoding, applying OrderedRDDFunctions.repartitionAndSortWithinPartitions could improve the compression ratio.
  2. Performance is highly dependent on the distribution of the keys. If it is skewed it will result in suboptimal resource utilization. In the worst case scenario it will be impossible to finish the job at all.

  3. The whole point of using a high-level declarative API is to isolate yourself from low-level implementation details. As already mentioned by @dwysakowicz and @RomiKuntsman, optimization is the job of the Catalyst Optimizer. It is a pretty sophisticated beast and I really doubt you can easily improve on it without diving much deeper into its internals.
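
To make the window-function point above concrete, here is a hypothetical sketch over the transaction table from the question (transactions stands for that DataFrame; on Spark 1.x window functions need a HiveContext). The window specification itself declares the partitioning, so Spark shuffles by Account while evaluating it, and pre-partitioning the DataFrame would add nothing:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Running balance per account, ordered by transaction date.
val byAccount = Window.partitionBy("Account").orderBy("Date")
val withRunningTotal = transactions
  .withColumn("running_total", sum("Amount").over(byAccount))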

Partitioning with JDBC sources:

JDBC data sources support a predicates argument. It can be used as follows:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

It creates a single JDBC partition per predicate. Keep in mind that if the sets created using individual predicates are not disjoint, you'll see duplicates in the resulting table.
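
For example (illustrative only; the table name and account ranges below are made up), range predicates like these are disjoint, so each row matches exactly one predicate and is read exactly once:

// Disjoint account ranges: one JDBC partition per predicate, no duplicates.
val predicates = Array(
  "Account >= 1000 AND Account < 2000",
  "Account >= 2000 AND Account < 3000",
  "Account >= 3000"
)

val byRange = sqlContext.read.jdbc(url, "transactions", predicates, props)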

partitionBy method in DataFrameWriter:

Spark DataFrameWriter provides a partitionBy method which can be used to "partition" data on write. It separates data on write using the provided set of columns:

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

This enables predicate push down on read for queries based on key:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

but it is not equivalent to DataFrame.repartition. In particular, aggregations like:

val cnts = df1.groupBy($"k").sum()

will still require TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy method in DataFrameWriter (Spark >= 2.0):

bucketBy has similar applications to partitionBy but it is available only for tables (saveAsTable). As of today (Spark 2.1.0) it doesn't look like there are any execution plan optimizations applied to bucketed tables.
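
A minimal sketch of the API (Spark >= 2.0; the bucket count and table name are arbitrary, and spark refers to the usual SparkSession):

// bucketBy only takes effect together with saveAsTable.
df.write
  .bucketBy(42, "k")
  .sortBy("k")
  .saveAsTable("bucketed_by_k")

val bucketed = spark.table("bucketed_by_k")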


* By partition layout I mean only the data distribution; the partitioned RDD no longer has a partitioner.
** Assuming no early projection. If the aggregation covers only a small subset of columns, there is probably no gain whatsoever.
