Usage of updateStateByKey() in Spark Streaming to produce a stream of state changes from a stream of raw events


Question


I've just started looking around for a solution for stateful computation with Spark Streaming when I came across the updateStateByKey() function.


The problem I'm trying to solve: 10,000 sensors produce a binary value every minute.


If two consecutive values reported by a sensor differ from each other, I would like to flag that and send it down to Kafka as a state change event.


My assumption is that updateStateByKey() can be used here, however I'm not entirely sure of the recommended way to implement it.

Answer


I am assuming that you will get a stream of (String, Int) pairs from the sensors where the String is the ID of the sensor and the Int is the binary value returned by the sensor. With that assumption you could try something like this:

val sensorData: DStream[(String, Int)] = ???

// updateStateByKey is keyed by the sensor ID, so the update function only
// sees that one sensor's data: the new readings from the current batch and
// the last value carried over as state. Note that updateStateByKey requires
// checkpointing to be enabled on the StreamingContext (ssc.checkpoint(...)).
def updateFunction(newValues: Seq[Int], lastValue: Option[Int]): Option[Int] = {
  newValues.foldLeft(lastValue) { (prev, curr) =>
    if (prev.exists(_ != curr)) {
      // consecutive values differ: send a state change event to Kafka
    }
    Some(curr)
  }
}

val state: DStream[(String, Int)] = sensorData.updateStateByKey(updateFunction _)
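Since the per-key update logic is a pure function of the new batch and the previous state, it can be sketched and exercised without a running cluster. A minimal sketch, where `emit` is a hypothetical placeholder standing in for the Kafka producer call:

```scala
// Sketch: the per-sensor update logic in isolation. `emit` is a placeholder
// for whatever actually publishes the state change event to Kafka.
def updateSensor(newValues: Seq[Int],
                 lastValue: Option[Int],
                 emit: (Int, Int) => Unit): Option[Int] = {
  // Fold the batch against the previous state, emitting on every flip.
  newValues.foldLeft(lastValue) { (prev, curr) =>
    prev.foreach(p => if (p != curr) emit(p, curr))
    Some(curr)
  }
}

// Example: the sensor previously reported 0 and the new batch is [0, 1, 1, 0].
var changes = List.empty[(Int, Int)]
val next = updateSensor(Seq(0, 1, 1, 0), Some(0), (a, b) => changes :+= ((a, b)))
// changes is List((0,1), (1,0)); next is Some(0)
```

Folding over the whole batch, rather than comparing only the last values, also catches flips that happen within a single micro-batch.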

