How to update few records in Spark


Problem description

I have the following Scala program for Spark:

val dfA = sqlContext.sql("select * from employees where id in ('Emp1', 'Emp2')" )
val dfB = sqlContext.sql("select * from employees where id not in ('Emp1', 'Emp2')" )
val dfN = dfA.withColumn("department", lit("Finance"))
val dfFinal = dfN.unionAll(dfB)
dfFinal.registerTempTable("intermediate_result")

dfA.unpersist
dfB.unpersist
dfN.unpersist
dfFinal.unpersist

val dfTmp = sqlContext.sql("select * from intermediate_result")
dfTmp.write.mode("overwrite").format("parquet").saveAsTable("employees")
dfTmp.unpersist

When I try to save it, I get the following error:

org.apache.spark.sql.AnalysisException: Cannot overwrite table employees that is also being read from.;
at org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis(rules.scala:106)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:182)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:111)
at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:109)
at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:105)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
at scala.collection.immutable.List.foreach(List.scala:318)

My questions are:

  1. Is my approach correct to change the department of two employees?
  2. Why am I getting this error when I have released the DataFrames?

Solution

Is my approach correct to change the department of two employees?

It is not. Just to repeat something that has been said multiple times on Stack Overflow: Apache Spark is not a database. It is not designed for fine-grained updates. If your project requires operations like this, use one of the many databases available on Hadoop.
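For instance, a Hive transactional (ACID) table backed by ORC supports in-place updates. The sketch below is not the answer's prescribed method, just one illustration: it assumes Hive 0.14+ with transactions enabled, a running HiveServer2, and the hive-jdbc driver on the classpath; the JDBC URL is hypothetical.

import java.sql.DriverManager

// Issue a fine-grained update through HiveServer2 instead of rewriting
// the whole table with Spark. Assumes `employees` was created as a
// transactional ORC table ("transactional" = "true").
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
try {
  val stmt = conn.createStatement()
  stmt.execute(
    "UPDATE employees SET department = 'Finance' WHERE id IN ('Emp1', 'Emp2')")
  stmt.close()
} finally {
  conn.close()
}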

Why am I getting this error when I have released the DataFrames?

Because you didn't. All you've done is add a name to the execution plan: registerTempTable does not materialize anything, and unpersist only drops data that was explicitly cached. Checkpointing would be the closest thing to "releasing", but you really don't want to end up in a situation where you lose an executor in the middle of a destructive operation.
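To illustrate what checkpointing means here (and why it is only the "closest thing", not safe): in Spark 1.x a DataFrame has no checkpoint() method, so the sketch below checkpoints the underlying RDD and rebuilds a DataFrame whose plan no longer reads from the employees table. The checkpoint directory is hypothetical, and the warning above still applies.

// Materialize the result before overwriting its own source.
sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical path

val rdd = dfFinal.rdd
rdd.checkpoint()
rdd.count()  // force evaluation so the checkpoint is actually written

// The rebuilt DataFrame's plan starts from the checkpointed RDD, so the
// "cannot overwrite a table that is being read" check no longer fires.
val dfDetached = sqlContext.createDataFrame(rdd, dfFinal.schema)
dfDetached.write.mode("overwrite").format("parquet").saveAsTable("employees")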

You could write to a temporary directory, delete the input, and move the temporary files into place, but really - just use a tool which is fit for the job.
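For completeness, here is one way that temporary-directory workaround could look. It is a sketch under the same caveat: the staging path is hypothetical, and if the job dies between the overwrite and the cleanup you are left to recover by hand.

import org.apache.hadoop.fs.{FileSystem, Path}

// Stage the result somewhere that is not the employees table...
val staging = "/tmp/employees_staging"  // hypothetical location
dfFinal.write.mode("overwrite").parquet(staging)

// ...then re-read it, so the plan no longer references employees,
// and overwrite the table from the staged copy.
sqlContext.read.parquet(staging)
  .write.mode("overwrite").format("parquet").saveAsTable("employees")

// Remove the staging directory.
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(staging), true)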
