Memory issue with Spark Structured Streaming

Question

I'm facing memory issues running a structured stream with aggregation and partitioning in Spark 2.2.0:

session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    .dropDuplicates("testId", "testName")
    .withColumn("year", functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();

During testing I noticed that the amount of used memory increases each time new data comes in, and eventually the executors exit with code 137:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

I've created a heap dump and found that most of the memory is used by org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, which is referenced from StateStore.

At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming the files in the source folder so that Spark would pick them up again. Since the input records are the same, all further rows should be rejected as duplicates and memory consumption shouldn't increase, but it does.
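For reference, a minimal sketch of the general technique for bounding deduplication state (not the fix described in the answer below): pairing dropDuplicates with a watermark so Spark can eventually evict old keys. It assumes testTimestamp is an event-time (TimestampType) field of TestRecord; the 24-hour threshold is only an illustrative value.

Dataset<TestRecord> deduped = session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    // Upper bound on how late a duplicate may arrive; lets Spark drop older dedup state
    .withWatermark("testTimestamp", "24 hours")
    // Include the event-time column, per Spark's deduplication-with-watermark pattern
    .dropDuplicates("testId", "testName", "testTimestamp");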

Moreover, GC time took more than 30% of the total processing time.

Here is a heap dump taken from an executor running with a smaller amount of memory than in the screenshots above, since the Java process terminated in the middle when I was creating a dump from that one.

Answer

Migrating my comment from SPARK-23682, which the asker of this question also filed.

In the HDFS state store provider, it excessively caches multiple versions of the state in memory, 100 versions by default. The issue is addressed by SPARK-24717, which will keep only two versions (current for replay and new for update) of state in memory. The patch will be available in Spark 2.4.0.
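For completeness, a rough sketch of the related settings (key names as in the Spark configuration; the values are illustrative, not recommendations). On Spark 2.4.0+ the in-memory cache size is governed by spark.sql.streaming.maxBatchesToRetainInMemory (the setting added by SPARK-24717, default 2); spark.sql.streaming.minBatchesToRetain (default 100) controls how many batches of state/metadata are kept for recovery and, on 2.2/2.3, also effectively bounds the in-memory version cache.

SparkSession session = SparkSession.builder()
    .appName("test-stream")
    // Spark 2.4.0+ (SPARK-24717): state versions kept in executor memory, default 2
    .config("spark.sql.streaming.maxBatchesToRetainInMemory", 2)
    // All versions: batches retained for recovery, default 100; lowering it on 2.2/2.3
    // shrinks the in-memory version cache at the cost of a shorter recovery window
    .config("spark.sql.streaming.minBatchesToRetain", 10)
    .getOrCreate();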
