What is an efficient way to partition by column but maintain a fixed partition count?


Question

What is the best way to partition the data by a field into a predefined number of partitions?

I am currently partitioning the data by specifying a partition count of 600, which was found to give the best query performance for my dataset/cluster setup.

val rawJson = sqlContext.read.json(filename).coalesce(600)
rawJson.write.parquet(filenameParquet)

Now I want to partition this data by the column 'eventName' but still keep the count at 600. The data currently has around 2000 unique eventNames, and the number of rows per eventName is not uniform: around 10 eventNames account for more than 50% of the data, causing data skew. Hence, if I do the partitioning as below, it is not very performant; the write takes 5x longer than without partitioning.

val rawJson = sqlContext.read.json(filename)
rawJson.write.partitionBy("eventName").parquet(filenameParquet)

What is a good way to partition the data for these scenarios? Is there a way to partition by eventName but still spread the data across 600 partitions?

My schema looks like this:

{  
  "eventName": "name1",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "detailed1",
      "id": "1234",
...
...
    }
  }
} 

Thanks!

Answer

This is a common problem with skewed data and there are several approaches you can take.

List bucketing works if the skew remains stable over time, which may or may not be the case, especially if new values of the partitioning variable are introduced. I have not researched how easy it is to adjust list bucketing over time and, as your comment states, you can't use that anyway because it is a Spark 2.0 feature.

If you are on 1.6.x, the key observation is that you can create your own function that maps each event name into one of 600 unique values. You can do this as a UDF or as a case expression. Then you simply create a column using that function and partition by that column using repartition(600, 'myPartitionCol) as opposed to coalesce(600).
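A minimal sketch of that idea on Spark 1.6.x follows, assuming a plain hash of eventName into 600 buckets; the UDF, the column name "bucket", and the hashing scheme are illustrative choices, not part of the original answer:

import sqlContext.implicits._
import org.apache.spark.sql.functions.udf

// Illustrative mapping: hash each eventName into one of 600 buckets.
// A smarter mapping (such as the RandomRangeMap below) can spread a heavy
// eventName over several buckets instead of exactly one.
val bucketOf = udf((eventName: String) => ((eventName.hashCode % 600) + 600) % 600)

val rawJson = sqlContext.read.json(filename)
rawJson
  .withColumn("bucket", bucketOf($"eventName"))
  .repartition(600, $"bucket")   // repartition(numPartitions, cols*) is available from 1.6.0
  .write
  .parquet(filenameParquet)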

Because we deal with very skewed data at Swoop, I've found the following workhorse data structure to be quite useful for building partitioning-related tools.

/** Given a key, returns a random number in the range [x, y) where
  * x and y are the numbers in the tuple associated with a key.
  */
class RandomRangeMap[A](private val m: Map[A, (Int, Int)]) extends Serializable {
  private val r = new java.util.Random() // Scala Random is not serializable in 2.10

  def apply(key: A): Int = {
    val (start, end) = m(key)
    start + r.nextInt(end - start)
  }

  override def toString = s"RandomRangeMap($r, $m)"
}
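For illustration, here is a tiny hypothetical instance (the keys and ranges are made up): key "a" is spread uniformly over partitions 0 through 2, while key "b" is pinned to partition 3.

val rrm = new RandomRangeMap(Map("a" -> (0, 3), "b" -> (3, 4)))
rrm("a")   // returns 0, 1 or 2, chosen at random on each call
rrm("b")   // always returns 3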

For example, here is how we build a partitioner for a slightly different case: one where the data is skewed and the number of keys is small, so we have to increase the number of partitions for the skewed keys while sticking with 1 as the minimum number of partitions per key:

/** Partitions data such that each unique key ends up in P(key) partitions.
  * Must be instantiated with a sequence of unique keys and their Ps.
  * Partition sizes can be highly-skewed by the data, which is where the
  * multiples come in.
  *
  * @param keyMap  maps key values to their partition multiples
  */
class ByKeyPartitionerWithMultiples(val keyMap: Map[Any, Int]) extends Partitioner {
  private val rrm = new RandomRangeMap(
    keyMap.keys
      .zip(
        keyMap.values
          .scanLeft(0)(_+_)
          .zip(keyMap.values)
          .map {
            case (start, count) => (start, start + count)
          }
      )
      .toMap
  )

  override val numPartitions =
    keyMap.values.sum

  override def getPartition(key: Any): Int =
    rrm(key)
}

object ByKeyPartitionerWithMultiples {

  /** Builds a UDF with a ByKeyPartitionerWithMultiples in a closure.
    *
    * @param keyMap  maps key values to their partition multiples
    */
  def udf(keyMap: Map[String, Int]) = {
    val partitioner = new ByKeyPartitionerWithMultiples(keyMap.asInstanceOf[Map[Any, Int]])
    (key:String) => partitioner.getPartition(key)
  }

}
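As a hedged sketch of how the builder above could be wired into a 1.6.x DataFrame pipeline (the multiples map and the column name "p" are invented for the example; every eventName present in the data must have an entry in the map, otherwise RandomRangeMap will throw on lookup):

import sqlContext.implicits._
import org.apache.spark.sql.functions.udf

// Heavy event names get several partitions each, everything else gets one.
val multiples = Map("name1" -> 50, "name2" -> 30, "rareEvent" -> 1)   // illustrative values
val partitionOf = udf(ByKeyPartitionerWithMultiples.udf(multiples))

rawJson
  .withColumn("p", partitionOf($"eventName"))
  .repartition(multiples.values.sum, $"p")   // matches numPartitions = keyMap.values.sum
  .write                                     // repartition hashes "p", so equal values stay together
  .parquet(filenameParquet)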

In your case, you have to merge several event names into a single partition, which would require changes, but I hope the code above gives you an idea of how to approach the problem.

One final observation is that if the distribution of event names varies a lot in your data over time, you can perform a statistics-gathering pass over part of the data to compute a mapping table. You don't have to do this all the time, just when it is needed. To determine that, you can look at the number of rows and/or the size of the output files in each partition. In other words, the entire process can be automated as part of your Spark jobs.
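A rough sketch of such a statistics pass, assuming the resulting keyMap feeds a partitioner like the one above; the 10% sample fraction and the 600-partition target are arbitrary choices for the example:

// Count rows per eventName on a sample, then give each key a share of ~600
// partitions proportional to its row count, with a minimum of 1.
val counts = sqlContext.read.json(filename)
  .sample(withReplacement = false, fraction = 0.1)
  .groupBy("eventName").count()
  .collect()
  .map(row => row.getString(0) -> row.getLong(1))
  .toMap

val total = counts.values.sum.toDouble
val keyMap: Map[String, Int] = counts.map { case (name, rows) =>
  name -> math.max(1, math.round(600 * rows / total).toInt)
}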

