How is a Spark Dataframe partitioned by default?

Problem description

I know that an RDD is partitioned based on the key values using the HashPartitioner. But how is a Spark Dataframe partitioned by default, as it does not have the concept of key/value?

Recommended answer

How a Dataframe is partitioned depends on the number of tasks that run to create it.

There is no "default" partitioning logic applied. Here are some examples of how partitions are set (the snippet after the list shows how to verify a few of these with rdd.getNumPartitions):

  • A Dataframe created through val df = Seq(1 to 500000: _*).toDF() will have only a single partition.
  • A Dataframe created through val df = spark.range(0,100).toDF() has as many partitions as the number of available cores (e.g. 4 when your master is set to local[4]). Also, see the remark below on the "default parallelism" that comes into effect for operations like parallelize with no parent RDD.
  • A Dataframe derived from an RDD (spark.createDataFrame(rdd, schema)) will have the same number of partitions as the underlying RDD. In my case, as I have 6 cores locally, the RDD got created with 6 partitions.
  • A Dataframe consuming from a Kafka topic will have a number of partitions matching the partitions of the topic, because it can use as many cores/slots as the topic has partitions to consume the topic.
  • A Dataframe created by reading a file, e.g. from HDFS, will have a number of partitions matching that of the file, unless individual files have to be split into multiple partitions based on spark.sql.files.maxPartitionBytes (128MB by default).
  • A Dataframe derived from a transformation requiring a shuffle will have the configurable number of partitions set by spark.sql.shuffle.partitions (200 by default).
  • ...
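
A minimal sketch (Scala, assuming a SparkSession named spark running with master local[4]) of how to verify a few of the cases above with rdd.getNumPartitions; the exact numbers printed depend on your environment and, on newer Spark versions, on adaptive query execution:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("partition-check")
      .getOrCreate()
    import spark.implicits._

    // Local collection -> a single partition
    val fromSeq = Seq(1 to 500000: _*).toDF()
    println(fromSeq.rdd.getNumPartitions)      // 1

    // spark.range -> one partition per available core (4 with local[4])
    val fromRange = spark.range(0, 100).toDF()
    println(fromRange.rdd.getNumPartitions)    // 4

    // Shuffling transformation -> spark.sql.shuffle.partitions (200 by default;
    // adaptive query execution may coalesce this on newer Spark versions)
    val shuffled = fromRange.groupBy($"id" % 10).count()
    println(shuffled.rdd.getNumPartitions)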

One of the major distinctions between the RDD and the Structured API is that you do not have as much control over the partitions as you have with RDDs, where you can even define a custom partitioner. This is not possible with Dataframes.
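
Since there is no custom Partitioner hook for Dataframes, the control you do have is limited to methods such as repartition, repartitionByRange and coalesce. A minimal sketch, assuming the same spark session and implicits as above:

    // Hash-partition by a column expression into 10 partitions
    val df = spark.range(0, 1000).toDF("id")
    val byHash = df.repartition(10, $"id")

    // Range-partition into 10 sorted ranges (Spark 2.3+)
    val byRange = df.repartitionByRange(10, $"id")

    // Reduce the partition count without a full shuffle
    val fewer = byHash.coalesce(2)

    println(byHash.rdd.getNumPartitions)  // 10
    println(fewer.rdd.getNumPartitions)   // 2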

The configuration docs on Execution Behavior explain for spark.default.parallelism:

For operations like parallelize with no parent RDDs, it depends on the cluster manager:

Local mode: number of cores on the local machine

Mesos fine grained mode: 8

Others: total number of cores on all executor nodes or 2, whichever is larger
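
A hedged sketch of how the two settings mentioned in this answer could be overridden when building the session; the values 8 and 64 are arbitrary illustrations, not recommendations:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("parallelism-config")
      .config("spark.default.parallelism", "8")      // parallelize / RDD operations with no parent RDD
      .config("spark.sql.shuffle.partitions", "64")  // Dataframe transformations that require a shuffle
      .getOrCreate()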
