Reading from hive table and updating same table in pyspark - using checkpoint

Problem description

I am using Spark version 2.3 and trying to read a Hive table in Spark as follows:

from pyspark.sql import SparkSession

# A Hive-enabled session is needed for spark.table() to resolve Hive tables
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.table("emp.emptable")

Here I am adding a new column with the current system date to the existing dataframe:

import pyspark.sql.functions as F
newdf = df.withColumn('LOAD_DATE', F.current_date())

Now I am facing an issue when I try to write this dataframe back as a Hive table:

newdf.write.mode("overwrite").saveAsTable("emp.emptable")

pyspark.sql.utils.AnalysisException: u'Cannot overwrite table emp.emptable that is also being read from;'

So I am checkpointing the dataframe to break the lineage, since I am reading from and writing to the same table:

checkpointDir = "/hdfs location/temp/tables/"
spark.sparkContext.setCheckpointDir(checkpointDir)

# checkpoint() materializes the dataframe under checkpointDir and truncates its
# lineage, so the overwrite no longer reads from the table being written
df = spark.table("emp.emptable").coalesce(1).checkpoint()
newdf = df.withColumn('LOAD_DATE', F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")

This way it works fine and the new column is added to the Hive table, but I have to delete the checkpoint files every time they are created. Is there a good way to break the lineage, write the same dataframe with the updated column details, and save it to an HDFS location or as a Hive table?

Or is there any way to specify a temporary location for the checkpoint directory that gets deleted once the Spark session completes?

Recommended answer

As we discussed in this post, setting the property below is the way to go:

spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")

That question had a different context: we wanted to retain the checkpointed dataset, so we did not bother adding a cleanup solution.

Setting the above property works some of the time (tested with Scala, Java and Python), but it is hard to rely on. The official documentation says this property "Controls whether to clean checkpoint files if the reference is out of scope." I don't know exactly what that means, because my understanding is that once the Spark session/context is stopped the checkpoint files should be cleaned up anyway. It would be great if someone could shed light on this.
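For illustration, a minimal sketch of how I would wire the property in, under the assumption that the checkpoint cleaner picks the flag up when the SparkContext is created (so it is set on the builder rather than via spark.conf.set() at runtime); the table name and checkpoint path are the ones used elsewhere in this post:

from pyspark.sql import SparkSession

# Assumption: the cleaner reads this flag when the SparkContext is built,
# so set it on the builder instead of with spark.conf.set() afterwards.
spark = (SparkSession.builder
         .enableHiveSupport()
         .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
         .getOrCreate())

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint")

# Checkpoint files written here become candidates for cleanup once the
# checkpointed dataframe goes out of scope and is garbage collected.
df = spark.table("emp.emptable").checkpoint()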

Regarding:

"Is there any best way to break the lineage"

Check this question: @BiS found a way to cut the lineage using the createDataFrame(RDD, Schema) method. I haven't tested it myself, though.
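For illustration only, a rough sketch of that idea (equally untested here): rebuilding the dataframe from its underlying RDD and schema gives Spark a plan that no longer references the source table scan directly; whether it actually avoids the "Cannot overwrite table that is also being read from" error is what the linked answer discusses.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Rebuild the dataframe from its RDD and schema to detach it from the
# emp.emptable scan. Note: df.rdd forces row (de)serialization.
df = spark.table("emp.emptable")
cut = spark.createDataFrame(df.rdd, df.schema)

newdf = cut.withColumn("LOAD_DATE", F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")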

Just FYI, I usually don't rely on the above property; to be on the safe side, I delete the checkpointed directory in the code itself.

We can get the checkpointed directory as shown below.

Scala:

// Set the checkpoint directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

// getCheckpointDir returns a String, so we can use org.apache.hadoop.fs to delete the path

PySpark:

# Set the checkpoint directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

# Note the 'u' prefix: it is a unicode object, so wrap it with str(t).
# Below are the steps to get the Hadoop FileSystem object and delete the directory.

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
>>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
False
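Putting it together for the original question, a rough end-to-end sketch that checkpoints, overwrites the same table, and then removes the checkpoint directory; the table name and checkpoint path are the ones used above, and the try/finally arrangement is just one way to make sure the cleanup runs:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpoint")

try:
    # Checkpoint to break the lineage, then overwrite the same Hive table
    df = spark.table("emp.emptable").coalesce(1).checkpoint()
    newdf = df.withColumn("LOAD_DATE", F.current_date())
    newdf.write.mode("overwrite").saveAsTable("emp.emptable")
finally:
    # Delete this application's checkpoint directory ourselves instead of
    # relying on spark.cleaner.referenceTracking.cleanCheckpoints
    t = sc._jsc.sc().getCheckpointDir().get()
    path = sc._jvm.org.apache.hadoop.fs.Path(str(t))
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    fs.delete(path, True)  # recursive delete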
