How to create a custom streaming data source?
Question
I have a custom reader for Spark Streaming that reads data from a WebSocket. I'm going to try Spark Structured Streaming.
How do I create a streaming data source in Spark Structured Streaming?
Answer
A streaming data source implements org.apache.spark.sql.execution.streaming.Source. Its scaladoc should give you enough information to get started (just follow the types to develop a compilable Scala type).
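For orientation, here is a minimal sketch of what a Source implementation looks like against the Spark 2.x internal API. The class name WebSocketSource, the single value column, and the in-memory buffering are illustrative assumptions, not a production implementation (a real source would also have to handle things like offset deserialization and thread safety around the WebSocket callback):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical skeleton of the Source contract (Spark 2.x internal API).
class WebSocketSource(sqlContext: SQLContext) extends Source {

  // Rows received from the WebSocket, buffered until Spark asks for a batch
  private val buffer = collection.mutable.ListBuffer.empty[String]
  private var currentOffset = LongOffset(-1)

  // Called from your WebSocket client's message callback (illustrative)
  def addData(value: String): Unit = synchronized {
    buffer += value
    currentOffset = LongOffset(currentOffset.offset + 1)
  }

  override def schema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  // Highest offset available so far, or None before any data has arrived
  override def getOffset: Option[Offset] =
    if (currentOffset.offset == -1) None else Some(currentOffset)

  // Return the rows between the two offsets as a DataFrame
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val from = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
    val to = end.asInstanceOf[LongOffset].offset
    val slice = synchronized { buffer.slice(from.toInt, to.toInt + 1).toList }
    import sqlContext.implicits._
    sqlContext.createDataset(slice).toDF("value")
  }

  override def stop(): Unit = {
    // close the WebSocket connection here
  }
}
```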
Once you have the Source, you have to register it so you can use it in format of a DataStreamReader. The trick to making the streaming source available to format is to register it by creating a DataSourceRegister for the streaming source. You can find examples in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
That's the file that links the short name used in format to the implementation.
What I usually recommend people do during my Spark workshops is to start development from both sides:
- Write a streaming query first, using format, e.g.
val input = spark
.readStream
.format("yourCustomSource") // <-- your custom source here
.load
- Implement the streaming Source and a corresponding DataSourceRegister (it could be the same class)
- (optional) Register the DataSourceRegister by writing the fully-qualified class name, say com.mycompany.spark.MyDataSourceRegister, to META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
$ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.mycompany.spark.MyDataSourceRegister
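Put together, the Source and its registration can live in one provider class. The following is a sketch only; WebSocketSourceProvider, the websocket alias, and the default schema are hypothetical names for illustration, and the actual Source construction is left out:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical provider combining both roles (Spark 2.x APIs):
// DataSourceRegister supplies the short name used in format(...),
// StreamSourceProvider creates the actual streaming Source.
class WebSocketSourceProvider extends StreamSourceProvider with DataSourceRegister {

  private val defaultSchema = StructType(StructField("value", StringType) :: Nil)

  // The alias end users pass to DataStreamReader.format(...)
  override def shortName(): String = "websocket"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(defaultSchema))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    ??? // construct and return your streaming Source here
}
```

With the fully-qualified name of this class listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister on the classpath, spark.readStream.format("websocket") would resolve to it via Java's ServiceLoader mechanism.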
The last step, where you register the DataSourceRegister implementation for your custom Source, is optional and only serves to register the data source alias that your end users use in the DataFrameReader.format method.
format(source: String): DataFrameReader Specifies the input data source format.
Review the code of org.apache.spark.sql.execution.streaming.RateSourceProvider for a good head start.