Why does foreachRDD not populate DataFrame with new content using StreamingContext.textFileStream?


Problem description

My problem is that when I change my code into streaming mode and put my DataFrame into the foreach loop, the DataFrame shows an empty table; it never gets filled. I also cannot pass it into assembler.transform(). The error is:

Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U].
Unspecified value parameter mapFunc.
      val dataFrame = Train_DStream.map()

My train.csv file is like below. Please help me. Here is my code:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

/**
  * Created by saeedtkh on 5/22/17.
  */
object ML_Test {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)

    val customSchema = StructType(Array(
      StructField("column0", StringType, true),
      StructField("column1", StringType, true),
      StructField("column2", StringType, true)))

    //val Test_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv").map(LabeledPoint.parse)
    val Train_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv")
    val DStream = Train_DStream.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(6)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      Row.fromSeq(Seq(first, second, third))
    })

    DStream.foreachRDD { Test_DStream =>
      val dataFrame = sqlContext.createDataFrame(Test_DStream, customSchema)
      dataFrame.groupBy("column1", "column2").count().show()

      val numFeatures = 3
      val model = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.zeros(numFeatures))

      val featureCol = Array("column1", "column2")
      val assembler = new VectorAssembler().setInputCols(featureCol).setOutputCol("features")
      dataFrame.show()
      val df_new = assembler.transform(dataFrame)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Recommended answer

My guess is that all the files under the /Users/saeedtkh/Desktop/sharedsaeed/train.csv directory have already been processed, so there are no files left and hence the DataFrame is empty.

Please note that the sole input parameter for StreamingContext.textFileStream is a directory, not a file.

textFileStream(directory: String): DStream[String]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files.
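
As a minimal sketch of the intended usage (the directory name /Users/saeedtkh/Desktop/sharedsaeed/train_data below is hypothetical, not from the question), the stream would be created over a directory and new CSV files would be dropped into it while the application is running:

// Point textFileStream at a directory, not at a single CSV file.
// Files that appear in this directory after ssc.start() are read as new batches.
val Train_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train_data")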

Please also note that once a file has been processed in a Spark Streaming application, it should not be changed (or appended to), since the file has already been marked as processed and Spark Streaming will ignore any modifications.

Quoting the official Spark Streaming documentation on Basic Sources:

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories are not supported). Note that:

  • The files must have the same data format.

  • The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

  • Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

For simple text files, there is an easier method: streamingContext.textFileStream(dataDirectory). And file streams do not require running a receiver, hence do not require allocating cores.
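
One way to satisfy the atomic-move requirement is to write each new file somewhere else first and then move (rename) it into the monitored directory. A minimal sketch, assuming hypothetical staging and target paths on the same local filesystem:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the file to a staging location first, then move it atomically into the
// directory watched by textFileStream so Spark Streaming picks it up as a new file.
val staged  = Paths.get("/Users/saeedtkh/Desktop/sharedsaeed/staging/batch_001.csv")
val watched = Paths.get("/Users/saeedtkh/Desktop/sharedsaeed/train_data/batch_001.csv")
Files.move(staged, watched, StandardCopyOption.ATOMIC_MOVE)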


Please also replace setMaster("local") with setMaster("local[*]") to make sure your Spark Streaming application has enough threads to process incoming data (you need at least 2 threads).
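
For example, only this line from the question's setup would change (the rest stays as posted):

// local[*] uses as many worker threads as there are cores, so at least one
// thread stays free to process the batches while the streaming machinery runs.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HdfsWordCount")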

