How to define partitioning of a DataFrame?

Question

I've started using Spark SQL and DataFrames in Spark 1.4.0. I want to define a custom partitioner on DataFrames, in Scala, but I'm not seeing how to do this.

One of the data tables I'm working with contains a list of transactions, by account, similar to the following example.

Account  Date        Type      Amount
1001     2014-04-01  Purchase  100.00
1001     2014-04-01  Purchase   50.00
1001     2014-04-05  Purchase   70.00
1001     2014-04-01  Payment  -150.00
1002     2014-04-01  Purchase   80.00
1002     2014-04-02  Purchase   22.00
1002     2014-04-04  Payment  -120.00
1002     2014-04-04  Purchase   60.00
1003     2014-04-02  Purchase  210.00
1003     2014-04-03  Purchase   15.00

At least initially, most of the calculations will occur between the transactions within an account. So I would want to have the data partitioned so that all of the transactions for an account are in the same Spark partition.

But I'm not seeing a way to define this. The DataFrame class has a method called 'repartition(Int)', where you can specify the number of partitions to create. But I'm not seeing any method available to define a custom partitioner for a DataFrame, such as can be specified for an RDD.

The source data is stored in Parquet. I did see that when writing a DataFrame to Parquet, you can specify a column to partition by, so presumably I could tell Parquet to partition its data by the 'Account' column. But there could be millions of accounts, and if I'm understanding Parquet correctly, it would create a distinct directory for each account, so that didn't sound like a reasonable solution.

Is there a way to get Spark to partition this DataFrame so that all data for an Account is in the same partition?

Answer

Spark >= 2.3.0

SPARK-22614 exposes range partitioning.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 exposes external format partitioning in the Data Source API v2.

In Spark >= 1.6 it is possible to use partitioning by column for queries and caching. See SPARK-11410 and SPARK-4849, which use the repartition method:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]
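
Tying this back to the question, a minimal sketch, assuming the transactions shown above have already been loaded into a DataFrame named transactions (a hypothetical name) with an Account column, on Spark >= 1.6:

// Hash-partition on the Account column so that all rows for an account
// land in the same partition (the explicit partition count is illustrative).
val byAccount = transactions.repartition($"Account")
val byAccountFixed = transactions.repartition(200, $"Account")

All rows sharing an Account value end up in the same partition, although, as the rest of the answer discusses, only a limited set of operations will actually benefit from it.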

Unlike RDDs, Spark Dataset (including Dataset[Row], a.k.a. DataFrame) cannot use a custom partitioner for now. You can typically address that by creating an artificial partitioning column, but it won't give you the same flexibility.
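
A minimal sketch of the artificial partitioning column workaround, assuming Spark >= 2.0 (hash, pmod and lit come from org.apache.spark.sql.functions; the bucket column name and bucket count are illustrative, not from the original answer):

import org.apache.spark.sql.functions.{hash, lit, pmod}

// Derive an artificial bucket column from the key and repartition on it.
val numBuckets = 200
val bucketed = df
  .withColumn("bucket", pmod(hash($"k"), lit(numBuckets)))
  .repartition(numBuckets, $"bucket")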

One thing you can do is to pre-partition the input data before you create a DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Since DataFrame creation from an RDD requires only a simple map phase, the existing partition layout should be preserved*:

assert(df.rdd.partitions == partitioned.partitions)

In the same way you can repartition an existing DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getLong(1), r)).partitionBy(partitioner).values,
  df.schema
)

So it looks like it is not impossible. The question remains whether it makes sense at all. I will argue that most of the time it doesn't:

  1. Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. On the other hand, the number of operations which can benefit from pre-partitioned data is relatively small, and it is further limited if the internal API is not designed to leverage this property:

  • joins in some scenarios, but it would require internal support,
  • window function calls with a matching partitioner. Same as above, limited to a single window definition. It is already partitioned internally though, so pre-partitioning may be redundant,
  • simple aggregations with GROUP BY - it is possible to reduce the memory footprint of the temporary buffers**, but the overall cost is much higher. More or less equivalent to groupByKey.mapValues(_.reduce) (current behavior) vs reduceByKey (pre-partitioning). Unlikely to be useful in practice.
  • data compression with SqlContext.cacheTable. Since it looks like it is using run-length encoding, applying OrderedRDDFunctions.repartitionAndSortWithinPartitions could improve the compression ratio (see the sketch after this list).
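
For the last point, a minimal sketch of what that could look like, reusing the rdd, schema and partitioner values defined earlier (whether it actually improves the compression ratio depends on the data):

// Hash-partition by key and sort within each partition before caching.
val sortedWithin = rdd
  .map(r => (r.getString(0), r))
  .repartitionAndSortWithinPartitions(partitioner)
  .values

val dfSorted = sqlContext.createDataFrame(sortedWithin, schema)
dfSorted.registerTempTable("sorted")
sqlContext.cacheTable("sorted")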

  2. Performance is highly dependent on the distribution of the keys. If it is skewed, it will result in suboptimal resource utilization. In the worst case scenario it will be impossible to finish the job at all.

Related concepts

Partitioning with JDBC sources:

JDBC data sources support a predicates argument. It can be used as follows:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

It creates a single JDBC partition per predicate. Keep in mind that if the sets created using individual predicates are not disjoint, you'll see duplicates in the resulting table.
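
For the question's account-based scenario, a hypothetical set of disjoint range predicates could look like this (the table name, column name and value ranges are illustrative; url and props are as in the snippet above):

// Each predicate becomes one JDBC partition; the ranges are disjoint,
// so no row is fetched twice.
val predicates = Array(
  "Account >= 1000 AND Account < 2000",
  "Account >= 2000 AND Account < 3000",
  "Account >= 3000 AND Account < 4000"
)
val transactionsDF = sqlContext.read.jdbc(url, "transactions", predicates, props)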

partitionBy method:

Spark DataFrameWriter provides a partitionBy method, which can be used to "partition" data on write. It separates data on write using the provided set of columns:

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

This enables predicate push-down on read for queries based on the key:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

but it is not equivalent to DataFrame.repartition. In particular, aggregations like:

val cnts = df1.groupBy($"k").sum()

will still require TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy method in DataFrameWriter (Spark >= 2.0):

bucketBy has similar applications as partitionBy, but it is available only for tables (saveAsTable). Bucketing information can be used to optimize joins:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* By partition layout I mean only the data distribution. The partitioned RDD no longer has a partitioner.

** Assuming no early projection. If the aggregation covers only a small subset of columns, there is probably no gain whatsoever.
