Spark Streaming Accumulated Word Count

Question
This is a Spark Streaming program written in Scala. It counts the number of words received from a socket in each 1-second batch. The result is a per-batch word count: for example, the words from time 0 to 1 are counted, and then the words from time 1 to 2 are counted separately. But I wonder whether there is some way we could alter this program to get an accumulated word count, that is, the word count from time 0 up to now.
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note: the storage level uses no replication, which is suitable only for local runs.
// Replication is necessary in a distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
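To see why this program only ever prints per-batch counts, here is a minimal sketch in plain Scala (no Spark) of what reduceByKey does: it aggregates within a single batch's RDD, so each batch starts from zero. The countBatch helper and the batch contents are hypothetical, purely for illustration.

```scala
// Simulates reduceByKey(_ + _) on one batch: counts are local to that batch.
def countBatch(words: Seq[String]): Map[String, Int] =
  words.map(w => (w, 1))
       .groupBy(_._1)
       .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

// Two hypothetical 1-second batches:
val batch1 = countBatch(Seq("spark", "streaming", "spark")) // Map(spark -> 2, streaming -> 1)
val batch2 = countBatch(Seq("spark"))                       // Map(spark -> 1), not 3
```

Nothing carries over from batch1 to batch2, which is exactly the behavior the question wants to change.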
You can use a StateDStream for this. There is an example of stateful word count in Spark's examples:
object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a Dstream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
The way it works is that you get a Seq[T] for each batch, and then you update an Option[T], which acts like an accumulator. The reason it is an Option is that on the first batch it will be None, and it stays that way unless it is updated. In this example the count is an Int; if you are dealing with a lot of data, you may even want a Long or a BigInt.
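The Seq/Option mechanics can be traced in plain Scala by calling the update function by hand for a few batches of a single key. The batch contents below are hypothetical; the function itself is the same updateFunc from the example above.

```scala
// The update function from the stateful word-count example:
// sums this batch's values and adds them to any previous state.
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

// Simulate three batches for one key, e.g. the word "hello":
// batch 1: seen twice, no prior state yet (None)
val s1 = updateFunc(Seq(1, 1), None) // Some(2)
// batch 2: seen once; previous state Some(2)
val s2 = updateFunc(Seq(1), s1)      // Some(3)
// batch 3: not seen; the count is carried forward unchanged
val s3 = updateFunc(Seq(), s2)       // Some(3)
```

Note that the real program also calls ssc.checkpoint(...), which updateStateByKey requires so the accumulated state can survive failures.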