How to use feature extraction with DStream in Apache Spark

Question

I have data arriving from Kafka through a DStream. I want to perform feature extraction on it in order to obtain some keywords.

I do not want to wait for all the data to arrive (it is intended to be a continuous stream that potentially never ends), so I hope to perform the extraction in chunks; it doesn't matter to me if accuracy suffers a bit.

So far I have put together something like this:

import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

def extractKeywords(stream: DStream[Data]): Unit = {

  val spark: SparkSession = SparkSession.builder.getOrCreate

  // split every incoming record into its words
  val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData

  // fit and apply TF-IDF on each micro-batch
  val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _

  // attach the extracted keywords back to the original records
  val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData

  streamWithKeywords.print()
}

def extractFeatures(spark: SparkSession)
                   (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {

  val df = spark.createDataFrame(rdd).toDF("data", "words")

  // term frequencies via the hashing trick; numOfFeatures is defined elsewhere
  val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
  val rawFeatures = hashingTF.transform(df)

  // fit IDF on the current micro-batch only -- this is the call that fails on an empty batch
  val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
  val idfModel = idf.fit(rawFeatures)

  val rescaledData = idfModel.transform(rawFeatures)

  import spark.implicits._
  // note: "features" is a Vector column, so this cast may also need revisiting
  rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}

However, I received java.lang.IllegalStateException: Haven't seen any document yet. I am not surprised, as I was just trying to piece things together, and I understand that since I am not waiting for some data to arrive, the generated model might be empty when I try to use it on the data.
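
As a side note, IDF.fit throws exactly this exception when its input contains no documents, which happens whenever a micro-batch is empty. A minimal guard (just a sketch with a hypothetical wrapper, not the approach taken in the answer below) is to pass empty batches through untouched:

// hypothetical wrapper: skip fitting on empty micro-batches;
// extractFeatures is the function defined above
def extractFeaturesSafe(spark: SparkSession)
                       (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
  if (rdd.isEmpty()) rdd.sparkContext.emptyRDD[(Data, Array[String])]
  else extractFeatures(spark)(rdd)
}

This only avoids the crash, though; the IDF statistics would still be re-fitted from scratch on every batch.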

What would be the right approach for this problem?

Answer

I used the advice from the comments and split the procedure into 2 runs:

  • one that calculates the IDF model and saves it to a file

import java.io.File

def trainFeatures(idfModelFile: File, rdd: RDD[(String, Seq[String])]) = {
  val session: SparkSession = SparkSession.builder.getOrCreate

  val wordsDf = session.createDataFrame(rdd).toDF("data", "words")

  // term frequencies via the hashing trick
  val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
  val featurizedDf = hashingTF.transform(wordsDf)

  // fit the IDF model once, over the whole training set
  val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
  val idfModel = idf.fit(featurizedDf)

  // persist the fitted model so the streaming job can load it later
  idfModel.write.save(idfModelFile.getAbsolutePath)
}
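
For illustration, the offline step might be driven like this; the sample documents and the model path are assumptions for the sketch, not part of the original answer:

// hypothetical driver for the offline training run
val spark = SparkSession.builder.getOrCreate
val trainingRdd: RDD[(String, Seq[String])] = spark.sparkContext.parallelize(Seq(
  ("doc-1", Seq("spark", "streaming", "kafka")),
  ("doc-2", Seq("feature", "extraction", "tf", "idf"))))
trainFeatures(new File("/tmp/idf-model"), trainingRdd)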

  • one that reads the IDF model from the file and simply runs it on all incoming data (wiring it into the stream is sketched after this snippet)

    import org.apache.spark.ml.feature.{IDFModel, Tokenizer}

    // load the IDF model fitted in the first run
    val idfModel = IDFModel.load(idfModelFile.getAbsolutePath)

    val documentDf = spark.createDataFrame(rdd).toDF("update", "document")

    // split each document into words
    val tokenizer = new Tokenizer().setInputCol("document").setOutputCol("words")
    val wordsDf = tokenizer.transform(documentDf)

    // hashing setup must match the one used during training
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    val featurizedDf = hashingTF.transform(wordsDf)

    // apply the pre-trained IDF weights; no fitting happens here
    val extractor = idfModel.setInputCol("rawFeatures").setOutputCol("features")
    val featuresDf = extractor.transform(featurizedDf)

    featuresDf.select("update", "features")
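
The original answer leaves the glue code implicit, so here is a sketch of how the second run might be wired into the DStream pipeline: the model is loaded once on the driver and reused for every micro-batch. textStream (a DStream of (update, document) string pairs) and the model path are assumptions:

// a sketch: textStream: DStream[(String, String)] is assumed to exist upstream;
// DStream.transform runs this function on the driver once per micro-batch
val idfModel = IDFModel.load("/tmp/idf-model")

val featureStream = textStream transform { (rdd: RDD[(String, String)]) =>
  val documentDf = spark.createDataFrame(rdd).toDF("update", "document")

  val wordsDf = new Tokenizer()
    .setInputCol("document").setOutputCol("words")
    .transform(documentDf)

  val featurizedDf = new HashingTF()
    .setInputCol("words").setOutputCol("rawFeatures")
    .transform(wordsDf)

  idfModel.setInputCol("rawFeatures").setOutputCol("features")
    .transform(featurizedDf)
    .select("update", "features")
    .rdd
}

featureStream.print()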
    
