Create a DataFrame in Spark Stream


Problem description

I've connected a Kafka stream to Spark, and I've trained an Apache Spark MLlib model to make predictions based on streamed text. My problem is that, to get a prediction, I need to pass a DataFrame.

// Kafka stream
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// load MLlib model
val model = PipelineModel.load(modelPath)

stream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // to get a prediction I need to pass a DataFrame
    val toPredict = spark.createDataFrame(Seq(
      (1L, record.value())
    )).toDF("id", "review")
    val prediction = model.transform(toPredict)
  }
}

My problem is that Spark Streaming doesn't allow me to create a DataFrame there. Is there any way to do that? Can I use a case class or a struct?

Recommended answer

It's possible to create a DataFrame or Dataset from an RDD just as you would in core Spark. To do that, we need to apply a schema. Within foreachRDD we can then convert the resulting RDD into a DataFrame, which can in turn be used with an ML pipeline.

// we use a schema in the form of a case class
case class MyStructure(field: Type, ...)

// and we implement our custom transformation from String to our structure
object MyStructure {
    def parse(str: String): Option[MyStructure] = ...
}

val stream = KafkaUtils.createDirectStream...

// give the stream a schema using the case class
val strucStream = stream.flatMap(cr => MyStructure.parse(cr.value))

strucStream.foreachRDD { rdd =>
    import sparkSession.implicits._
    // the RDD now holds case-class instances, so toDF() can infer the schema
    val df = rdd.toDF()
    val prediction = model.transform(df)
    // do something with the prediction DataFrame
}

