Spark Streaming Accumulated Word Count


Problem Description

This is a Spark Streaming program written in Scala. It counts the number of words received from a socket every second. The result is a per-batch word count: for example, the word count from time 0 to 1, then the word count from time 1 to 2. But I wonder whether there is some way we could alter this program to get an accumulated word count, that is, the word count from time 0 up to now.

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that the storage level uses no replication, which is fine only when running locally.
// Replication is necessary in a distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

Solution

You can use a StateDStream (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala) for this. There is an example of stateful word count (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala) in Spark's examples.

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a Dstream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
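
To make the cumulative semantics concrete, here is a hypothetical trace (the inputs below are illustrative, not output from a real run): because the update function adds each batch's count onto the previous state, every batch prints the running totals for all keys seen so far.

// Batch 1 input: "a a b"  -> printed state: (a,2) (b,1)
// Batch 2 input: "a c"    -> printed state: (a,3) (b,1) (c,1)
// Batch 3 input: (none)   -> printed state: (a,3) (b,1) (c,1)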

The way it works is that you get a Seq[T] for each batch, and you update an Option[T] that acts like an accumulator. The reason it is an Option is that on the first batch it will be None, and it stays that way unless it is updated. Note also that updateStateByKey requires checkpointing to be enabled, which is why the example calls ssc.checkpoint("."). In this example the count is an Int; if you are dealing with a lot of data you may even want to use a Long or BigInt.
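
As a minimal sketch of that suggestion (a hypothetical variant, not part of the original Spark example), the update function can be widened to Long state; only the state type and the type parameter on updateStateByKey change:

// Hypothetical variant of the example's update function, using Long state
val updateFunc = (values: Seq[Int], state: Option[Long]) => {
  // Sum this batch's occurrences and add them to the running total
  val currentCount = values.sum
  val previousCount = state.getOrElse(0L)
  Some(currentCount + previousCount)
}

val stateDstream = wordDstream.updateStateByKey[Long](updateFunc)

Nothing else in the program needs to change.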
