How to specify subquery in the option "dbtable" in Spark-jdbc application while reading data from a table on Greenplum?
Problem description
I am trying to read data from a table on Greenplum into HDFS using Spark. I provided a subquery in the options to read the Greenplum table, as below.
val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from dbanscience.xx_lines where year=2017 and month=12) as xx_lines_periodYear"
println("ExecQuery: " + execQuery)
val dataDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
  .option("url", conUrl)
  .option("dbtable", execQuery)
  .option("user", devUsrName)
  .option("password", devPwd)
  .option("partitionColumn", "id")
  .option("lowerBound", 165512)
  .option("upperBound", 11521481695656862L)
  .option("numPartitions", 300)
  .load()
When I run the code, I see the following exception:
Exec query: (select je_header_id,source_system_name,je_line_num,last_update_date,last_updated_by,last_updated_by_name,ledger_id,code_combination_id,balancing_segment,cost_center_segment,period_name,period_year,period_num,effective_date,status,creation_date,created_by,created_by_name,entered_dr,entered_cr,entered_amount,accounted_dr,accounted_cr,accounted_amount,description,sap_document_number,sap_fiscal_year,sap_document_date,sap_posting_date,sap_period,sap_reference,sap_document_header_text,sap_exchange_rate,sap_reference_key,sap_line_item,sap_account_number,sap_cost_element,sap_profit_center,sap_segment,sap_trading_partner,sap_co_order,sap_geography,sap_reference_code,sap_product_line,sap_sender_cost_center,usd_mor_activity_amount::character varying as usd_mor_activity_amount_text, 0 as del_flag from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear
Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.chunk$.prepareDF$1(chunk.scala:153)
at com.partition.source.chunk$.main(chunk.scala:184)
at com.partition.source.chunk.main(chunk.scala)
The exception shows public as the dbname and the subquery (execQuery) as the tablename.
I tried giving the exec query as:
val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear"
or
val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12 as xx_gl_je_lines_periodYear"
Neither of them worked. I am using the jar greenplum-spark_2.11-1.4.0.jar to read data from Greenplum. Below is the spark-submit command I tried to use:
SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.chunk --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --conf spark.jars=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/ibusr/ibusr.keytab --principal ibusr@dev.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,connections.properties --name Splinter --conf spark.executor.extraClassPath=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar splinter_2.11-0.1.jar
I wrote the code by referring to the instructions in the Greenplum documentation: https://greenplum-spark.docs.pivotal.io/100/read_from_gpdb.html
I am unable to identify the mistake I made here. Could anyone let me know how to fix this issue?
The option to replace dbtable with a subquery is a feature of the built-in JDBC data source. However, the Greenplum Spark Connector doesn't seem to provide such a capability.

Specifically, the source is identified by dbschema and dbtable, where the latter should be (emphasis mine):

The name of the Greenplum Database table. When reading from Greenplum Database, this table must reside in the Greenplum Database schema identified in the dbschema option value.

This explains the exception you get. At the same time, nothing in the code you've shared indicates that you actually need such a feature. Since you don't apply any database-specific logic, the process can simply be rewritten as:
import org.apache.spark.sql.functions.{col, lit}
val allColumns: Seq[String] = ???
val dataDF = spark.read.format("greenplum")
.option("url", conUrl)
.option("dbtable", "xx_lines")
.option("dbschema", "dbanscience")
.option("partitionColumn", "id")
.option("user", devUsrName)
.option("password", devPwd)
.load()
.where("year = 2017 and month=12")
.select(allColumns map col:_*)
.withColumn(flagCol, lit(0))
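For comparison, here is roughly how the same subquery would be passed to the built-in JDBC source, which does honor partitionColumn, lowerBound, upperBound and numPartitions. This is only a sketch reusing the names from your code; the PostgreSQL-style URL is a placeholder you would replace with your own:

// Sketch: the stock JDBC source accepts a parenthesized subquery with an alias
val subquery = s"(select ${allColumns}, 0 as ${flagCol} " +
  "from dbanscience.xx_lines where year = 2017 and month = 12) as xx_lines_sub"

val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://gpmaster:5432/dbname")  // placeholder URL
  .option("dbtable", subquery)
  .option("user", devUsrName)
  .option("password", devPwd)
  .option("partitionColumn", "id")           // honored by the JDBC source
  .option("lowerBound", 165512L)
  .option("upperBound", 11521481695656862L)
  .option("numPartitions", 300)
  .load()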
Please note that the other options you use (upperBound, lowerBound, numPartitions) are neither supported nor required.

According to the official documentation:

Greenplum Database stores table data across segments. A Spark application using the Greenplum-Spark Connector to load a Greenplum Database table identifies a specific table column as a partition column. The Connector uses the data values in this column to assign specific table data rows on each Greenplum Database segment to one or more Spark partitions.

So, as you can see, the distribution mechanism is completely different from that of the built-in JDBC source.

The Connector also provides an additional partitionsPerSegment option, which sets:

The number of Spark partitions per Greenplum Database segment. Optional, the default value is 1 partition.
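If you need more read parallelism, the connector-specific knob is therefore partitionsPerSegment rather than numPartitions. A minimal sketch of the same read with it set (the value 4 here is just an illustrative choice):

val dataDF = spark.read.format("greenplum")
  .option("url", conUrl)
  .option("dbtable", "xx_lines")
  .option("dbschema", "dbanscience")
  .option("partitionColumn", "id")
  .option("partitionsPerSegment", "4")  // illustrative: 4 Spark partitions per Greenplum segment
  .option("user", devUsrName)
  .option("password", devPwd)
  .load()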
val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from dbanscience.xx_lines where year=2017 and month=12) as xx_lines_periodYear"
println("ExecQuery: " + execQuery)
val dataDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", conUrl)
.option("dbtable", execQuery)
.option("user", devUsrName).option("password", devPwd)
.option("partitionColumn","id")
.option("lowerBound", 165512)
.option("upperBound", 11521481695656862L)
.option("numPartitions",300).load()
Exec query: (select je_header_id,source_system_name,je_line_num,last_update_date,last_updated_by,last_updated_by_name,ledger_id,code_combination_id,balancing_segment,cost_center_segment,period_name,period_year,period_num,effective_date,status,creation_date,created_by,created_by_name,entered_dr,entered_cr,entered_amount,accounted_dr,accounted_cr,accounted_amount,description,sap_document_number,sap_fiscal_year,sap_document_date,sap_posting_date,sap_period,sap_reference,sap_document_header_text,sap_exchange_rate,sap_reference_key,sap_line_item,sap_account_number,sap_cost_element,sap_profit_center,sap_segment,sap_trading_partner,sap_co_order,sap_geography,sap_reference_code,sap_product_line,sap_sender_cost_center,usd_mor_activity_amount::character varying as usd_mor_activity_amount_text, 0 as del_flag from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear
Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.chunk$.prepareDF$1(chunk.scala:153)
at com.partition.source.chunk$.main(chunk.scala:184)
at com.partition.source.chunk.main(chunk.scala)
public
as the dbname and the subquery (execQuery) as the tablename
val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear"
val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12 as xx_gl_je_lines_periodYear"
SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.chunk --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --conf spark.jars=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/ibusr/ibusr.keytab --principal ibusr@dev.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,connections.properties --name Splinter --conf spark.executor.extraClassPath=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar splinter_2.11-0.1.jar
dbtable
with subquery is a feature of the built-in JDBC data source. However Greenplum Spark Connector doesn't seem to provide such capabilities.dbschema
and dbtable
where the latter one should be (emphasis mine):
dbschema
option value. import org.apache.spark.sql.functions.{col, lit}
val allColumns: Seq[String] = ???
val dataDF = spark.read.format("greenplum")
.option("url", conUrl)
.option("dbtable", "xx_lines")
.option("dbschema", "dbanscience")
.option("partitionColumn", "id")
.option("user", devUsrName)
.option("password", devPwd)
.load()
.where("year = 2017 and month=12")
.select(allColumns map col:_*)
.withColumn(flagCol, lit(0))
upperBound
, lowerBound
, numPartitions
) are neither supported nor required.
partitionsPerSegment
option
which sets: