StackOverflowError when operating with a large number of columns in Spark


Problem description

I have a wide dataframe (130000 rows x 8700 columns) and when I try to sum all columns I'm getting the following error:

Exception in thread "main" java.lang.StackOverflowError
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:49)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.children(Expression.scala:400)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:88)
    ...

This is my Scala code:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.col

  val df = spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .option("inferSchema", "true")
    .csv("D:\\Documents\\Trabajo\\Fábregas\\matrizLuna\\matrizRelativa")

  // Collect a Column reference for every column except the "cups" key
  val arrayList = df.drop("cups").columns
  var colsList = List[Column]()
  arrayList.foreach { c => colsList :+= col(c) }

  // Sum all columns into a single total column
  val df_suma = df.withColumn("consumo_total", colsList.reduce(_ + _))

If I do the same with a few columns it works fine, but I always get the same error when I try the reduce operation with a large number of columns.

Can anyone suggest how I can do this? Is there any limitation on the number of columns?

Thanks!

Recommended answer

You can use a different reduction method that produces a balanced binary tree of depth O(log(n)) instead of a degenerate linearized BinaryExpression chain of depth O(n):

def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
  case Nil => throw new IllegalArgumentException("Cannot reduce empty list")
  case List(x) => x
  case xs =>
    // Split the list in half and reduce both halves recursively, so the
    // resulting expression tree is balanced: depth O(log(n)) instead of O(n)
    val n = xs.size
    val (as, bs) = list.splitAt(n / 2)
    op(balancedReduce(as)(op), balancedReduce(bs)(op))
}
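
As a quick sanity check (a hypothetical example, not part of the original answer), balancedReduce gives the same result as an ordinary reduce on plain values; only the shape of the evaluation tree differs:

balancedReduce(List(1, 2, 3, 4, 5))(_ + _)  // 15, same as List(1, 2, 3, 4, 5).reduce(_ + _)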

Now in your code, you can replace

colsList.reduce(_ + _)

with

balancedReduce(colsList)(_ + _)
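
For example, the last line of the question's code would become (a sketch reusing the df and colsList defined above):

val df_suma = df.withColumn("consumo_total", balancedReduce(colsList)(_ + _))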

A little example to further illustrate what happens with the BinaryExpressions, compilable without any dependencies:

// A toy expression tree: BinOp stands in for Catalyst's BinaryExpression,
// Leaf for a single column reference
sealed trait FormalExpr
case class BinOp(left: FormalExpr, right: FormalExpr) extends FormalExpr {
  // Pretty-print the tree, indenting each level by two spaces
  override def toString: String = {
    val lStr = left.toString.split("\n").map("  " + _).mkString("\n")
    val rStr = right.toString.split("\n").map("  " + _).mkString("\n")
    s"BinOp(\n${lStr}\n${rStr}\n)"
  }
}
case object Leaf extends FormalExpr

val leafs = List.fill[FormalExpr](16){Leaf}

println(leafs.reduce(BinOp(_, _)))
println(balancedReduce(leafs)(BinOp(_, _)))

This is what the ordinary reduce does (and this is what essentially happens in your code):

BinOp(
  BinOp(
    BinOp(
      BinOp(
        BinOp(
          BinOp(
            BinOp(
              BinOp(
                BinOp(
                  BinOp(
                    BinOp(
                      BinOp(
                        BinOp(
                          BinOp(
                            BinOp(
                              Leaf
                              Leaf
                            )
                            Leaf
                          )
                          Leaf
                        )
                        Leaf
                      )
                      Leaf
                    )
                    Leaf
                  )
                  Leaf
                )
                Leaf
              )
              Leaf
            )
            Leaf
          )
          Leaf
        )
        Leaf
      )
      Leaf
    )
    Leaf
  )
  Leaf
)

And this is what balancedReduce produces:

BinOp(
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
)

The linearized chain is of length O(n), and when Catalyst is trying to evaluate it, it blows the stack. This should not happen with the flat tree of depth O(log(n)).

And while we are talking about asymptotic runtimes: why are you appending to a mutable colsList? This needs O(n^2) time. Why not simply call toList on the output of .columns?
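
A minimal sketch of that simplification, reusing the df and the col import from the question's code:

// Builds the column list in linear time; no mutable var or repeated appends needed
val colsList: List[Column] = df.drop("cups").columns.map(col).toList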
