StackOverflowError when operating with a large number of columns in Spark

Problem Description

I have a wide dataframe (130000 rows x 8700 columns), and when I try to sum all columns I'm getting the following error:

Exception in thread "main" java.lang.StackOverflowError
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:49)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.children(Expression.scala:400)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:88)
    ...

This is my Scala code:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.col

  // Read the tab-delimited file into a DataFrame
  val df = spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .option("inferSchema", "true")
    .csv("D:\\Documents\\Trabajo\\Fábregas\\matrizLuna\\matrizRelativa")

  // Collect every column except "cups" as Column expressions
  val arrayList = df.drop("cups").columns
  var colsList = List[Column]()
  arrayList.foreach { c => colsList :+= col(c) }

  // Sum all the remaining columns into a single "consumo_total" column
  val df_suma = df.withColumn("consumo_total", colsList.reduce(_ + _))

If I do the same with a few columns it works fine, but I'm always getting the same error when I try the reduce operation with a high number of columns.

Can anyone suggest how I can do this? Is there any limitation on the number of columns?

Thanks!

Recommended Answer

You can use a different reduction method that produces a balanced binary tree of depth O(log(n)) instead of a degenerate linearized BinaryExpression chain of depth O(n):

// Reduce a list by recursively splitting it in half, producing a balanced
// binary tree of depth O(log(n)) instead of a left-leaning chain of depth O(n).
def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
  case Nil => throw new IllegalArgumentException("Cannot reduce empty list")
  case List(x) => x
  case xs =>
    val (as, bs) = xs.splitAt(xs.size / 2)
    op(balancedReduce(as)(op), balancedReduce(bs)(op))
}

Now in your code, you can replace

colsList.reduce(_ + _)

with

balancedReduce(colsList)(_ + _)
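
For example, keeping the rest of the question's code unchanged, the summing step would become something like this sketch:

// Same as the original df_suma line, but the balanced tree keeps Catalyst's
// recursion depth at O(log(n)) instead of O(n).
val df_suma = df.withColumn("consumo_total", balancedReduce(colsList)(_ + _))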

A little example to further illustrate what happens with the BinaryExpressions, compilable without any dependencies:

// A toy expression tree that only records structure, so we can visualize
// the shape produced by each reduction strategy.
sealed trait FormalExpr
case class BinOp(left: FormalExpr, right: FormalExpr) extends FormalExpr {
  override def toString: String = {
    val lStr = left.toString.split("\n").map("  " + _).mkString("\n")
    val rStr = right.toString.split("\n").map("  " + _).mkString("\n")
    s"BinOp(\n${lStr}\n${rStr}\n)"
  }
}
case object Leaf extends FormalExpr

val leafs = List.fill[FormalExpr](16){Leaf}

// Compare the shapes of the two reductions
println(leafs.reduce(BinOp(_, _)))
println(balancedReduce(leafs)(BinOp(_, _)))

This is what the ordinary reduce does (and this is what essentially happens in your code):

BinOp(
  BinOp(
    BinOp(
      BinOp(
        BinOp(
          BinOp(
            BinOp(
              BinOp(
                BinOp(
                  BinOp(
                    BinOp(
                      BinOp(
                        BinOp(
                          BinOp(
                            BinOp(
                              Leaf
                              Leaf
                            )
                            Leaf
                          )
                          Leaf
                        )
                        Leaf
                      )
                      Leaf
                    )
                    Leaf
                  )
                  Leaf
                )
                Leaf
              )
              Leaf
            )
            Leaf
          )
          Leaf
        )
        Leaf
      )
      Leaf
    )
    Leaf
  )
  Leaf
)

And this is what balancedReduce produces:

BinOp(
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
  BinOp(
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
    BinOp(
      BinOp(
        Leaf
        Leaf
      )
      BinOp(
        Leaf
        Leaf
      )
    )
  )
)

The linearized chain is of length O(n), and when Catalyst is trying to evaluate it, it blows the stack. This should not happen with the flat tree of depth O(log(n)).
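
As a rough sanity check (assuming one nested BinaryExpression per "+", for the roughly 8700 columns in the question), the two depths compare like this:

// Rough depth comparison for n ≈ 8700 columns (assumption: one BinOp per '+')
val n = 8700
val linearDepth = n - 1                                        // plain reduce: thousands of nested nodes
val balancedDepth = math.ceil(math.log(n) / math.log(2)).toInt // balancedReduce: about 14
println(s"linear depth: $linearDepth, balanced depth: $balancedDepth")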

And while we are talking about asymptotic runtimes: why are you appending to a mutable colsList? This needs O(n^2) time. Why not simply call toList on the output of .columns?
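
A minimal sketch of that suggestion (same df as above; note that .columns only yields the column names, so they still need to be wrapped with col):

// Build the List[Column] directly, with no var and no repeated appends
val colsList: List[Column] = df.drop("cups").columns.map(col).toList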
