Spark Structured Streaming MemoryStream + Row + Encoders issue


Question


I am trying to run some tests on my local machine with Spark Structured Streaming.

In batch mode, here are the Rows I am dealing with:

val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))
val rows         = List(
    Row(
      Map("ID" -> "1",
        "STRUCTUREID" -> "MFCD00869853",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "803.482",
        "FORMULA" -> "C44H69NO12",
        "NAME" -> "Tacrolimus",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
       )),
    Row(
      Map("ID" -> "2",
        "STRUCTUREID" -> "MFCD00869854",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "603.482",
        "FORMULA" -> "",
        "NAME" -> "Tacrolimus2",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
      ))
  )
val df  = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)

Working with that in batch mode works like a charm, no issues.

Now I'm trying to move to streaming mode using MemoryStream for testing. I added the following:

implicit val ctx = spark.sqlContext
val intsInput = MemoryStream[Row]

But the compiler complains as follows:

No implicits found for parameter evidence$1: Encoder[Row]

Hence my question: what should I do here to get this working?

I also saw that if I add the following import, the error goes away:

import spark.implicits._

Actually, I now get the following warning instead of an error:

Ambiguous implicits for parameter evidence$1: Encoder[Row]

I do not understand the encoder mechanism well and would appreciate it if someone could explain how to avoid using those implicits. The reason is that I read the following in a book about creating a DataFrame from Rows.

Recommended approach:

val myManualSchema = new StructType(Array(
  new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()

And then the author goes on with this:

In Scala, we can also take advantage of Spark’s implicits in the console (and if you import them in your JAR code) by running toDF on a Seq type. This does not play well with null types, so it’s not necessarily recommended for production use cases.

val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

It would help if someone could take the time to explain what is happening in my scenario when I use the implicits, whether it is safe to do so, and whether there is a way to do it more explicitly without importing them.

Finally, if someone could point me to good documentation on Encoders and Spark type mapping, that would be great.
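For reference, one more explicit route, without importing spark.implicits._, is to supply an Encoder[Row] yourself. The following is only a sketch: it assumes a Spark version where RowEncoder(schema) from the internal catalyst package is still available (newer Spark releases expose Encoders.row(schema) as a public alternative), and the case-class approach shown in the answer below is generally preferable.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val ctx = spark.sqlContext
// Explicit encoder built from the recordSchema defined above; no spark.implicits._ needed.
implicit val rowEncoder: ExpressionEncoder[Row] = RowEncoder(recordSchema)

val rowsInput = MemoryStream[Row]   // the explicit Encoder[Row] is picked up here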

EDIT1

I finally got it to work with

  implicit val ctx = spark.sqlContext
  import spark.implicits._
  val rows = MemoryStream[Map[String,String]]
  val df = rows.toDF()

My problem here is that I am not confident about what I am doing. It seems to me that in some situations I need to create a Dataset just to be able to convert it to a DataFrame (i.e. Dataset[Row]) via toDF. I understood that working with a Dataset is type-safe but slower than working with a DataFrame. So why this intermediary Dataset? This is not the first time I have seen this in Spark Structured Streaming. Again, if someone could help me with this, that would be great.
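As a point of reference, a DataFrame is just an alias for Dataset[Row] in the Spark API (type DataFrame = Dataset[Row]), so the toDF() call above only changes the static type; it does not add extra processing. Below is a small sketch of the same workaround that also renames the single map column so the streaming schema lines up with the batch DataFrame from the question (the column name "Record" is an assumption taken from recordSchema above):

implicit val ctx = spark.sqlContext
import spark.implicits._

val rows = MemoryStream[Map[String, String]]
// toDS() yields a Dataset[Map[String, String]] whose single column is named "value";
// toDF("Record") renames it so it matches the batch schema defined earlier.
val df = rows.toDS().toDF("Record")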

Solution

I encourage you to use Scala's case classes for data modeling.

final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

Now you can have a List of Product in memory:

  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

The structured streaming API makes it easy to reason about stream processing by using the widely known Dataset[T] abstraction. Roughly speaking, you just have to worry about three things:

  • Source: a source can generate an input data stream which we can represent as a Dataset[Input]. Every new data item Input that arrives is going to be appended into this unbounded dataset. You can manipulate the data as you wish (e.g. Dataset[Input] => Dataset[Output]).
  • StreamingQueries and Sink: a query generates a result table that's updated from the Source every trigger interval. Changes are written into external storage called a Sink.
  • Output modes: there are different modes in which you can write data into the Sink: complete mode, append mode, and update mode (see the short sketch after this list).
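As a minimal sketch of how the mode is chosen (assuming result is a streaming Dataset like the one built with MemoryStream further down), the output mode is set on the DataStreamWriter:

// Sketch only: `result` is assumed to be a streaming Dataset[Product].
// If no mode is set, Spark uses append by default for simple queries.
val query = result.writeStream
  .outputMode("append")      // alternatives: "complete", "update"
  .format("console")
  .start()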

Let's assume that you want to know the products with a molecular weight greater than 200 units.

As you said, using the batch API is fairly simple and straightforward:

// Create a static dataset using the in-memory data
val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)

// Processing...
val result: Dataset[Product] = staticData.filter(_.weight > 200)

// Print results!
result.show()

When using the Streaming API, you just need to define a source and a sink as an extra step. In this example, we can use the MemoryStream and the console sink to print out the results.

// Create a streaming dataset using the in-memory data (memory source)
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)

val streamingData: Dataset[Product] = productSource.toDS()

// Processing...
val result: Dataset[Product] = streamingData.filter(_.weight > 200)

// Print results by using the console sink. 
val query: StreamingQuery = result.writeStream.format("console").start()

// Stop streaming
query.awaitTermination(timeoutMs=5000)
query.stop()

Note that staticData and streamingData have the exact same type signature (i.e., Dataset[Product]). This allows us to apply the same processing steps regardless of whether we use the Batch or the Streaming API. You can also think of implementing a generic method def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ??? to avoid repeating yourself in both approaches, as sketched below.
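A minimal sketch of that idea, reusing the staticData and streamingData values from the snippets above (the helper name is illustrative; Dataset.transform is a standard Dataset method):

// One processing function applied to both the static and the streaming Dataset.
def processing(input: Dataset[Product]): Dataset[Product] =
  input.filter(_.weight > 200)

val batchResult  = staticData.transform(processing)
val streamResult = streamingData.transform(processing)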

Complete code example:

import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQuery

object ExMemoryStream extends App {

  // Boilerplate code...
  val spark: SparkSession = SparkSession.builder
    .appName("ExMemoryStreaming")
    .master("local[*]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  // Define your data models 
  final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

  // Create some in-memory instances
  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

  // Defining processing step
  def processing(inputData: Dataset[Product]): Dataset[Product] =
    inputData.filter(_.weight > 200)

  // STATIC DATASET
  val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)

  println("This is the static dataset:")
  processing(datasetStatic).show()

  // STREAMING DATASET
  val productSource = MemoryStream[Product]
  productSource.addData(inMemoryRecords)

  val datasetStreaming: Dataset[Product] = productSource.toDS()

  println("This is the streaming dataset:")
  val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
  query.awaitTermination(timeoutMs=5000)
  
  // Stop query and close Spark
  query.stop()
  spark.close()

}
