Spark Caused by: java.lang.StackOverflowError Window Function?


Question

I'm running into an error that I think is caused by the Window function.

When I apply this script and persist just a few sample rows, it works fine. When I apply it to my whole dataset (only a few GB), it fails with this bizarre error on the last step, when trying to persist to HDFS. The script works when I persist without the Window function, so the problem must come from that (I have around 325 feature columns running through the for loop).

Any idea what could be causing the problem? My goal is simply to impute time series data with a forward fill on every variable in my DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
import sys

spark = SparkSession.builder.getOrCreate()  # reuses the active session in a shell or notebook
print(spark.version)  # 2.3.0

# sample data
df = spark.createDataFrame([('2019-05-10 7:30:05', '1', '10', '0.5', 'FALSE'),\
                            ('2019-05-10 7:30:10', '2', 'UNKNOWN', '0.24', 'FALSE'),\
                            ('2019-05-10 7:30:15', '3', '6', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:20', '4', '7', 'UNKNOWN', 'UNKNOWN'),\
                            ('2019-05-10 7:30:25', '5', '10', '1.1', 'UNKNOWN'),\
                            ('2019-05-10 7:30:30', '6', 'UNKNOWN', '1.1', 'NULL'),\
                            ('2019-05-10 7:30:35', '7', 'UNKNOWN', 'UNKNOWN', 'TRUE'),\
                            ('2019-05-10 7:30:49', '8', '50', 'UNKNOWN', 'UNKNOWN')], ["date", "id", "v1", "v2", "v3"])

df = df.withColumn("date", F.col("date").cast("timestamp"))

# imputer process / all cols that need filled are strings
def stringReplacer(x, y):
    return F.when(x != y, x).otherwise(F.lit(None)) # replace with NULL

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
  for i in cols:
    window = Window\
    .partitionBy(F.month(partitioner))\
    .orderBy(partitioner)\
    .rowsBetween(-sys.maxsize, 0)

    df = df\
    .withColumn(i, stringReplacer(F.col(i), value))
    fill = F.last(df[i], ignorenulls=True).over(window)
    df = df.withColumn(i,  fill)
  return df
df2 = forwardFillImputer(df, cols=[i for i in df.columns])

# errors here
df2\
.write\
.format("csv")\
.mode("overwrite")\
.option("header", "true")\
.save("test_window_func.csv")

Py4JJavaError: An error occurred while calling o13504.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:381)

Possible solution

def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    window = Window \
     .partitionBy(F.month(partitioner)) \
     .orderBy(partitioner) \
     .rowsBetween(-sys.maxsize, 0)
    imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i) 
                    for i in cols]
    missing_cols = [i for i in df.columns if i not in cols]
    return df.select(missing_cols+imputed_cols)

df2 = forwardFillImputer(df, cols=[i for i in df.columns[1:]])

df2.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- v3: string (nullable = true)

df2.show()
+-------------------+---+---+----+-----+
|               date| id| v1|  v2|   v3|
+-------------------+---+---+----+-----+
|2019-05-10 07:30:05|  1| 10| 0.5|FALSE|
|2019-05-10 07:30:10|  2| 10|0.24|FALSE|
|2019-05-10 07:30:15|  3|  6|0.24| TRUE|
|2019-05-10 07:30:20|  4|  7|0.24| TRUE|
|2019-05-10 07:30:25|  5| 10| 1.1| TRUE|
|2019-05-10 07:30:30|  6| 10| 1.1| NULL|
|2019-05-10 07:30:35|  7| 10| 1.1| TRUE|
|2019-05-10 07:30:49|  8| 50| 1.1| TRUE|
+-------------------+---+---+----+-----+

Recommended answer

From the stack trace provided, I believe the error comes from the preparation of the execution plan, as it says:

Caused by: java.lang.StackOverflowError
    at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200)

I believe the reason is that you call .withColumn twice inside the loop. In the Spark execution plan, .withColumn is basically a select of all columns with one column changed as specified in the call. With 325 columns, a single iteration therefore issues two selects over 325 columns each, that is, 650 column expressions passed to the planner. Repeat that 325 times and you can see how the overhead builds up.
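
To make the growth concrete, here is a toy model in plain Python, not Spark itself. The `Node` class and the `with_column`, `select` and `depth` helpers are hypothetical stand-ins that only mimic how each call wraps the previous plan in another layer. Spark's optimizer may merge some adjacent projections, so the real plan need not have exactly this shape, but the stack trace above shows SparkPlan.prepare calling itself on child plans, so a deeper plan means deeper recursion.

```python
class Node:
    """A toy plan node: a name plus at most one child (hypothetical, not a Spark class)."""
    def __init__(self, name, child=None):
        self.name = name
        self.child = child


def with_column(plan, col):
    # Models one withColumn call: wrap the existing plan in a new projection.
    return Node("Project[" + col + "]", plan)


def select(plan, cols):
    # Models one select call: a single projection, however many columns it lists.
    return Node("Project[%d columns]" % len(cols), plan)


def depth(plan):
    # Iterative, so measuring a deep plan cannot itself overflow the stack.
    n = 0
    while plan is not None:
        n += 1
        plan = plan.child
    return n


cols = ["v%d" % i for i in range(325)]

# The loop from the question: two withColumn calls per column.
looped = Node("Scan")
for c in cols:
    looped = with_column(looped, c)  # the stringReplacer step
    looped = with_column(looped, c)  # the F.last(...).over(window) step

# The alternative: every column expression in one select.
single = select(Node("Scan"), cols)

print(depth(looped))  # 651: one scan plus 650 stacked projections
print(depth(single))  # 2: one scan plus one projection
```

In this model the looped version nests 650 layers on top of the scan, while the single select adds one layer no matter how many columns it carries.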

It is interesting, though, that you do not get this error for a small sample; I would have expected it to fail there as well.

Anyway, you can try replacing your forwardFillImputer with this:

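def forwardFillImputer(df, cols=[], partitioner="date", value="UNKNOWN"):
    # One window shared by every column: all rows from the start of the
    # month partition up to and including the current row.
    window = Window \
        .partitionBy(F.month(partitioner)) \
        .orderBy(partitioner) \
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    # Turn the placeholder value into NULL, then carry the last non-null value forward.
    imputed_cols = [F.last(stringReplacer(F.col(i), value), ignorenulls=True).over(window).alias(i)
                    for i in cols]

    # Columns that are not imputed pass through unchanged.
    missing_cols = [F.col(i) for i in df.columns if i not in cols]

    # A single select instead of two withColumn calls per column.
    return df.select(missing_cols + imputed_cols)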

This way you pass just a single select statement to the planner, which should be easier to handle.
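
One side effect of concatenating the untouched columns with the imputed ones is that the output column order changes whenever the imputed columns are not already at the end. If the order matters, the select list can be built in the original order instead. The sketch below is a minimal illustration: `ordered_select_list` is a hypothetical helper, and plain strings stand in for Spark column expressions so that it runs without a cluster.

```python
def ordered_select_list(all_columns, imputed, build):
    """Return one select list that keeps the original column order.

    all_columns: every column name, e.g. df.columns
    imputed:     names of the columns to forward-fill
    build:       callable mapping a column name to its replacement expression
    """
    imputed = set(imputed)
    return [build(c) if c in imputed else c for c in all_columns]


# Demonstration with strings standing in for Spark column expressions.
result = ordered_select_list(
    ["date", "id", "v1", "v2", "v3"],
    ["v3", "v1"],
    lambda c: "filled(" + c + ")",
)
print(result)
# ['date', 'id', 'filled(v1)', 'v2', 'filled(v3)']

# With PySpark, `build` would return the window expression instead, e.g.:
#   lambda c: F.last(stringReplacer(F.col(c), value), ignorenulls=True)
#                 .over(window).alias(c)
# and the resulting list would be passed to a single df.select(...).
```

The plan still contains only one select, so the benefit described above is unchanged.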

Just as a warning: Spark generally doesn't do well with a large number of columns, so you might see other strange issues along the way.
