StackOverflowError when operating with a large number of columns in Spark
Question
I have a wide DataFrame (130,000 rows x 8,700 columns), and when I try to sum all the columns I get the following error:
Exception in thread "main" java.lang.StackOverflowError
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:49)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.children(Expression.scala:400)
    at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:88)
    ...
This is my Scala code:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

val df = spark.read
  .option("header", "false")
  .option("delimiter", "\t")
  .option("inferSchema", "true")
  .csv("D:\\Documents\\Trabajo\\Fábregas\\matrizLuna\\matrizRelativa")
val arrayList = df.drop("cups").columns
var colsList = List[Column]()
arrayList.foreach { c => colsList :+= col(c) }
// reduce is left-associative: ((c1 + c2) + c3) + ..., one level of nesting per column
val df_suma = df.withColumn("consumo_total", colsList.reduce(_ + _))
If I do the same with a few columns it works fine, but I always get the same error when I try the reduce operation with a large number of columns.
Can anyone suggest how I can do it? Is there any limitation on the number of columns?
Thanks!
Answer
You can use a different reduction method that produces a balanced binary tree of depth O(log(n)) instead of a degenerate linearized BinaryExpression chain of depth O(n):
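def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
  case Nil => throw new IllegalArgumentException("Cannot reduce empty list")
  case List(x) => x
  case xs => {
    // Split the list in half, reduce each half recursively and combine the two results,
    // so the resulting expression tree has depth O(log(n)) instead of O(n)
    val n = xs.size
    val (as, bs) = xs.splitAt(n / 2)
    op(balancedReduce(as)(op), balancedReduce(bs)(op))
  }
}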
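For a sense of scale, here is a small sketch that computes both nesting depths without building any expression. The names linearDepth and balancedDepth are illustrative helpers, not part of Spark or of the answer, and balancedDepth assumes the same n / 2 split as balancedReduce. For the roughly 8,700 columns in the question, the ordinary reduce nests 8,699 additions, while the balanced version nests only 14.

```scala
object DepthArithmetic {
  // Nesting depth of xs.reduce(op) over n elements: every element after the first adds one level
  def linearDepth(n: Int): Int = n - 1

  // Nesting depth produced by balancedReduce: one level per split, using the same n / 2 split
  def balancedDepth(n: Int): Int =
    if (n <= 1) 0
    else 1 + math.max(balancedDepth(n / 2), balancedDepth(n - n / 2))

  def main(args: Array[String]): Unit = {
    val n = 8700 // roughly the number of columns in the question
    println(linearDepth(n))   // 8699
    println(balancedDepth(n)) // 14
  }
}
```

Halving at every step gives a depth of ceil(log2(n)), which is why the balanced depth grows so slowly.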
Now in your code, you can replace
    colsList.reduce(_ + _)
with
    balancedReduce(colsList)(_ + _)
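balancedReduce only regroups the operands and keeps them in their original left-to-right order. So it agrees with reduce for any associative operation, even a non-commutative one such as string concatenation, but not for a non-associative one such as subtraction. The dependency-free sketch below checks this, with balancedReduce copied in so the snippet compiles on its own. Column addition is associative up to floating-point rounding, so with Double columns the totals may differ from the original reduce in the last digits. That is an expectation, not something verified against the question's data.

```scala
object BalancedReduceDemo {
  // Copy of balancedReduce from the answer so that this snippet compiles on its own
  def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
    case Nil     => throw new IllegalArgumentException("Cannot reduce empty list")
    case List(x) => x
    case xs =>
      val (front, back) = xs.splitAt(xs.size / 2)
      op(balancedReduce(front)(op), balancedReduce(back)(op))
  }

  def main(args: Array[String]): Unit = {
    val ints = (1 to 10).toList

    // Addition is associative, so regrouping does not change the result
    println(ints.reduce(_ + _))          // 55
    println(balancedReduce(ints)(_ + _)) // 55

    // Concatenation is associative but not commutative: operand order is preserved
    println(balancedReduce(List("a", "b", "c", "d", "e"))(_ + _)) // abcde

    // Subtraction is not associative, so the two groupings disagree
    println(ints.reduce(_ - _))          // -53
    println(balancedReduce(ints)(_ - _)) // 5
  }
}
```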
A little example to further illustrate what happens with the BinaryExpressions, compilable without any dependencies:
sealed trait FormalExpr
case class BinOp(left: FormalExpr, right: FormalExpr) extends FormalExpr {
  // Pretty-print the tree, indenting every child line by one space per level
  override def toString: String = {
    val lStr = left.toString.split("\n").map(" " + _).mkString("\n")
    val rStr = right.toString.split("\n").map(" " + _).mkString("\n")
    s"BinOp(\n${lStr}\n${rStr}\n)"
  }
}
case object Leaf extends FormalExpr

val leafs = List.fill[FormalExpr](16){Leaf}
println(leafs.reduce(BinOp(_, _)))
println(balancedReduce(leafs)(BinOp(_, _)))
This is what the ordinary reduce does (and this is essentially what happens in your code):
BinOp(
 BinOp(
  BinOp(
   BinOp(
    BinOp(
     BinOp(
      BinOp(
       BinOp(
        BinOp(
         BinOp(
          BinOp(
           BinOp(
            BinOp(
             BinOp(
              BinOp(
               Leaf
               Leaf
              )
              Leaf
             )
             Leaf
            )
            Leaf
           )
           Leaf
          )
          Leaf
         )
         Leaf
        )
        Leaf
       )
       Leaf
      )
      Leaf
     )
     Leaf
    )
    Leaf
   )
   Leaf
  )
  Leaf
 )
 Leaf
)
This is what balancedReduce produces:
BinOp(
 BinOp(
  BinOp(
   BinOp(
    Leaf
    Leaf
   )
   BinOp(
    Leaf
    Leaf
   )
  )
  BinOp(
   BinOp(
    Leaf
    Leaf
   )
   BinOp(
    Leaf
    Leaf
   )
  )
 )
 BinOp(
  BinOp(
   BinOp(
    Leaf
    Leaf
   )
   BinOp(
    Leaf
    Leaf
   )
  )
  BinOp(
   BinOp(
    Leaf
    Leaf
   )
   BinOp(
    Leaf
    Leaf
   )
  )
 )
)
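The linearized chain has length O(n), and Catalyst walks it recursively (note BinaryExpression.children and TreeNode.containsChild in the stack trace), so every level of nesting costs more stack frames until the stack blows. This should not happen with the flat tree of depth O(log(n)).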
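Here is a dependency-free sketch of the same mechanism, which shows that the nesting depth matters rather than the number of leaves as such. The recursive depth function is only a simplified stand-in for Catalyst's recursive tree traversals (an assumption for illustration, not Spark code). withStack is a hypothetical helper that runs it on a thread with a deliberately small stack; the JVM documents that stack size as a hint it may ignore. With a million leaves, the left-nested chain overflows, while the balanced tree, only 20 levels deep, does not.

```scala
object StackDemo {
  sealed trait FormalExpr
  case class BinOp(left: FormalExpr, right: FormalExpr) extends FormalExpr
  case object Leaf extends FormalExpr

  // Copy of balancedReduce from the answer so that this snippet compiles on its own
  def balancedReduce[X](list: List[X])(op: (X, X) => X): X = list match {
    case Nil     => throw new IllegalArgumentException("Cannot reduce empty list")
    case List(x) => x
    case xs =>
      val (front, back) = xs.splitAt(xs.size / 2)
      op(balancedReduce(front)(op), balancedReduce(back)(op))
  }

  // Simplified stand-in for a recursive tree traversal: one stack frame per nesting level
  def depth(e: FormalExpr): Int = e match {
    case Leaf        => 0
    case BinOp(l, r) => 1 + math.max(depth(l), depth(r))
  }

  // Hypothetical helper: evaluates body on a new thread with the requested stack size
  // and reports a StackOverflowError as a Left instead of crashing
  def withStack(stackBytes: Long)(body: => Int): Either[String, Int] = {
    var result: Either[String, Int] = Left("not run")
    val task = new Runnable {
      def run(): Unit = {
        result = try Right(body) catch { case _: StackOverflowError => Left("StackOverflowError") }
      }
    }
    val thread = new Thread(null, task, "small-stack", stackBytes)
    thread.start()
    thread.join() // join() also makes the thread's write to result visible here
    result
  }

  def main(args: Array[String]): Unit = {
    val leafs = List.fill[FormalExpr](1000000)(Leaf)
    val chain = leafs.reduce(BinOp(_, _))             // 999,999 levels deep
    val balanced = balancedReduce(leafs)(BinOp(_, _)) // 20 levels deep
    println(withStack(512L * 1024)(depth(chain)))     // Left(StackOverflowError)
    println(withStack(512L * 1024)(depth(balanced)))  // Right(20)
  }
}
```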
And while we are talking about asymptotic runtimes: why are you repeatedly appending to the var colsList? Each :+ on an immutable List copies the whole list, so building it this way takes O(n^2) time. Why not simply call toList on the output of .columns and map col over it?
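A dependency-free sketch of the two ways of building the list follows, with plain strings standing in for the Column objects and hypothetical column names in the _c0, _c1, ... style. Each :+ copies the list built so far, so the loop performs about n^2/2 element copies in total, whereas toList converts the array in a single pass. In the Spark code itself, the linear version would presumably read df.drop("cups").columns.toList.map(col); that line is untested here.

```scala
object BuildColumnList {
  // Pattern from the question: each :+ on an immutable List copies the whole list,
  // so n appends cost O(n^2) in total
  def byAppending(columns: Array[String]): List[String] = {
    var acc = List[String]()
    columns.foreach { c => acc :+= c }
    acc
  }

  // Suggested alternative: a single O(n) conversion
  def byToList(columns: Array[String]): List[String] = columns.toList

  def main(args: Array[String]): Unit = {
    // Hypothetical column names standing in for df.drop("cups").columns
    val columns = Array.tabulate(8700)(i => s"_c$i")
    println(byAppending(columns) == byToList(columns)) // true
  }
}
```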