Connect to SQLite in Apache Spark

Question

I want to run a custom function on all tables in a SQLite database. The function is more or less the same, but depends on the schema of the individual table. Also, the tables and their schemata are only known at runtime (the program is called with an argument that specifies the path of the database).

This is what I have so far:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// somehow bind sqlContext to DB

val allTables = sqlContext.tableNames

for( t <- allTables) {
    val df = sqlContext.table(t)
    val schema = df.columns
    sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x,schema))
}

The only hint I found so far needs to know the table in advance, which is not the case in my scenario:

val tableData = 
  sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
    .load()

I am using the xerial sqlite-jdbc driver. So how can I connect solely to a database, not to a table?

Edit: Using Beryllium's answer as a start, I updated my code to this:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val metaData = sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
                 "dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()

val myTableNames = metaData.select("tbl_name").distinct()

for (t <- myTableNames) {
    println(t.toString)

    val tableData = sqlContext.table(t.toString)

    for (record <- tableData.select("*")) {
        println(record)
    }
}

At least I can read the table names at runtime, which is a huge step forward for me. But I can't read the tables themselves. I tried both

val tableData = sqlContext.table(t.toString)

val tableData = sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
                 "dbtable" -> t.toString)).load()

inside the loop, but in both cases I get a NullPointerException. Although I can print the table names, it seems I cannot connect to the tables.

Last but not least, I always get an "SQLITE_ERROR: Connection is closed" error. It looks like the same issue described in this question: SQLITE_ERROR: Connection is closed when connecting from Spark via JDBC to SQLite database (http://stackoverflow.com/questions/32860100/sqlite-error-connection-is-closed-when-connecting-from-spark-via-jdbc-to-sqlite)

Answer

There are two options you can try. The first:


  • Open a separate, plain JDBC connection in your Spark job
  • Get the table names from the JDBC metadata
  • Feed these into your for comprehension
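For the first option, a minimal sketch of reading the table names through plain JDBC metadata (java.sql.DatabaseMetaData.getTables) might look like the following. It assumes the xerial sqlite-jdbc driver is on the classpath and that the URL has the form jdbc:sqlite:/path/to/file.db; the object name ListSqliteTables is hypothetical, not part of any library.

```scala
import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

// Hypothetical helper: lists user tables via plain JDBC metadata, no Spark involved.
// Assumes the xerial sqlite-jdbc driver is on the classpath (it self-registers via JDBC 4).
object ListSqliteTables {
  def tableNames(url: String): List[String] = {
    val conn = DriverManager.getConnection(url)
    try {
      // "%" matches every table name; restrict the result to ordinary tables
      val rs = conn.getMetaData.getTables(null, null, "%", Array("TABLE"))
      try {
        val names = ListBuffer.empty[String]
        while (rs.next()) names += rs.getString("TABLE_NAME")
        names.toList
      } finally rs.close()
    } finally conn.close() // the connection is opened and closed here, independent of Spark
  }
}
```

Because the names come back as a plain List[String] on the driver, each one can then be passed as the dbtable option of sqlContext.read.format("jdbc"), one table per load() call.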

The second option: you can specify a query as the value of the dbtable option. Syntactically, this query must "look" like a table, so it must be wrapped in parentheses as a subquery and given an alias.

In that query, get the metadata from the database:

val df = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:xxx",
    "user" -> "x",
    "password" -> "x",
    "dbtable" -> "(select * from pg_tables) as t")).load()

This example works with PostgreSQL; you have to adapt it for SQLite, for example by querying sqlite_master instead of pg_tables.
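As a sketch of the SQLite adaptation, the following builds a wrapped, aliased subquery over sqlite_master and runs it through plain JDBC as "SELECT * FROM <dbtable>", which approximates how Spark's JDBC source embeds dbtable into a FROM clause. It assumes the xerial sqlite-jdbc driver is on the classpath; the object name SqliteMetadataQuery is hypothetical. The filter type = 'table' skips indexes, views, and triggers, and the substr check skips SQLite's internal sqlite_* tables.

```scala
import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

// Hypothetical helper; assumes the xerial sqlite-jdbc driver is on the classpath.
object SqliteMetadataQuery {
  // Wrapped in parentheses and aliased so it can stand in for a table name.
  val dbtable: String =
    "(SELECT name FROM sqlite_master " +
      "WHERE type = 'table' AND substr(name, 1, 7) <> 'sqlite_') AS t"

  // Places `table` in a FROM clause and returns the first column of every row.
  def run(url: String, table: String): List[String] = {
    val conn = DriverManager.getConnection(url)
    try {
      val stmt = conn.createStatement()
      try {
        val rs = stmt.executeQuery(s"SELECT * FROM $table")
        val out = ListBuffer.empty[String]
        while (rs.next()) out += rs.getString(1)
        out.toList
      } finally stmt.close() // closing the statement also closes its ResultSet
    } finally conn.close()
  }
}
```

The same dbtable string, together with "url" -> "jdbc:sqlite:/path/to/file.db", is what would go into the options map of the Spark reader. Passing the bare query without parentheses and an alias produces "SELECT * FROM SELECT ...", which SQLite rejects as a syntax error.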

Update

It seems that the JDBC driver only supports iterating over one result set at a time. In any case, if you materialize the list of table names with collect(), the following snippet should work:

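// Materialize the table names on the driver with collect() before looping.
// distinct() is needed because sqlite_master also has rows for indexes and
// triggers, which repeat the tbl_name of the table they belong to.
val myTableNames = metaData.select("tbl_name").distinct().map(_.getString(0)).collect()

for (t <- myTableNames) {
  println(t)

  // Load each table through the JDBC source, one DataFrame per table
  val tableData = sqlContext.read.format("jdbc")
    .options(
      Map(
        "url" -> "jdbc:sqlite:/x.db",
        "dbtable" -> t)).load()

  tableData.show()
}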
