Executing multiple SQL queries on Spark


Problem description

I have a Spark SQL query in a file test.sql -

CREATE GLOBAL TEMPORARY VIEW VIEW_1 AS select a,b from abc

CREATE GLOBAL TEMPORARY VIEW VIEW_2 AS select a,b from VIEW_1

select * from VIEW_2

Now, I start my spark-shell and try to execute it like this -

val sql = scala.io.Source.fromFile("test.sql").mkString
spark.sql(sql).show

This fails with the following error -

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {<EOF>, 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'OR', 'AND', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 128)

I tried executing these queries one by one in separate spark.sql statements and they run fine. The problem is that I have 6-7 queries that create temporary views, and in the end I need the output of the last view. Is there a way to run these SQL statements in a single spark.sql call? I have worked with Postgres SQL (Redshift), which is able to execute this kind of query; with Spark SQL I would otherwise have to maintain a lot of files.

Recommended answer

The problem is that mkString concatenates all the lines into a single string, which cannot be parsed as one valid SQL query.

Each line of the script file should be executed as a separate query, for example:

scala.io.Source.fromFile("test.sql").getLines()
  .filterNot(_.isEmpty)  // filter out empty lines
  .foreach(query =>
    spark.sql(query).show
  )

Update

If the queries are split across more than one line, the case is a bit more complex.

We absolutely need to have a token that marks the end of a query. Let it be the semicolon character, as in standard SQL.
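
For instance (this rewritten file is an illustration, not part of the original answer), the test.sql from the question would then carry a terminating semicolon after each statement:

CREATE GLOBAL TEMPORARY VIEW VIEW_1 AS select a,b from abc;

CREATE GLOBAL TEMPORARY VIEW VIEW_2 AS select a,b from VIEW_1;

select * from VIEW_2;

Note that global temporary views are registered in the global_temp database, so the references may also need to be qualified as global_temp.VIEW_1 and global_temp.VIEW_2 when the file is actually run.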

First, we collect all non-empty lines from the source file:

val lines = scala.io.Source.fromFile(sqlFile).getLines().filterNot(_.isEmpty)

Then we process the collected lines, concatenating each new line with the previous one if the previous one does not end with a semicolon:

val queries = lines.foldLeft(List[String]()) { case(queries, line) =>
  queries match {
    case Nil => List(line) // case for the very first line
    case init :+ last =>
      if (last.endsWith(";")) {
        // if a query ended on a previous line, we simply append the new line to the list of queries
        queries :+ line.trim
      } else {
        // the query is not terminated yet, concatenate the line with the previous one
        val queryWithNextLine = last + " " + line.trim
        init :+ queryWithNextLine
      }
  }
}
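
The answer stops at collecting the queries; as a minimal follow-up sketch (an addition, not part of the original answer), each reconstructed query can then be submitted in turn. Depending on the Spark version, the trailing semicolon may need to be stripped before the string is passed to spark.sql:

// Run each reconstructed query; the last one (the final SELECT) prints its result.
// stripSuffix removes the terminating semicolon, which some Spark versions reject.
queries.foreach { query =>
  spark.sql(query.stripSuffix(";").trim).show()
}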
