How to group incoming events from infinite stream?


Problem Description

I have an infinite stream of events:

(timestamp, session_uid, traffic)

i.e.

...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...

These events I want to group by session_uid and calculate sum of traffic for each session.

I wrote an akka-streams flow which works fine with a finite stream, using groupBy (my code is based on this example from the cookbook). But with an infinite stream it will not work, because the groupBy function has to process the whole incoming stream before it is ready to return a result.

I think I should implement grouping with a timeout, i.e. if I haven't received an event with a given session_uid for more than 5 minutes since the last one, I should emit the grouped events for that session_uid. But how can I implement this using akka-streams only?

Solution

I came up with a somewhat gnarly solution but I think it gets the job done.

The essential idea is to use the keepAlive method of Source as the timer that will trigger completion.

But to do this we first have to abstract the data a bit. The timer will need to send the trigger or another tuple value from the original Source, therefore:

sealed trait Data

object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data

Then convert our Source of tuples to a Source of Values. We'll still use groupBy to do groupings similar to your finite stream case:

val originalSource : Source[(Long, String, Int), Unit] = ???

type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid

val groupedDataSource : Source[IDGroup, Unit] = 
  originalSource.map(t => Value(t._1, t._2, t._3))
                .groupBy(_.session_uid)

The tricky part is handling the groupings which are just tuples: (String, Source[Value,Unit]). We need the timer to notify us if time has elapsed so we need another abstraction to know if we're still computing or we've completed computation due to a timeout:

sealed trait Sum {
  val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum

val zeroSum : Sum = StillComputing(0)

Now we can drain the Source of each group. The keepAlive will send a TimerTrigger if the Source of Values doesn't produce something after the timeOut. The Data from the keepAlive is then pattern matched against either a TimerTrigger or a new Value from the original Source:

val evaluateSum : ((Sum , Data)) => Sum = {
  case (runningSum, data) => { 
    data match {
      case TimerTrigger => ComputedSum(runningSum.sum)
      case v : Value    => StillComputing(runningSum.sum + v.traffic)
    }
  }
}//end val evaluateSum
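As a sanity check, the same fold logic can be exercised in plain Scala with `scanLeft` on an ordinary list, mirroring what the stream's `scan` does (the event list below is illustrative, not part of the original answer):

```scala
sealed trait Data
object TimerTrigger extends Data
case class Value(tstamp: Long, session_uid: String, traffic: Int) extends Data

sealed trait Sum { val sum: Int }
case class StillComputing(sum: Int) extends Sum
case class ComputedSum(sum: Int) extends Sum

val zeroSum: Sum = StillComputing(0)

val evaluateSum: ((Sum, Data)) => Sum = {
  case (runningSum, data) =>
    data match {
      case TimerTrigger => ComputedSum(runningSum.sum)
      case v: Value     => StillComputing(runningSum.sum + v.traffic)
    }
}

// The session-1 events from the question, followed by the trigger
// that keepAlive would inject after the timeout:
val events: List[Data] = List(
  Value(1448089943L, "session-1", 10),
  Value(1448089944L, "session-1", 20),
  Value(1448089946L, "session-1", 30),
  TimerTrigger
)

// scanLeft emits every intermediate state, like the stream's scan stage:
// StillComputing(0), StillComputing(10), StillComputing(30),
// StillComputing(60), ComputedSum(60)
val states: List[Sum] = events.scanLeft(zeroSum)((s, d) => evaluateSum((s, d)))
```

The running sum stays a `StillComputing` until the trigger arrives, at which point it is sealed into a `ComputedSum`.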

type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid

def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult = 
  idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
                          .scan(zeroSum)(evaluateSum)
                          .collect {case c : ComputedSum => c.sum}
                          .runWith(Sink.head)

The collect stage applies a partial function that only matches a finished sum, therefore the Sink is only reached after the timer has fired.
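The filtering behavior of that `collect` can be seen with plain collections (a sketch, not the stream itself): every `StillComputing` state is dropped, so only the value produced by the timer trigger survives to reach `Sink.head`:

```scala
sealed trait Sum { val sum: Int }
case class StillComputing(sum: Int) extends Sum
case class ComputedSum(sum: Int) extends Sum

// The states a scan over one group might emit (illustrative values):
val scanned: List[Sum] =
  List(StillComputing(0), StillComputing(10), StillComputing(30),
       StillComputing(60), ComputedSum(60))

// collect's partial function matches only ComputedSum,
// silently discarding everything else:
val finished: List[Int] = scanned.collect { case c: ComputedSum => c.sum }
```

Because at most one `ComputedSum` is ever emitted per group, `Sink.head` completes with exactly that element.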

We then apply this handler to each grouping that comes out:

val timeOut = FiniteDuration(5, MINUTES)

val sumSource : Source[SumResult, Unit] = 
  groupedDataSource map handleGroup(timeOut)

We now have a Source of (String,Future[Int]) which is the session_uid and a Future of the sum of traffic for that id.

Like I said, convoluted, but it meets the requirements. Also, I'm not entirely sure what happens if a uid that was already grouped and timed out shows up again with a new value under the same uid.
