Spark 2 iterating over a partition to create a new partition

Problem Description

I have been scratching my head trying to come up with a way to reduce a dataframe in Spark to a frame which records gaps in the dataframe, preferably without completely killing parallelism. Here is a much-simplified example (it's a bit lengthy because I wanted it to be able to run):

import org.apache.spark.sql.SparkSession

case class Record(typ: String, start: Int, end: Int);

object Sample {
    def main(argv: Array[String]): Unit = {
        val sparkSession = SparkSession.builder()
            .master("local")
            .getOrCreate();

        val df = sparkSession.createDataFrame(
            Seq(
                Record("One", 0, 5),
                Record("One", 10, 15),
                Record("One", 5, 8),
                Record("Two", 10, 25),
                Record("Two", 40, 45),
                Record("Three", 30, 35)
            )
        );

        df.repartition(df("typ")).sortWithinPartitions(df("start")).show();
    }
}

When I get done I would like to be able to output a dataframe like this:

typ   start    end
---   -----    ---
One   0        8
One   10       15
Two   10       25
Two   40       45
Three 30       35

I guessed that partitioning by the 'typ' value would give me partitions with each distinct data value, 1-1, e.g. in the sample I would end up with three partitions, one each for 'One', 'Two' and 'Three'. Furthermore, the sortWithinPartitions call is intended to give me each partition in sorted order on 'start' so that I can iterate from the beginning to the end and record gaps. That last part is where I am stuck. Is this possible? If not, is there another approach?
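
For reference, here is a minimal sketch of the kind of within-partition iteration described above (this is not the recommended answer below; it assumes import sparkSession.implicits._ is in scope and relies on the fact that repartition(df("typ")) hash-partitions the rows, so all records of one typ land in the same partition, although several typ values may share one partition):

import sparkSession.implicits._

val merged = df
    .repartition(df("typ"))
    .sortWithinPartitions(df("typ"), df("start")) // also sort by typ, since a partition may hold several typ values
    .as[Record]
    .mapPartitions { rows =>
        // Fold over the sorted records, extending the previous record
        // whenever the next one has the same typ and overlaps it.
        rows.foldLeft(List.empty[Record]) {
            case (prev :: acc, r) if prev.typ == r.typ && r.start <= prev.end =>
                prev.copy(end = prev.end max r.end) :: acc
            case (acc, r) => r :: acc
        }.reverseIterator
    }

merged.show();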

Recommended Answer

I propose to skip the repartitioning and the sorting steps, and jump directly to a distributed compressed merge sort (I've just invented the name for the algorithm, just like the algorithm itself).

Here is the part of the algorithm that is supposed to be used as the reduce operation:

  type Gap = (Int, Int)

  def mergeIntervals(as: List[Gap], bs: List[Gap]): List[Gap] = {
    require(!as.isEmpty, "as must be non-empty")
    require(!bs.isEmpty, "bs must be non-empty")

    @annotation.tailrec
    def mergeRec(
      gaps: List[Gap],
      gapStart: Int,
      gapEndAccum: Int,
      as: List[Gap],
      bs: List[Gap]
    ): List[Gap] = {
      as match {
        case Nil => {
          bs match {
            case Nil => (gapStart, gapEndAccum) :: gaps
            case notEmpty => mergeRec(gaps, gapStart, gapEndAccum, bs, Nil)
          }
        }
        case (a0, a1) :: at => {
          if (a0 <= gapEndAccum) {
            mergeRec(gaps, gapStart, gapEndAccum max a1, at, bs)
          } else {
            bs match {
              case Nil => mergeRec((gapStart, gapEndAccum) :: gaps, a0, gapEndAccum max a1, at, bs)
              case (b0, b1) :: bt => if (b0 <= gapEndAccum) {
                mergeRec(gaps, gapStart, gapEndAccum max b1, as, bt)
              } else {
                if (a0 < b0) {
                  mergeRec((gapStart, gapEndAccum) :: gaps, a0, a1, at, bs)
                } else {
                  mergeRec((gapStart, gapEndAccum) :: gaps, b0, b1, as, bt)
                }
              }
            }
          }
        }
      }
    }
    val (a0, a1) :: at = as
    val (b0, b1) :: bt = bs

    val reverseRes = 
      if (a0 < b0) 
        mergeRec(Nil, a0, a1, at, bs)
      else
        mergeRec(Nil, b0, b1, as, bt)

    reverseRes.reverse
  }

Here is how it works:

  println(mergeIntervals(
    List((0, 3), (4, 7), (9, 11), (15, 16), (18, 22)),
    List((1, 2), (4, 5), (6, 10), (12, 13), (15, 17))
  ))

  // Outputs:
  // List((0,3), (4,11), (12,13), (15,17), (18,22))

Now, if you combine it with the parallel reduce of Spark,

  val mergedIntervals = df.
    as[(String, Int, Int)].
    rdd.
    map{case (t, s, e) => (t, List((s, e)))}.              // Convert start end to list with one interval
    reduceByKey(mergeIntervals).                           // perform parallel compressed merge-sort
    flatMap{ case (k, vs) => vs.map(v => (k, v._1, v._2))}.// explode resulting lists of merged intervals
    toDF("typ", "start", "end")                            // convert back to DF

  mergedIntervals.show()

you obtain something like a parallel merge sort that works directly on compressed representations of integer sequences (thus the name).
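
One assumption worth making explicit: the snippet above expects a SparkSession and its implicits to be in scope, since .as[(String, Int, Int)] and toDF rely on the implicit encoders. Roughly (the session name follows the question's code):

  import org.apache.spark.sql.SparkSession

  val sparkSession = SparkSession.builder().master("local").getOrCreate()
  import sparkSession.implicits._ // provides the encoders for .as[...] and .toDF(...)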

Result:

+-----+-----+---+
|  typ|start|end|
+-----+-----+---+
|  Two|   10| 25|
|  Two|   40| 45|
|  One|    0|  8|
|  One|   10| 15|
|Three|   30| 35|
+-----+-----+---+


Discussion

The mergeIntervals method implements a commutative, associative operation for merging lists of non-overlapping intervals that are already sorted in increasing order. All the overlapping intervals are then merged, and again stored in increasing order. This procedure can be repeated in a reduce step until all interval sequences are merged.
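
As a quick illustration of that property (plain Scala, no Spark required; the interval values are arbitrary), merging in either order gives the same result, which is exactly what reduceByKey relies on:

  val xs = List((0, 3), (10, 12))
  val ys = List((2, 6), (11, 15))

  // Both orders merge the overlapping ranges identically:
  assert(mergeIntervals(xs, ys) == mergeIntervals(ys, xs))
  // each call yields List((0,6), (10,15))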

The interesting property of the algorithm is that it maximally compresses every intermediate result of the reduction. Thus, if you have many intervals with a lot of overlap, this algorithm might actually be faster than other algorithms that are based on sorting the input intervals.

However, if you have very many intervals that only rarely overlap, this method might run out of memory and not work at all; in that case you would need a different algorithm that first sorts the intervals and then merges adjacent intervals locally in a single scan. So whether this approach works depends on the use case.
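
For completeness, here is a rough sketch of that alternative (not part of the original answer; it assumes that a single key's intervals fit in memory on one executor and that sparkSession.implicits._ is in scope): group the intervals per typ, sort them by start, and merge adjacent overlapping intervals in one local scan.

  val mergedBySort = df
    .as[(String, Int, Int)]
    .rdd
    .map { case (t, s, e) => (t, (s, e)) }
    .groupByKey()                                  // all intervals of one typ end up on one executor
    .flatMapValues { ivs =>
      ivs.toList.sortBy(_._1)                      // sort by start
        .foldLeft(List.empty[(Int, Int)]) {
          case ((ps, pe) :: acc, (s, e)) if s <= pe =>
            (ps, pe max e) :: acc                  // overlaps the previous interval: extend it
          case (acc, iv) => iv :: acc              // gap: start a new interval
        }.reverse
    }
    .map { case (t, (s, e)) => (t, s, e) }
    .toDF("typ", "start", "end")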

Full Code

  val df = Seq(
    ("One", 0, 5),
    ("One", 10, 15),
    ("One", 5, 8),
    ("Two", 10, 25),
    ("Two", 40, 45),
    ("Three", 30, 35)
  ).toDF("typ", "start", "end")

  type Gap = (Int, Int)
  /** The `merge`-step of a variant of merge-sort
    * that works directly on compressed sequences of integers,
    * where instead of individual integers, the sequence is 
    * represented by sorted, non-overlapping ranges of integers.
    */
  def mergeIntervals(as: List[Gap], bs: List[Gap]): List[Gap] = {
    require(!as.isEmpty, "as must be non-empty")
    require(!bs.isEmpty, "bs must be non-empty")
    // assuming that `as` and `bs` both are either lists with a single
    // interval, or sorted lists that arise as output of
    // this method, recursively merges them into a single list of
    // gaps, merging all overlapping gaps.
    @annotation.tailrec
    def mergeRec(
      gaps: List[Gap],
      gapStart: Int,
      gapEndAccum: Int,
      as: List[Gap],
      bs: List[Gap]
    ): List[Gap] = {
      as match {
        case Nil => {
          bs match {
            case Nil => (gapStart, gapEndAccum) :: gaps
            case notEmpty => mergeRec(gaps, gapStart, gapEndAccum, bs, Nil)
          }
        }
        case (a0, a1) :: at => {
          if (a0 <= gapEndAccum) {
            mergeRec(gaps, gapStart, gapEndAccum max a1, at, bs)
          } else {
            bs match {
              case Nil => mergeRec((gapStart, gapEndAccum) :: gaps, a0, gapEndAccum max a1, at, bs)
              case (b0, b1) :: bt => if (b0 <= gapEndAccum) {
                mergeRec(gaps, gapStart, gapEndAccum max b1, as, bt)
              } else {
                if (a0 < b0) {
                  mergeRec((gapStart, gapEndAccum) :: gaps, a0, a1, at, bs)
                } else {
                  mergeRec((gapStart, gapEndAccum) :: gaps, b0, b1, as, bt)
                }
              }
            }
          }
        }
      }
    }
    val (a0, a1) :: at = as
    val (b0, b1) :: bt = bs

    val reverseRes = 
      if (a0 < b0) 
        mergeRec(Nil, a0, a1, at, bs)
      else
        mergeRec(Nil, b0, b1, as, bt)

    reverseRes.reverse
  }


  val mergedIntervals = df.
    as[(String, Int, Int)].
    rdd.
    map{case (t, s, e) => (t, List((s, e)))}.              // Convert start end to list with one interval
    reduceByKey(mergeIntervals).                           // perform parallel compressed merge-sort
    flatMap{ case (k, vs) => vs.map(v => (k, v._1, v._2))}.// explode resulting lists of merged intervals
    toDF("typ", "start", "end")                            // convert back to DF

  mergedIntervals.show()


Testing

The implementation of mergeIntervals is tested a little bit. If you want to actually incorporate it into your codebase, here is at least a sketch of one repeated randomized test for it:

  def randomIntervalSequence(): List[Gap] = {
    def recHelper(acc: List[Gap], open: Option[Int], currIdx: Int): List[Gap] = {
      if (math.random > 0.999) acc.reverse
      else {
        if (math.random > 0.90) {
          if (open.isEmpty) {
            recHelper(acc, Some(currIdx), currIdx + 1)
          } else {
            recHelper((open.get, currIdx) :: acc, None, currIdx + 1)
          }
        } else {
          recHelper(acc, open, currIdx + 1)
        }
      }
    }
    recHelper(Nil, None, 0)
  }

  def intervalsToInts(is: List[Gap]): List[Int] = is.flatMap{ case (a, b) => a to b }

  var numNonTrivialTests = 0
  while(numNonTrivialTests < 1000) {
    val as = randomIntervalSequence()
    val bs = randomIntervalSequence()
    if (!as.isEmpty && !bs.isEmpty) {
      numNonTrivialTests += 1
      val merged = mergeIntervals(as, bs)
      assert((intervalsToInts(as).toSet ++ intervalsToInts(bs)) == intervalsToInts(merged).toSet)
    }
  }

You would obviously have to replace the raw assert with something more civilized, depending on your test framework.
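
For instance, a minimal sketch using ScalaTest (assuming ScalaTest 3.x; the suite and test names are made up, and mergeIntervals, randomIntervalSequence and intervalsToInts from above are assumed to be in scope). It is slightly simplified in that empty random draws are skipped rather than re-drawn:

  import org.scalatest.funsuite.AnyFunSuite

  class MergeIntervalsSuite extends AnyFunSuite {
    test("merging two interval lists covers exactly the union of their integers") {
      (1 to 1000).foreach { _ =>
        val as = randomIntervalSequence()
        val bs = randomIntervalSequence()
        if (as.nonEmpty && bs.nonEmpty) {
          val merged = mergeIntervals(as, bs)
          assert((intervalsToInts(as).toSet ++ intervalsToInts(bs)) == intervalsToInts(merged).toSet)
        }
      }
    }
  }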
