How to create a custom streaming data source?


Question


I have a custom reader for Spark Streaming that reads data from WebSocket. I'm going to try Spark Structured Streaming.


How to create a streaming data source in Spark Structured Streaming?

Answer


The scaladoc of org.apache.spark.sql.execution.streaming.Source should give you enough information to get started (just follow the types to develop a compilable Scala type).
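As a minimal sketch of what "following the types" leads to (the class name and the fixed rows are illustrative, assuming the internal Source API of Spark 2.x), a source serving one fixed micro-batch could look like:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative only: a Source that offers a single fixed micro-batch.
class FixedRowsSource(sqlContext: SQLContext) extends Source {

  override def schema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  // Tell Spark how far the stream has progressed; None means no data yet.
  override def getOffset: Option[Offset] = Some(LongOffset(0L))

  // Return the rows between start (exclusive) and end (inclusive).
  // Caveat: Spark expects the returned DataFrame to be flagged as
  // streaming, so real sources build it with the internal
  // sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true),
  // which is one reason custom sources often live under org.apache.spark.sql.*
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    import sqlContext.implicits._
    Seq("hello", "world").toDF("value")
  }

  override def stop(): Unit = ()  // close sockets, stop threads, etc.
}
```

This is a sketch, not a complete implementation: a real WebSocket source would buffer incoming messages, advance the offset as data arrives, and serve only the requested offset range in getBatch.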


Once you have the Source, you have to register it so it can be used in the format method of a DataStreamReader. The trick is to create a DataSourceRegister for your streaming source and register it. You can find examples in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider


That's the file that links the short name in format to the implementation.
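A sketch of such a class (the name MyDataSourceRegister and the "websocket" alias are made up, assuming Spark 2.x's StreamSourceProvider API): the same class can implement both DataSourceRegister and StreamSourceProvider:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical register/provider: one class can play both roles.
class MyDataSourceRegister extends DataSourceRegister with StreamSourceProvider {

  // The alias your end users pass to format(...)
  override def shortName(): String = "websocket"

  private val defaultSchema =
    StructType(StructField("value", StringType) :: Nil)

  // Called to resolve the schema before the source is created.
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(defaultSchema))

  // Called once per streaming query to instantiate your Source.
  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    ??? // construct and return your Source implementation here
}
```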


What I usually recommend people doing during my Spark workshops is to start development from both sides:

  1. Write a streaming query that uses your format, e.g.

val input = spark
  .readStream
  .format("yourCustomSource") // <-- your custom source here
  .load

  2. Implement the streaming Source and a corresponding DataSourceRegister (it could be the same class)


    (optional) Register the DataSourceRegister by writing the fully-qualified class name, say com.mycompany.spark.MyDataSourceRegister, to META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

    $ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    com.mycompany.spark.MyDataSourceRegister
    

  • The last step, where you register the DataSourceRegister implementation for your custom Source, is optional and only serves to register the data source alias that your end users use in the DataFrameReader.format method:


    format(source: String): DataFrameReader Specifies the input data source format.


    Review the code of org.apache.spark.sql.execution.streaming.RateSourceProvider for a good head start.
