Why does foreachRDD not populate DataFrame with new content using StreamingContext.textFileStream?


Problem Description

My problem is that, as I change my code into streaming mode and put my data frame into the foreach loop, the data frame shows an empty table! It doesn't get filled! I also cannot put it into assembler.transform(). The error is:

Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U].
Unspecified value parameter mapFunc.
      val dataFrame = Train_DStream.map()

My train.csv file is like the one below. Please help me. Here is my code:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

/**
  * Created by saeedtkh on 5/22/17.
  */
object ML_Test {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)

    val customSchema = StructType(Array(
      StructField("column0", StringType, true),
      StructField("column1", StringType, true),
      StructField("column2", StringType, true)))

    //val Test_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv").map(LabeledPoint.parse)
    val Train_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv")
    val DStream = Train_DStream.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(6)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      Row.fromSeq(Seq(first, second, third))
    })

    DStream.foreachRDD { Test_DStream =>
      val dataFrame = sqlContext.createDataFrame(Test_DStream, customSchema)
      dataFrame.groupBy("column1", "column2").count().show()

      val numFeatures = 3
      val model = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.zeros(numFeatures))

      val featureCol = Array("column1", "column2")
      val assembler = new VectorAssembler().setInputCols(featureCol).setOutputCol("features")
      dataFrame.show()
      val df_new = assembler.transform(dataFrame)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Recommended Answer

My guess is that all the files under the /Users/saeedtkh/Desktop/sharedsaeed/train.csv directory have already been processed, so there are no files left and hence the DataFrame is empty.

Please note that the sole input parameter for StreamingContext.textFileStream is a directory, not a file.

textFileStream(directory: String): DStream[String]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files
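For instance, a minimal sketch of what the call could look like, assuming the training files are dropped into a dedicated directory (the train_data directory name below is only illustrative, not part of the original question):

// Watch a directory that will receive new files, not a single pre-existing file.
// "train_data" is a hypothetical directory name.
val Train_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train_data/")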

Please also note that once a file has been processed in a Spark Streaming application, it should not be changed (or appended to), since the file has already been marked as processed and Spark Streaming will ignore any modifications.

Basic Sources:

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories are not supported). Note that:

  • The files must have the same data format.

  • The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

  • Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

For simple text files, there is an easier method, streamingContext.textFileStream(dataDirectory). File streams do not require running a receiver, hence there is no need to allocate cores.
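To meet the atomic-move requirement quoted above, one possible approach (a minimal sketch; the staging path, file name, and watched directory are hypothetical) is to write each new file somewhere else first and then move it into the watched directory in a single atomic operation:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the new CSV to a staging location first...
val staging = Paths.get("/tmp/staging/part-0001.csv")
// ...then atomically move it into the directory watched by textFileStream,
// so Spark Streaming sees it appear as a complete, brand-new file.
Files.move(
  staging,
  Paths.get("/Users/saeedtkh/Desktop/sharedsaeed/train_data/part-0001.csv"),
  StandardCopyOption.ATOMIC_MOVE)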


Please also replace setMaster("local") with setMaster("local[*]") to make sure your Spark Streaming application has enough threads to process incoming data (you need at least 2 threads).
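In the code from the question, that is a one-line change:

// "local[*]" lets Spark use all available local cores, so the streaming
// application has at least the 2 threads it needs.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HdfsWordCount")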
