Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set

Problem description

I'm using Spark 1.6 and run into the exception in the title when I run the following code:

// Imports
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.Properties
import scala.concurrent.Future

// Set up spark on local with 2 threads
val conf = new SparkConf().setMaster("local[2]").setAppName("app")
val sc = new SparkContext(conf)
val sqlCtx = new HiveContext(sc)

// Create fake dataframe
import sqlCtx.implicits._
var df = sc.parallelize(1 to 50000).map { i => (i, i, i, i, i, i, i) }.toDF("a", "b", "c", "d", "e", "f", "g").repartition(2)
// Write it as a parquet file
df.write.parquet("/tmp/parquet1")
df = sqlCtx.read.parquet("/tmp/parquet1")

// JDBC connection
val url = s"jdbc:postgresql://localhost:5432/tempdb"
val prop = new Properties()
prop.setProperty("user", "admin")
prop.setProperty("password", "")

// 4 futures - at least one of them has been consistently failing for
val x1 = Future { df.write.jdbc(url, "temp1", prop) }
val x2 = Future { df.write.jdbc(url, "temp2", prop) }
val x3 = Future { df.write.jdbc(url, "temp3", prop) }
val x4 = Future { df.write.jdbc(url, "temp4", prop) }

Here's the github gist: https://gist.github.com/karanveerm/27d852bf311e39f05491

The error I get is:

java.lang.IllegalArgumentException: spark.sql.execution.id is already set
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
        at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
        at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
        at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na]
        at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na]
        at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
        at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na]

Is this a Spark bug, or am I doing something wrong? Are there any workarounds?

Recommended answer

After trying several things, I found that one of the threads created by the global ForkJoinPool gets its spark.sql.execution.id property set to a random value. I could not identify what actually sets it, but I could work around the problem by using my own ExecutionContext:

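import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Dedicated fixed-size pool, used instead of the global ForkJoinPool
val executorService = Executors.newFixedThreadPool(4)
// Futures created while this implicit is in scope run on the pool above
implicit val ec = ExecutionContext.fromExecutorService(executorService)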
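
To show how the dedicated pool might be wired into the four writes from the question, here is a minimal sketch. The object name `DedicatedPool`, the helper `runAll`, and the 10-minute default timeout are all hypothetical and not part of the original answer. The helper runs each task as a Future on its own fixed-size pool, waits for all of them, and then shuts the pool down, since a fixed thread pool's non-daemon threads would otherwise keep the JVM alive.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object DedicatedPool {
  // Hypothetical helper: runs every task on a private fixed-size pool,
  // blocks until all of them finish, then shuts the pool down.
  def runAll[A](tasks: Seq[() => A], timeout: Duration = 10.minutes): Seq[A] = {
    val pool = Executors.newFixedThreadPool(math.max(1, tasks.size))
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(pool)
    try {
      // Each Future picks up the implicit ec above, not the global ForkJoinPool.
      val futures = tasks.map(task => Future(task()))
      Await.result(Future.sequence(futures), timeout)
    } finally {
      ec.shutdown()
    }
  }
}
```

With the names from the question, the call could look like `DedicatedPool.runAll(Seq(() => df.write.jdbc(url, "temp1", prop), () => df.write.jdbc(url, "temp2", prop)))`, with the remaining tables added the same way.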

I used code from http://danielwestheide.com/blog/2013/01/16/the-neophytes-guide-to-scala-part-9-promises-and-futures-in-practice.html. Maybe the ForkJoinPool copies thread attributes from the spawning thread when it creates a new worker thread. If that happens while an SQL execution is in progress, the new thread would start with a non-null spark.sql.execution.id, whereas a fixed thread pool would presumably create its threads outside any SQL execution.
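
The inheritance mechanism suspected above can be reproduced without Spark. The sketch below assumes the property is held in something that behaves like a `java.lang.InheritableThreadLocal`. The names `InheritDemo`, `executionId`, and `valueSeenByNewThread` are hypothetical stand-ins, not Spark APIs. It shows that a thread created while the parent has a value set starts with that value, while a thread created after the value is cleared starts clean.

```scala
import java.util.concurrent.atomic.AtomicReference

object InheritDemo {
  // Hypothetical stand-in for a per-thread property such as spark.sql.execution.id.
  val executionId = new InheritableThreadLocal[String]

  // Starts a fresh thread and reports the value that thread sees.
  def valueSeenByNewThread(): String = {
    val seen = new AtomicReference[String]()
    val t = new Thread(new Runnable {
      def run(): Unit = seen.set(executionId.get())
    })
    t.start()
    t.join()
    seen.get()
  }

  def main(args: Array[String]): Unit = {
    executionId.set("42")                  // parent is "inside an execution"
    val inherited = valueSeenByNewThread() // child is created now and copies the value
    executionId.remove()                   // parent leaves the execution
    val clean = valueSeenByNewThread()     // a child created now sees no value
    println(s"inherited=$inherited clean=$clean")
  }
}
```

If the pool spawns a worker at the wrong moment, that worker keeps the copied value for its whole lifetime. That would be consistent with the error appearing on only some of the futures.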
