How do I group items of a sorted stream with SubFlows?


Question

Could you explain how to use the new groupBy in akka-streams? The documentation is not much help here. groupBy used to return (T, Source), but it no longer does. Here is my example (modeled on one from the docs):

Source(List(
  1 -> "1a", 1 -> "1b", 1 -> "1c",
  2 -> "2a", 2 -> "2b",
  3 -> "3a", 3 -> "3b", 3 -> "3c",
  4 -> "4a",
  5 -> "5a", 5 -> "5b", 5 -> "5c",
  6 -> "6a", 6 -> "6b",
  7 -> "7a",
  8 -> "8a", 8 -> "8b",
  9 -> "9a", 9 -> "9b",
))
  .groupBy(3, _._1)
  .map { case (aid, raw) =>
    aid -> List(raw)
  }
  .reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
    (l._1, l._2 ::: r._2)
  }
  .mergeSubstreams
  .runForeach { case (aid: Int, items: List[String]) =>
    println(s"$aid - ${items.length}")
  }

This simply hangs. Perhaps it hangs because the number of substreams is lower than the number of unique keys. But what should I do if I have an infinite stream? I'd like to group until the key changes.

In my real stream, the data is always sorted by the value I'm grouping by. Perhaps I don't need groupBy at all?
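
To make "group until the key changes" concrete, here is a minimal sketch on a plain Scala List, with no Akka involved. The object GroupUntilKeyChanges and the helper groupAdjacentBy are hypothetical names used only for illustration. The sketch shows the intended result on a finite, already sorted input, not how a stream would compute it.

```scala
object GroupUntilKeyChanges {
  // Hypothetical helper (not an Akka API): splits a list into runs of
  // consecutive items that share the same key.
  def groupAdjacentBy[K, T](items: List[T])(key: T => K): List[List[T]] =
    items.foldRight(List.empty[List[T]]) {
      // The item has the same key as the group right after it: prepend to that group.
      case (item, (group @ (head :: _)) :: rest) if key(head) == key(item) =>
        (item :: group) :: rest
      // Otherwise the item starts a new group.
      case (item, acc) =>
        List(item) :: acc
    }

  def main(args: Array[String]): Unit = {
    val sample = List(1 -> "1a", 1 -> "1b", 1 -> "1c", 2 -> "2a", 2 -> "2b", 3 -> "3a")
    groupAdjacentBy(sample)(_._1).foreach { group =>
      println(s"${group.head._1} - ${group.length}")
    }
  }
}
```

Because the input is sorted by key, each key ends up in exactly one group, so only the current group ever has to be held in memory.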

Recommended answer

I ended up implementing a custom stage:

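import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.mutable.ListBuffer

// Emits a List[T] every time the key of the incoming items changes.
class GroupAfterKeyChangeStage[K, T](keyForItem: T => K, maxBufferSize: Int) extends GraphStage[FlowShape[T, List[T]]] {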

  private val in = Inlet[T]("GroupAfterKeyChangeStage.in")
  private val out = Outlet[List[T]]("GroupAfterKeyChangeStage.out")

  override val shape: FlowShape[T, List[T]] =
    FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {

    private val buffer = new ListBuffer[T]
    private var currentKey: Option[K] = None

    // InHandler
    override def onPush(): Unit = {
      val nextItem = grab(in)
      val nextItemKey = keyForItem(nextItem)

      if (currentKey.forall(_ == nextItemKey)) {
        if (currentKey.isEmpty)
          currentKey = Some(nextItemKey)

        if (buffer.size == maxBufferSize)
          failStage(new RuntimeException(s"Maximum buffer size is exceeded on key $nextItemKey"))
        else {
          buffer += nextItem
          pull(in)
        }
      } else {
        val result = buffer.result()
        buffer.clear()
        buffer += nextItem
        currentKey = Some(nextItemKey)
        push(out, result)
      }
    }

    // OutHandler
    override def onPull(): Unit = {
      // Defensive check: onUpstreamFinish normally flushes the buffer and completes the stage first.
      if (isClosed(in))
        failStage(new RuntimeException("Upstream finished while a group was still buffered"))
      else
        pull(in)
    }

    // InHandler: flush the last group (if any), then complete
    override def onUpstreamFinish(): Unit = {
      val result = buffer.result()
      if (result.nonEmpty)
        emit(out, result)
      completeStage()
    }

    override def postStop(): Unit = {
      buffer.clear()
    }

    setHandlers(in, out, this)
  }
}
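
To reason about the stage's buffering rules without running a stream, here is a rough model of the same logic on a plain Iterator. It is only a sketch under stated assumptions: GroupSortedModel and groupSorted are hypothetical names (not part of Akka), back-pressure is not modeled, and a thrown exception stands in for failStage.

```scala
import scala.collection.mutable.ListBuffer

object GroupSortedModel {
  // Hypothetical stand-in for the stage: same buffer and key-change rules,
  // applied to a plain Iterator instead of a stream.
  def groupSorted[K, T](items: Iterator[T], maxBufferSize: Int)(keyForItem: T => K): Iterator[List[T]] =
    new Iterator[List[T]] {
      private val buffer = new ListBuffer[T]
      private var currentKey: Option[K] = None

      def hasNext: Boolean = buffer.nonEmpty || items.hasNext

      def next(): List[T] = {
        while (items.hasNext) {
          val item = items.next()
          val key = keyForItem(item)
          if (currentKey.forall(_ == key)) {
            // Same key (or first item): keep buffering, up to maxBufferSize items.
            if (buffer.size == maxBufferSize)
              throw new RuntimeException(s"Maximum buffer size is exceeded on key $key")
            currentKey = Some(key)
            buffer += item
          } else {
            // Key changed: hand out the finished group and start a new one.
            val result = buffer.result()
            buffer.clear()
            buffer += item
            currentKey = Some(key)
            return result
          }
        }
        // Input exhausted: flush whatever is left as the final group.
        if (buffer.isEmpty) throw new NoSuchElementException("next on empty iterator")
        val result = buffer.result()
        buffer.clear()
        result
      }
    }
}
```

As in the stage, at most one group is buffered at a time, which is why this approach also works for an unbounded input as long as it is sorted by key.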






If you don't want to copy-paste it, I've added it to a helper library that I maintain. To use it, add

Resolver.bintrayRepo("cppexpert", "maven")

to your resolvers, and add the following to your dependencies:

"com.walkmind" %% "scala-tricks" % "2.15"

It is available in com.walkmind.akkastream.FlowExt as a flow:

def groupSortedByKey[K, T](keyForItem: T ⇒ K, maxBufferSize: Int): Flow[T, List[T], NotUsed]

In my example, it would be:

source
  .via(FlowExt.groupSortedByKey(_._1, 128))
