Joining huge list of data frames causes stack-overflow error


Problem description

I have written a function that joins a list of data frames on a common column. Below is the code:

import org.apache.spark.sql.DataFrame

def joinByColumn(dfs: List[DataFrame], column: String): DataFrame = {
    // check that all dfs contain the required column
    require(dfs.map(_.columns).forall(_.contains(column)))

    dfs.reduce((df1, df2) => df1.join(df2, Seq(column), "full_outer"))
}

I have written a test case for this function. It works for small values of columnNum (say 4), but when I use a larger value like 200 it throws a stack-overflow error.

test("complicated") {
    val base = sqlContext.createDataFrame(
      Seq(
        (1, 1)
      )
    ).toDF("key", "a")

    val columnNum = 200

    val dfs = (1 to columnNum)
      .map(i => base.toDF("key", s"a$i"))
      .toList

    val actual = Ex4.joinByColumn(dfs, "key")
    actual.explain()
    val row = Row.fromSeq(Seq.fill(columnNum + 1)(1))
    val rdd = sc.parallelize(Seq(row))

    val columns = "key" :: (1 to columnNum).map(i => s"a$i").toList
    val schema = StructType(columns.map(c => StructField(c, IntegerType)))

    val expected = sqlContext.createDataFrame(rdd, schema)

    expected should beEqualTo(actual)

  }

PFB the stack trace:

java.lang.StackOverflowError was thrown.
java.lang.StackOverflowError
    at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer._findMissing(PropertyValueBuffer.java:134)
    at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer.getParameters(PropertyValueBuffer.java:118)
    at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:136)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:442)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:296)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
    at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:86)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:137)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$2.apply(RDD.scala:1768)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1768)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1756)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1755)

...........

...........

Can someone help me find the root cause of this? How can we solve the problem in this example, and is there a better approach for joining a huge list of data frames?

Answer

I was able to resolve this issue using local checkpointing:

def joinByColumn(dfs: List[DataFrame], column: String): DataFrame = {
    // check that all dfs contain the required column
    require(dfs.map(_.columns).forall(_.contains(column)))

    // localCheckpoint(true) eagerly materializes each intermediate join and
    // truncates its lineage, so the plan never grows deep enough to overflow the stack
    dfs.reduce((df1, df2) =>
      df1.join(df2, Seq(column), "full_outer").localCheckpoint(true)
    )
}
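
The stack trace hints at why this works: every join adds another level to the resulting plan and RDD lineage, and RDD.doCheckpoint walks that lineage recursively, so after roughly 200 nested joins the JVM stack is exhausted. Eagerly checkpointing each intermediate result cuts the lineage and keeps the recursion shallow. Checkpointing after every single join can be costly, so as a possible variation (not part of the original answer) one could truncate the lineage only every few joins. The sketch below is only illustrative: the name joinByColumnChunked and the checkpointEvery parameter are made up for this example, and it assumes a Spark version where Dataset.localCheckpoint(eager) is available (2.3+).

import org.apache.spark.sql.DataFrame

// Hypothetical variant: fold over the list and cut the lineage only every
// `checkpointEvery` joins, trading some plan depth for fewer materializations.
def joinByColumnChunked(dfs: List[DataFrame],
                        column: String,
                        checkpointEvery: Int = 10): DataFrame = {
    require(dfs.nonEmpty)
    require(dfs.map(_.columns).forall(_.contains(column)))

    dfs.zipWithIndex.drop(1).foldLeft(dfs.head) { case (acc, (df, i)) =>
      val joined = acc.join(df, Seq(column), "full_outer")
      // every checkpointEvery-th join: eagerly materialize and truncate the plan
      if (i % checkpointEvery == 0) joined.localCheckpoint(true) else joined
    }
}

The trade-off is that a larger checkpointEvery keeps more joins in a single plan (faster, but deeper recursion), while a smaller value materializes intermediate results more often (safer, but more work); the right value depends on how many data frames are being joined.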

