How can I connect to a PostgreSQL database from Apache Spark using Scala?


Problem description

I want to know how can I do following things in scala?


  1. Connect to a PostgreSQL database using Spark and Scala.

  2. Write SQL statements like SELECT, UPDATE, etc. to modify tables in
    that database.

I know how to do this in Scala, but how do I import the psql connector jar into sbt when packaging it?

Answer

Our goal is to run parallel SQL queries from the Spark workers.

Add the connector and JDBC to the libraryDependencies in build.sbt. I've only tried this with MySQL, so I'll use that in my examples, but Postgres should be much the same.

libraryDependencies ++= Seq(
  jdbc,
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)
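Since the question is about PostgreSQL, a rough sketch of the same dependency block with the Postgres JDBC driver instead of MySQL (the artifact version here is an assumption, use whatever matches your setup):

libraryDependencies ++= Seq(
  jdbc,
  "org.postgresql" % "postgresql" % "9.4-1201-jdbc41",  // assumed version; pick the one you need
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)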

When you create the SparkContext you tell it which jars to copy to the executors. Include the connector jar. A clean way to do this:

import org.apache.spark.SparkConf

val classes = Seq(
  getClass,                       // To get the jar with our own code.
  classOf[com.mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
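For Postgres the only difference is the driver class you point at; a minimal sketch, assuming the org.postgresql dependency from the snippet above:

val classes = Seq(
  getClass,                        // To get the jar with our own code.
  classOf[org.postgresql.Driver]   // To get the PostgreSQL connector instead of MySQL.
)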

Now Spark is ready to connect to the database. Each executor will run part of the query, so that the results are ready for distributed computation.

There are two options for this. The older approach is to use org.apache.spark.rdd.JdbcRDD:

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,                        // lower bound, upper bound, number of partitions
  row => row.getString("BOOK_TITLE")  // map each ResultSet row to a value
)

Check out the documentation for the parameters. Briefly:


  • You have the SparkContext.
  • Then a function that creates the connection. This will be called on each worker to connect to the database.
  • Then the SQL query. It has to be similar to the example and contain placeholders for the starting and ending key.
  • Then you specify the range of keys (0 to 1000 in my example) and the number of partitions. The range is divided among the partitions, so one executor thread will end up executing SELECT * FROM BOOKS WHERE 0 <= KEY AND KEY <= 100 in the example.
  • And finally a function that converts the ResultSet into something useful. In the example we convert it into a String, so you end up with an RDD[String].
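For PostgreSQL only the connection function changes; here is a hedged sketch where the host and database name are made up for illustration:

val pgRdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    java.sql.DriverManager.getConnection("jdbc:postgresql://postgres.example.com/bookdb?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)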

Since Apache Spark version 1.3.0 another method is available through the DataFrame API. Instead of the JdbcRDD you would create an org.apache.spark.sql.DataFrame:

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))

See https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases for the full list of options (the key range and number of partitions can be set just like with JdbcRDD).

JdbcRDD does not support updates. But you can simply do them in a foreachPartition.

rdd.foreachPartition { it =>
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1, bookTitle)
    del.executeUpdate()
  }
  conn.close()  // close the per-partition connection when done
}

(This creates one connection per partition. If that is a concern, use a connection pool!)

DataFrames also have createJDBCTable and insertIntoJDBC methods.
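A hedged sketch of how those write methods looked in the Spark 1.3/1.4 DataFrame API (the target table name is made up, and in later versions these methods were replaced by df.write.jdbc):

val url = "jdbc:mysql://mysql.example.com/?user=batman&password=alfred"
df.createJDBCTable(url, "BOOKS_COPY", false)  // false: fail if the table already exists
df.insertIntoJDBC(url, "BOOKS_COPY", false)   // false: append instead of overwriting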
