spark streaming update_state_by_keys for arrays aggregation
Question
My input lines look like this:
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2
and the output should look like the rows below, which are a vertical (column-wise) addition of the corresponding numbers:
file1 : [ 1+1+5, 1+2+5, 1+3+5 ]
file2 : [ 2+1, 2+1, 2+2, 2+2 ]
Currently the data-aggregation logic works per batch interval, but it does not maintain state across batches. So I am adding the updateStateByKey function and passing it the function below. Is this the right way to do it?
My current program:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

def updateValues(newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]): Option[Array[Int]] = {
  // fall back to a zero array with the same width as the incoming arrays
  val previousCount = currentValue.getOrElse(Array.fill[Int](newValues.head.length)(0))
  // prepend the accumulated state to this batch's arrays
  val allValues = previousCount +: newValues
  // transpose groups the values column by column, then each column is summed
  Some(allValues.toList.transpose.map(_.sum).toArray)
}
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // create a StreamingContext, the main entry point for all streaming functionality
  val ssc = new StreamingContext(sc, Seconds(2))
  // parse the lines of data into coverage objects
  val inputStream = ssc.socketTextStream(<hostname>, 9999)
  ssc.checkpoint("<hostname>:8020/user/spark/checkpoints_dir")
  inputStream.print(10)
  val parsedDstream = inputStream
    .map(line => {
      val splitLines = line.split(",")
      // key on the trimmed file name; the numbers start at index 2
      (splitLines(1).trim, splitLines.slice(2, splitLines.length).map(_.trim.toInt))
    })
  val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues)
  // an output operation is required, otherwise the streaming job will not start
  aggregated_file_counts.print()
  // Start the computation
  ssc.start()
  // Wait for the computation to terminate
  ssc.awaitTermination()
}
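Outside of streaming, the column-wise addition in updateValues can be sanity-checked on the file1 rows from the example (a minimal standalone sketch; the values are taken from the sample input above):

val batches = Seq(Array(1, 1, 1), Array(1, 2, 3), Array(5, 5, 5))
// transpose groups the values column by column, then each column is summed
val summed = batches.toList.transpose.map(_.sum).toArray
// summed == Array(7, 8, 9), i.e. 1+1+5, 1+2+5, 1+3+5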
For reference, my previous program (without the stateful transformation):
import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // create a StreamingContext, the main entry point for all streaming functionality
  val ssc = new StreamingContext(sc, Seconds(2))
  val inputStream = ssc.socketTextStream("hostname", 9999)
  val parsedDstream = inputStream
    .map(line => {
      val splitLines = line.split(",")
      (splitLines(1).trim, splitLines.slice(2, splitLines.length).map(_.trim.toInt))
    })
    .reduceByKey((first, second) => {
      // element-wise addition of two arrays within one batch
      val listOfArrays = ArrayBuffer(first, second)
      listOfArrays.toList.transpose.map(_.sum).toArray
    })
    .foreachRDD(rdd => rdd.foreach(Blaher.blah))
}
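As an aside, for exactly two equal-length arrays the buffer-and-transpose step can be written more directly as a zip. A minimal alternative sketch (the helper name addArrays is mine, not from the post):

// element-wise sum of two equal-length arrays
def addArrays(first: Array[Int], second: Array[Int]): Array[Int] =
  first.zip(second).map { case (a, b) => a + b }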
Thanks in advance.
Answer
What you're looking for is updateStateByKey. For DStream[(T, U)] it should take a function with two arguments:

- Seq[U] - representing the state for the current window
- Option[U] - representing the accumulated state

and return Option[U].
Given your code it could be implemented, for example, like this:
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.streaming.dstream.DStream
import scala.util.Try

val state: DStream[(String, Array[Int])] = parsedDstream.updateStateByKey(
  (current: Seq[Array[Int]], prev: Option[Array[Int]]) => {
    // prepend the previous state, if any, to this batch's arrays
    prev.map(_ +: current).orElse(Some(current))
      // sum the arrays element-wise as Breeze dense vectors
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
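Note that the Try guards the element-wise sum: if a key ever receives arrays of mismatched lengths, the Breeze addition throws, Try(...).toOption turns that into None, and the state for that key is dropped rather than failing the batch.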
To be able to use it you'll have to configure checkpointing.
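The question's code already does this by calling checkpoint on the StreamingContext; a minimal sketch (assuming an HDFS directory, with <hostname> as a placeholder):

// a reliable directory where Spark can persist the per-key state between batches
ssc.checkpoint("hdfs://<hostname>:8020/user/spark/checkpoints_dir")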