结合使用SparkSession.sql()和JDBC [英] Use SparkSession.sql() with JDBC
问题描述
问题:
我想使用JDBC连接使用spark发出自定义请求.
I would like to use JDBC connection to make a custom request using spark.
此查询的目的是优化工作程序上的内存分配,因为我不能使用它:
The goal of this query is to optimized memory allocation on workers, because of that I can't use :
ss.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
当前:
我当前正在尝试运行:
ss = SparkSession
.builder()
.appName(appName)
.master("local")
.config(conf)
.getOrCreate()
ss.sql("some custom query")
配置:
url=jdbc:mysql://127.0.0.1/database_name
driver=com.mysql.jdbc.Driver
user=user_name
password=xxxxxxxxxx
错误:
[info] Exception encountered when attempting to run a suite with class name: db.TestUserProvider *** ABORTED ***
[info] org.apache.spark.sql.AnalysisException: Table or view not found: users; line 1 pos 14
[info] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:459)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:478)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
假设:
我猜有一个配置错误,但是我找不到位置.
I guess there is a configuration error, but I can't find out where.
推荐答案
Spark可以使用JDBC数据源(与您一样)从关系数据库中读取/向关系数据库读取和写入数据在您的第一个代码示例中做了).
Spark can read and write data to/from relational databases using the JDBC data source (like you did in your first code example).
此外(完全独立),spark允许使用SQL查询视图,这些视图是根据已从某个来源加载到DataFrame中的数据创建的.例如:
In addition (and completely separately), spark allows using SQL to query views that were created over data that was already loaded into a DataFrame from some source. For example:
val df = Seq(1,2,3).toDF("a") // could be any DF, loaded from file/JDBC/memory...
df.createOrReplaceTempView("my_spark_table")
spark.sql("select a from my_spark_table").show()
只能使用SparkSession.sql
查询以这种方式创建的表"(从Spark 2.0.0开始称为视图).
Only "tables" (called views, as of Spark 2.0.0) created this way can be queried using SparkSession.sql
.
如果您的数据存储在关系数据库中,Spark将必须首先从那里读取数据,然后才可以在加载的副本上执行任何分布式计算.底线-我们可以使用read
从表中加载数据,创建一个临时视图,然后查询它:
If your data is stored in a relational database, Spark will have to read it from there first, and only then would it be able to execute any distributed computation on the loaded copy. Bottom line - we can load the data from the table using read
, create a temp view, and then query it:
ss.read
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1/database_name")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
.createOrReplaceTempView("my_spark_table")
// and then you can query the view:
val df = ss.sql("select * from my_spark_table where ... ")
这篇关于结合使用SparkSession.sql()和JDBC的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!