pyspark rdd isCheckPointed() is false
Problem description
I was encountering StackOverflowErrors while iteratively adding over 500 columns to my pyspark dataframe, so I added checkpoints. The checkpoints did not help. I therefore created the following toy application to test whether my checkpoints were working correctly. All this example does is iteratively create columns by copying the original column over and over. I persist, checkpoint, and count every 10 iterations. I notice that dataframe.rdd.isCheckpointed() always returns False, even though I can verify that the checkpoint folders are indeed being created and populated on disk. I am running on Dataproc on Google Cloud.
Here is my code:
from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys

APP_NAME = "isCheckPointWorking"

spark = SparkSession\
    .builder\
    .appName(APP_NAME)\
    .config("spark.sql.crossJoin.enabled", "true")\
    .getOrCreate()

sc = SparkContext.getOrCreate()

# set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')

# create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1, 10), columns=["A"]))
df4.show()

# create a list of new columns to be added to the dataframe
numberList = np.arange(0, 40)
colNewList = ['col' + str(x) for x in numberList]
print(colNewList)

iterCount = 0
for colName in colNewList:
    # copy column A into the new column
    df4 = df4.withColumn(colName, df4.A)
    if (np.mod(iterCount, 10) == 0):
        df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)
        df4.checkpoint(eager=True)
        df4.count()
        # checking if the underlying RDD is being checkpointed
        print("is data frame checkpointed " + str(df4.rdd.isCheckpointed()))
    iterCount += 1
It is unclear why df4.rdd.isCheckpointed() is returning False each time, when I can see that the checkpoint folder is being populated. Any thoughts?
Recommended answer
The checkpoint method returns a new checkpointed Dataset; it does not modify the current Dataset.
Change
df4.checkpoint(eager=True)
to
df4 = df4.checkpoint(eager=True)
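This is the usual pattern for Spark's immutable Datasets: transformation-style methods hand back a new object, and discarding the return value silently loses the result. A minimal, pyspark-free sketch of the same pitfall, using a toy stand-in class (ToyDataset and its fields are illustrative, not part of Spark):

```python
# Illustrative stand-in for Spark's immutable Dataset: checkpoint()
# returns a NEW object flagged as checkpointed; the receiver is untouched.
class ToyDataset:
    def __init__(self, data, checkpointed=False):
        self.data = data
        self.checkpointed = checkpointed

    def checkpoint(self):
        # Like Dataset.checkpoint in Spark, this builds and returns
        # a new instance instead of mutating self.
        return ToyDataset(self.data, checkpointed=True)

ds = ToyDataset([1, 2, 3])
ds.checkpoint()             # return value discarded: ds is unchanged
print(ds.checkpointed)      # False, the same symptom as in the question

ds = ds.checkpoint()        # reassign to keep the checkpointed copy
print(ds.checkpointed)      # True
```

The reassignment in the fix above plays the same role as `df4 = df4.checkpoint(eager=True)`: it rebinds the variable to the checkpointed object so later calls (such as `df4.rdd.isCheckpointed()`) see it.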