pyspark rdd isCheckpointed() is false


Question

I was encountering StackOverflowErrors when I was iteratively adding over 500 columns to my pyspark dataframe, so I included checkpoints. The checkpoints did not help, so I created the following toy application to test whether my checkpoints were working correctly. All I do in this example is iteratively create columns by copying the original column over and over again. I persist, checkpoint, and count every 10 iterations. I notice that my dataframe.rdd.isCheckpointed() always returns False, yet I can verify that the checkpoint folders are indeed being created and populated on disk. I am running on Dataproc on Google Cloud.

Here is my code:

from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys

APP_NAME = "isCheckPointWorking"

spark = SparkSession\
    .builder\
    .appName(APP_NAME)\
    .config("spark.sql.crossJoin.enabled","true")\
    .getOrCreate()

sc = SparkContext.getOrCreate()

#set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')

#create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1,10),columns = ["A"]))
df4.show()

#create a list of new columns to be added to the dataframe
numberList = np.arange(0,40) 
colNewList = ['col'+str(x) for x in numberList]

print(colNewList)

iterCount = 0

for colName in colNewList:

    #copy column A in to the new column
    df4 = df4.withColumn(colName,df4.A)

    if (np.mod(iterCount,10) == 0):           
        df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)      

        df4.checkpoint(eager=True)

        df4.count()    
        #checking if underlying RDD is being checkpointed        
        print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))

    iterCount +=1

It is unclear why df4.rdd.isCheckpointed() is returning False each time, when I can see that the checkpoint folder is being populated. Any thoughts?

Answer

The checkpoint method returns a new, checkpointed Dataset; it does not modify the current Dataset. Because the return value is discarded in your loop, df4 still refers to the original, un-checkpointed plan, which is why df4.rdd.isCheckpointed() keeps returning False even though checkpoint files appear on disk.

Change

df4.checkpoint(eager=True)

to

df4 = df4.checkpoint(eager=True)
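
Applied to the loop in the question, the checkpointing block would become something like this (a sketch reusing the df4, iterCount, and StorageLevel names from the code above; only the checkpoint line changes):

    if (np.mod(iterCount,10) == 0):
        df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)

        #rebind df4 to the new DataFrame returned by checkpoint(),
        #instead of discarding the checkpointed result
        df4 = df4.checkpoint(eager=True)

        df4.count()
        #df4 now refers to the checkpointed result rather than the original plan
        print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))

Note that with eager=True the checkpoint is materialized immediately, so the separate count() afterwards is no longer needed to trigger it; it is kept here only to leave the rest of the toy application unchanged.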
