How to update few records in Spark
Problem description
I have the following Scala program for Spark:
val dfA = sqlContext.sql("select * from employees where id in ('Emp1', 'Emp2')" )
val dfB = sqlContext.sql("select * from employees where id not in ('Emp1', 'Emp2')" )
val dfN = dfA.withColumn("department", lit("Finance"))
val dfFinal = dfN.unionAll(dfB)
dfFinal.registerTempTable("intermediate_result")
dfA.unpersist
dfB.unpersist
dfN.unpersist
dfFinal.unpersist
val dfTmp = sqlContext.sql("select * from intermediate_result")
dfTmp.write.mode("overwrite").format("parquet").saveAsTable("employees")
dfTmp.unpersist
When I try to save it, I get the following error:
org.apache.spark.sql.AnalysisException: Cannot overwrite table `employees` that is also being read from.;
	at org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis(rules.scala:106)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:182)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:109)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:111)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:109)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:105)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
	at scala.collection.immutable.List.foreach(List.scala:318)
My questions are:
- Is my approach correct to change the department of two employees
- Why am I getting this error when I have released the DataFrames
Is my approach correct to change the department of two employees
It is not. Just to repeat something that has been said multiple times on Stack Overflow - Apache Spark is not a database. It is not designed for fine-grained updates. If your project requires operations like this, use one of the many databases available on Hadoop.
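That said, if the goal is just to derive a modified dataset rather than update records in place, the split-and-union in the question can be collapsed into a single pass with `when`/`otherwise`. A minimal sketch, assuming a Spark 2.x `SparkSession` named `spark` and the same `employees` table; the output table name `employees_updated` is illustrative:

```scala
import org.apache.spark.sql.functions.{col, lit, when}

// Single-pass rewrite: no split into dfA/dfB and no unionAll needed.
// Rows whose id is Emp1 or Emp2 get department "Finance";
// all other rows keep their existing department.
val updated = spark.table("employees")
  .withColumn(
    "department",
    when(col("id").isin("Emp1", "Emp2"), lit("Finance"))
      .otherwise(col("department"))
  )

// Writing back to the *same* table would hit the same AnalysisException,
// so persist the result under a different name (or path) instead.
updated.write.mode("overwrite").format("parquet").saveAsTable("employees_updated")
```

This avoids scanning the source twice, but it still rewrites the whole dataset - it does not turn Spark into a fine-grained update engine.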
Why am I getting this error when I have released the DataFrames
Because you didn't. All you've done is add a name to the execution plan. Checkpointing would be the closest thing to "releasing", but you really don't want to end up in a situation where you lose an executor in the middle of a destructive operation.
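For reference, checkpointing (available on DataFrames since Spark 2.1) is what actually materializes the data and truncates the lineage. A hedged sketch; the checkpoint directory is a placeholder path:

```scala
// Checkpointing writes the DataFrame to reliable storage and cuts its
// lineage, so downstream plans no longer reference the source table.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

// Eager by default: the DataFrame is computed and saved at this call.
val checkpointed = dfFinal.checkpoint()
```

Even then, overwriting the table you just read from is risky: if the job dies mid-write, the original data can be left partially destroyed.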
You could write to a temporary directory, delete the input and move the temporary files, but really - just use a tool that is fit for the job.
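The workaround described above can be sketched roughly as follows; the staging table name is illustrative, and the failure modes in the comments are exactly why a real database is the better tool:

```scala
// 1. Write the result to an intermediate (staging) table first.
dfTmp.write.mode("overwrite").format("parquet").saveAsTable("employees_staging")

// 2. Re-read from the staging table. This plan no longer reads from
//    `employees`, so the pre-write check passes and the overwrite succeeds.
spark.table("employees_staging")
  .write.mode("overwrite").format("parquet").saveAsTable("employees")

// If step 2 fails partway through, `employees` may be left truncated or
// corrupted, and the staging copy becomes the only intact version.
```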