How to specify subquery in the option "dbtable" in Spark-jdbc application while reading data from a table on Greenplum?


Problem Description

I am trying to read data from a table on Greenplum into HDFS using Spark. I passed a subquery in the dbtable option to read the Greenplum table, as shown below.

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from dbanscience.xx_lines where year=2017 and month=12) as xx_lines_periodYear"

println("ExecQuery: " + execQuery)

val dataDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", conUrl)
                     .option("dbtable", execQuery)
                     .option("user", devUsrName).option("password", devPwd)
                     .option("partitionColumn","id")
                     .option("lowerBound", 165512)
                     .option("upperBound", 11521481695656862L)
                     .option("numPartitions",300).load()

When I run the code, I see the following exception:

Exec query: (select je_header_id,source_system_name,je_line_num,last_update_date,last_updated_by,last_updated_by_name,ledger_id,code_combination_id,balancing_segment,cost_center_segment,period_name,period_year,period_num,effective_date,status,creation_date,created_by,created_by_name,entered_dr,entered_cr,entered_amount,accounted_dr,accounted_cr,accounted_amount,description,sap_document_number,sap_fiscal_year,sap_document_date,sap_posting_date,sap_period,sap_reference,sap_document_header_text,sap_exchange_rate,sap_reference_key,sap_line_item,sap_account_number,sap_cost_element,sap_profit_center,sap_segment,sap_trading_partner,sap_co_order,sap_geography,sap_reference_code,sap_product_line,sap_sender_cost_center,usd_mor_activity_amount::character varying as usd_mor_activity_amount_text, 0 as del_flag from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.chunk$.prepareDF$1(chunk.scala:153)
at com.partition.source.chunk$.main(chunk.scala:184)
at com.partition.source.chunk.main(chunk.scala)

The exception shows public as the schema name and the entire subquery (execQuery) as the table name.

I also tried giving the exec query as:

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear"

or

val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12 as xx_gl_je_lines_periodYear"

Neither of them works. I am using the jar greenplum-spark_2.11-1.4.0.jar to read data from Greenplum. Below is the spark-submit command I used:

SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.chunk --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --conf spark.jars=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/ibusr/ibusr.keytab --principal ibusr@dev.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,connections.properties --name Splinter --conf spark.executor.extraClassPath=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar splinter_2.11-0.1.jar

I wrote the code by referring to the instructions in the Greenplum documentation: https://greenplum-spark.docs.pivotal.io/100/read_from_gpdb.html

I am unable to identify the mistake I made here. Could anyone let me know how I can fix the issue?

Solution

The ability to pass a subquery in place of dbtable is a feature of the built-in JDBC data source. However, the Greenplum Spark Connector doesn't seem to provide such a capability.
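
For comparison, here is a minimal sketch of how the built-in JDBC source accepts a parenthesized subquery aliased as a table in dbtable; the PostgreSQL URL, host and database name below are placeholders, not values from the question:

val jdbcQuery = "(select id, year, month from dbanscience.xx_lines where year = 2017 and month = 12) as xx_lines_sub"

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://gp-host:5432/devdb")   // placeholder URL
  .option("dbtable", jdbcQuery)                            // a subquery is accepted here
  .option("user", devUsrName)
  .option("password", devPwd)
  .load()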

Specifically, the source is identified by dbschema and dbtable, where the latter should be (emphasis mine):

The name of the Greenplum Database table. When reading from Greenplum Database, this table must reside in the Greenplum Database schema identified in the dbschema option value.

This explains the exception you get: the connector treats the whole string as a table name and looks it up as a relation in the default public schema.

At the same time, nothing in the code you've shared indicates that you actually need such a feature. Since you don't apply any database-specific logic, the process can simply be rewritten as:

import org.apache.spark.sql.functions.{col, lit}

val allColumns: Seq[String] = ???

val dataDF = spark.read.format("greenplum")
  .option("url", conUrl)
  .option("dbtable", "xx_lines")        // plain table name only
  .option("dbschema", "dbanscience")    // schema goes in its own option
  .option("partitionColumn", "id")
  .option("user", devUsrName)
  .option("password", devPwd)
  .load()
  .where("year = 2017 and month = 12")  // filter applied after load
  .select(allColumns.map(col): _*)      // project only the required columns
  .withColumn(flagCol, lit(0))          // add the constant flag column

Please note that other options you use (upperBound, lowerBound, numPartitions) are neither supported nor required.

According to the official documentation:

Greenplum Database stores table data across segments. A Spark application using the Greenplum-Spark Connector to load a Greenplum Database table identifies a specific table column as a partition column. The Connector uses the data values in this column to assign specific table data rows on each Greenplum Database segment to one or more Spark partitions.

So as you can see, the distribution mechanism is completely different from that of the built-in JDBC source.

The Connector also provides an additional partitionsPerSegment option, which sets:

The number of Spark partitions per Greenplum Database segment. Optional, the default value is 1 partition.
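
A hedged sketch of adding it to the read above; the value 4 is arbitrary and only for illustration:

val dataDFWide = spark.read.format("greenplum")
  .option("url", conUrl)
  .option("dbtable", "xx_lines")
  .option("dbschema", "dbanscience")
  .option("partitionColumn", "id")
  .option("partitionsPerSegment", "4")   // illustrative value; the default is 1
  .option("user", devUsrName)
  .option("password", devPwd)
  .load()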
