Un-persisting all dataframes in (py)spark


Problem Description

I have a Spark application with several points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is at most about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the memory allotted to the executor. See below for a small example that shows this behavior.

cache_test.py:

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

# Load the CSV via the spark-csv package (no header, so columns are named C0, C1, C2).
df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

# Derive a new column and cache the result; a second copy now sits in memory.
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

Looking at the application UI, there is a copy of the original dataframe in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove cached intermediate results (i.e. call unpersist before every cache())?
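For reference, a minimal sketch of that pattern applied to cache_test.py above (the only change is the unpersist() call before the withColumn step; everything else follows the question's own code):

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = hive_context.read.format('com.databricks.spark.csv').load('simple_data.csv')
df.cache()
df.show()

df.unpersist()   # release the cached copy of the original dataframe
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()       # only the derived dataframe remains cached
df.show()

spark_context.stop()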

Also, is it possible to purge all cached objects? In my application, there are natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new Spark application for each input file.

Thanks in advance!

Recommended Answer

Spark 2.x

You can use Catalog.clearCache:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()
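Tying this back to the question's "natural breakpoints": a hypothetical per-file sketch (the file list and the intermediate steps are placeholders, not from the original post) could clear the cache between input files without restarting the application:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('cache_test').getOrCreate()

# Placeholder file list; stands in for the questioner's actual inputs.
input_files = ['simple_data.csv', 'other_data.csv']

for path in input_files:
    df = spark.read.format('csv').load(path)
    df.cache()
    df.show()
    # ... further cached intermediate dataframes for this file ...
    spark.catalog.clearCache()  # natural breakpoint: drop every cached dataframe

spark.stop()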

Spark 1.x

You can use the SQLContext.clearCache method, which

Removes all cached tables from the in-memory cache.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
