火花流update_state_by_keys数组聚集 [英] spark streaming update_state_by_keys for arrays aggregation

查看:224
本文介绍了火花流update_state_by_keys数组聚集的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我输入线像下面

t1时,文件1,1,1,1

t1, file1, 1, 1, 1

t1时,文件1,1,2,3

t1, file1, 1, 2, 3

T1,文件2,2,2,2,2

t1, file2, 2, 2, 2, 2

t2时,文件1,5,5,5

t2, file1, 5, 5, 5

t2时,文件2,1,1,2,2

t2, file2, 1, 1, 2, 2

和像下面的输出行这是一个垂直的另外的相应数字。

and the output like below rows which is a vertical addition of the corresponding numbers.

文件1:[1+,1 + 2 + 5,1 + 3 + 5]

file1 : [ 1+, 1+2+5, 1+3+5 ]

文件2:[2 + 1,2 + 1,2 + 2,2 + 2]

file2 : [ 2+1, 2+1, 2+2, 2+2 ]

目前的数据汇总逻辑正在批量区间,但它不维护状态。所以,我加入update_state_by_key功能,并通过以下功能,就是做这个正确的方式?

Currently data aggregation logic is working for batch interval, but it's not maintaining state. So, i am adding update_state_by_key function and passing below function, Is this right way to do?

我目前的计划:

    def updateValues( newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]) = {

        val previousCount = currentValue.getOrElse(Array.fill[Byte](newValues.length)(0))
        val allValues = newValues +: previousCount
        Some(allValues.toList.transpose.map(_.sum).toArray)

      }

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(2))
    // parse the lines of data into coverage objects
    val inputStream = ssc.socketTextStream(<hostname>, 9999)
    ssc.checkpoint("<hostname>:8020/user/spark/checkpoints_dir")
    inputStream.print(10)
    val parsedDstream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
      })
    val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues)

        // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

  }

有关参考,我的previous程序(不包括状态转变):

For reference, my previous program (without stateful transformation):

def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("HBaseStream")
        val sc = new SparkContext(conf)
        // create a StreamingContext, the main entry point for all streaming functionality
        val ssc = new StreamingContext(sc, Seconds(2))
        val inputStream = ssc.socketTextStream("hostname", 9999)
        val parsedDstream = inputStream
          .map(line => {
            val splitLines = line.split(",")
            (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
          })
          .reduceByKey((first, second) => {
            val listOfArrays = ArrayBuffer(first, second)
            listOfArrays.toList.transpose.map(_.sum).toArray
          })
          .foreachRDD(rdd => rdd.foreach(Blaher.blah))
    }

先谢谢了。

推荐答案

什么你要找的是 updateStateByKey 。对于 DSTREAM [(T,U)] 应该采取函数两个参数:

What you're looking for is updateStateByKey. For DStream[(T, U)] it should take a function with two arguments:


  • 序列[U] - 重新为当前窗口presenting状态

  • 选项[U] - 再presenting的堆积状态

  • Seq[U] - representing state for current window
  • Option[U] - representing accumulated state

和返回选项[U]

鉴于你的code将其例如可以实现这样的:

Given your code it could be implemented for example like this:

import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

val state: DStream[(String, Array[Int])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Int]], prev: Option[Array[Int]]) =>  {
    prev.map(_ +: current).orElse(Some(current))
    .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})

要能够使用它,你就必须配置检查点

To be able to use it you'll have to configure checkpointing.

这篇关于火花流update_state_by_keys数组聚集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆