Spark Streaming JDBC: read the stream as and when data comes - "Data source jdbc does not support streamed reading"


Problem description

I am using PostgreSQL as the database. For each batch I want to capture the data of one table, convert it to a Parquet file, and store it in S3. I tried to connect using Spark's JDBC options with readStream, like below...

val jdbcDF = spark.readStream
    .format("jdbc")
    .option("url", "jdbc:postgresql://myserver:5432/mydatabase")
    .option("dbtable", "database.schema.table")
    .option("user", "xxxxx")
    .option("password", "xxxxx")
    .load()

But it threw an unsupported-operation exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
    at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
    at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)

Am I on the right track? Is there really no support for a database as a data source for Spark Streaming?

AFAIK, the other way of doing this is to write a Kafka producer that publishes the data to a Kafka topic, and then consume that topic with Spark Streaming...

Note: I don't want to use Kafka Connect for this, since I need to do some auxiliary transformations. A rough sketch of this route is shown below.
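For reference only, a minimal sketch of what that Kafka-based route might look like with Structured Streaming; the broker address, the topic name table_changes, and the S3 bucket are all assumptions, and the spark-sql-kafka connector must be on the classpath:

// Requires the spark-sql-kafka-0-10 connector dependency.
val kafkaDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")   // assumed broker address
    .option("subscribe", "table_changes")                // hypothetical topic name
    .load()

// The Kafka value column arrives as binary; cast it to a string and
// apply the auxiliary transformations here before writing.
val transformed = kafkaDF.selectExpr("CAST(value AS STRING) AS json")

transformed.writeStream
    .format("parquet")
    .option("path", "s3a://my-bucket/table/")                      // hypothetical bucket
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/")  // required for file sinks
    .start()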

Is this the only way?

What is the right way of doing this? Is there an example of something like this? Please assist!

Answer

Spark Structured Streaming does not have a standard JDBC source, but you can write a custom one. You should understand that your table must have a unique key by which you can track changes. For example, you can take my implementation; do not forget to add the necessary JDBC driver to the dependencies.
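For illustration only (this is not the custom source the answer refers to), a minimal polling sketch of the same change-tracking idea, assuming the table has a monotonically increasing numeric key column id, reusing the spark session from the question, and writing to a hypothetical S3 bucket:

import org.apache.spark.sql.functions.{col, max}

// Naive micro-batch loop: remember the highest key seen so far and
// only pull rows with a larger key on each pass.
var lastId = 0L

while (true) {
  val batch = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://myserver:5432/mydatabase")
    // "query" needs Spark 2.4+; on older versions use dbtable with a subquery alias.
    .option("query", s"SELECT * FROM schema.table WHERE id > $lastId")
    .option("user", "xxxxx")
    .option("password", "xxxxx")
    .load()
    .cache()   // avoid re-querying the database for the aggregation below

  if (!batch.isEmpty) {
    batch.write.mode("append").parquet("s3a://my-bucket/table/")          // hypothetical bucket
    lastId = batch.agg(max(col("id")).cast("long")).first().getLong(0)    // advance the offset
  }

  batch.unpersist()
  Thread.sleep(60 * 1000)   // assumed 60-second polling interval
}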

