Use SparkSession.sql() with JDBC


Problem description

Question:

I would like to use a JDBC connection to make a custom request using Spark.

The goal of this query is to optimize memory allocation on the workers, which is why I can't use:

ss.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

Currently:

I am trying to run:

val ss = SparkSession
  .builder()
  .appName(appName) // appName and conf are defined elsewhere
  .master("local")
  .config(conf)
  .getOrCreate()

ss.sql("some custom query")

Configuration:

url=jdbc:mysql://127.0.0.1/database_name
driver=com.mysql.jdbc.Driver
user=user_name
password=xxxxxxxxxx

Error:

[info] Exception encountered when attempting to run a suite with class name: db.TestUserProvider *** ABORTED ***
[info]   org.apache.spark.sql.AnalysisException: Table or view not found: users; line 1 pos 14
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:459)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:478)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)

Hypothesis:

I guess there is a configuration error, but I can't find out where.

Answer

Spark can read and write data to/from relational databases using the JDBC data source (as you did in your first code example).
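
For completeness, writing back through the same data source is symmetric. Below is a minimal sketch, assuming a DataFrame df is already in scope; the URL, table name, and credentials are placeholders, not values from the question:

import org.apache.spark.sql.SaveMode

// Hypothetical write-back: appends df's rows to an existing table
// through the same JDBC data source used for reading.
df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1/database_name")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .mode(SaveMode.Append)
  .save()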

In addition (and completely separately), Spark allows using SQL to query views that were created over data that was already loaded into a DataFrame from some source. For example:

import spark.implicits._ // required for Seq(...).toDF

val df = Seq(1,2,3).toDF("a") // could be any DF, loaded from file/JDBC/memory...
df.createOrReplaceTempView("my_spark_table")
spark.sql("select a from my_spark_table").show()

Only "tables" (called views, as of Spark 2.0.0) created this way can be queried using SparkSession.sql.

If your data is stored in a relational database, Spark will have to read it from there first, and only then can it execute any distributed computation on the loaded copy. Bottom line: we can load the data from the table using read, create a temp view, and then query it:

ss.read
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1/database_name")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()
  .createOrReplaceTempView("my_spark_table")

// and then you can query the view:
val df = ss.sql("select * from my_spark_table where ... ")
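
If the memory concern is about loading the whole table onto the workers, note that the JDBC source also accepts a subquery in place of a table name (anything valid in a SQL FROM clause), so the database executes the custom query and only its result is loaded into Spark. A minimal sketch; the columns, filter, and view name below are illustrative, not from the question:

// "dbtable" may be a parenthesized subquery with an alias; the database
// runs it, and only the returned rows are shipped to the workers.
ss.read
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1/database_name")
  .option("dbtable", "(select id, name from schema.tablename where active = 1) as t")
  .option("user", "username")
  .option("password", "password")
  .load()
  .createOrReplaceTempView("my_filtered_table")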

