Un-persisting all dataframes in (py)spark

Problem description

I have a Spark application with several points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is at most about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. See below for a small example that shows this behavior.

cache_test.py:

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

# Load the CSV; with no header, spark-csv assigns default column names (C0, C1, C2)
df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

# Derive a new column and cache the result as well
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

Looking at the application UI, there is a copy of the original dataframe in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove cached intermediate results (i.e., call unpersist() before every cache())?
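
To make that concrete, here is a minimal sketch based on the cache_test.py example above; the only change is the unpersist() call before the second cache():

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df.unpersist()  # drop the original copy from the cache
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()      # only the derived dataframe remains cached
df.show()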

Also, is it possible to purge all cached objects? In my application, there are natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new Spark application for each input file.

Thanks in advance!

Recommended answer

Spark 2.x

You can use Catalog.clearCache:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()  # removes all cached tables/DataFrames from the in-memory cache

Spark 1.x

You can use the SQLContext.clearCache method, which

Removes all cached tables from the in-memory cache.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()  # removes all cached tables from the in-memory cache
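
As a usage sketch for the "natural breakpoint" scenario from the question (the input_files list and the process() helper below are hypothetical, not part of the original code), a clearCache() call between input files might look like this with the Spark 2.x API, all within a single Spark application:

# Hypothetical sketch: purge all cached data at a natural breakpoint
# between input files, without starting a new Spark application.
for path in input_files:              # input_files: assumed list of CSV paths
    df = spark.read.csv(path)         # Spark 2.x CSV reader
    result = process(df)              # process(): assumed per-file logic that may cache
    result.write.parquet(path + '.out')
    spark.catalog.clearCache()        # drop every cached table/DataFrame before the next file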
