Akka stream sliding window to control reduce emit to sink by SourceQueue


Question

Update: I put my question in a test project to explain in detail what I mean.

=====================================================================

I have an Akka source that continuously reads from a database table, groups the rows by some key, and then reduces them. However, once I apply the reduce function, the data never seems to be sent to the sink: because the upstream always has data coming in, the reduce just keeps going.

I read some posts and tried groupedWithin and sliding, but they do not work as I expected: they only group the messages into larger chunks, and never make the upstream pause and emit to the sink. The following is the code, using Akka Streams 2.5.2.

Source with reduce:

source = source
  .groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
  .sliding(3, 1)
  .mapConcat(i -> i)
  .mapConcat(i -> i)
  .groupBy(2000000, i -> i.getEntityName())
  .map(i -> new Pair<>(i.getEntityName(), i))
  .reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
  .map(i -> i.second())
  .mergeSubstreams();

Sink and run:

Sink<Object, CompletionStage<Done>> sink =
        Sink.foreach(i -> System.out.println(i));
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materializer);

I have also tried .takeWhile(predicate), using a timer to switch the predicate value between true and false. But it seems only the first switch to false takes effect; when I switch back to true, the upstream is not restarted.

Please help, and thanks in advance!

=================================================

Update


Information about the type of the elements

To add what I want: I have a class called SystemCodeTracking that contains 2 attributes (id, entityName).

I will have a list of objects: (1, table1), (2, table2), (3, table3), (4, table1), (5, table3)

I would like to groupBy entityName and then sum the id, so the result I would like to see is the following:

("table1", 1+4), ("table3", 3+5), ("table2", 2)

The code I have now is the following:

source
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName())
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName(), systemCodeTracking.getId()))
.scan(....)

My question right now is more about how I should build the initial state for scan. Is it something like this?

scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))


Recommended answer

So what you want, if I understand everything correctly, is:


  • first, group by entity name
  • then group by time window, and inside each time window, sum all the systemCodeTracking.getId() values

For the first part, you'll need groupBy. For the second part, groupedWithin. However, they do not work the same way: the first one gives you substreams (one per key), while the second one gives you a stream of lists.

First, let's write a reducer for your lists:

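private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception {
    if (list.isEmpty()) {
        throw new Exception("cannot reduce an empty list");
    } else {
        // Sum first: the result object is list.get(0) itself, so resetting
        // its id to 0 before iterating would drop that element's own id.
        long sum = 0L;
        for (SystemCodeTracking next : list) {
            sum += next.getId();
        }
        SystemCodeTracking building = list.get(0);
        building.setId(sum);
        return building;
    }
}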

So we add up the id of every element in the list, and once the whole list has been traversed, building.id holds the value we want.
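To make the reducer concrete, here is a minimal, self-contained sketch that runs it on the two "table1" entries from the question. The nested SystemCodeTracking class is a hypothetical stand-in (the asker's real class is not shown), and the id is assumed to be a long. Note that the sum is computed before it is written into the first element, because that element is itself part of the list being summed.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceListDemo {
    // Hypothetical stand-in for the asker's class; only id and entityName are assumed.
    static final class SystemCodeTracking {
        private long id;
        private final String entityName;

        SystemCodeTracking(long id, String entityName) {
            this.id = id;
            this.entityName = entityName;
        }

        long getId() { return id; }
        void setId(long id) { this.id = id; }
        String getEntityName() { return entityName; }
    }

    // Sums the ids of one batch and stores the total in the batch's first element.
    static SystemCodeTracking reduceList(List<SystemCodeTracking> list) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("cannot reduce an empty list");
        }
        long sum = 0L;
        for (SystemCodeTracking next : list) {
            sum += next.getId();
        }
        SystemCodeTracking building = list.get(0);
        building.setId(sum);
        return building;
    }

    public static void main(String[] args) {
        List<SystemCodeTracking> batch = Arrays.asList(
                new SystemCodeTracking(1, "table1"),
                new SystemCodeTracking(4, "table1"));
        SystemCodeTracking reduced = reduceList(batch);
        // prints "table1 -> 5"
        System.out.println(reduced.getEntityName() + " -> " + reduced.getId());
    }
}
```

The reducer mutates and returns the first element of the batch. If the elements are shared with other parts of the application, building a fresh result object instead may be the safer choice.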

Now you just need:

Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
    .groupBy(20000, SystemCodeTracking::getEntityName)               // group by name
    .groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS)) // for a given name, group by time window (or by packs of 100)
    .filterNot(List::isEmpty)                                        // remove empty lists from the flow (if no element has passed in the last 10 seconds, to avoid an error in the reducer)
    .map(this::reduceList)                                           // reduce each list to sum the ids
    .log("====== doing reducing ")                                   // log each passing element using the akka logger, rather than `System.out.println`
    .mergeSubstreams();                                              // merge back all elements with different names
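As a sanity check on what one time window of this pipeline should produce, the sketch below does the same computation in plain Java, without Akka: group one batch by entity name and sum the ids. It is only a batch equivalent for illustration, not a replacement for the stream. The nested SystemCodeTracking class and the sumIdsByEntity helper are hypothetical names introduced here.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WindowSumDemo {
    // Hypothetical stand-in for the asker's class; only id and entityName are assumed.
    static final class SystemCodeTracking {
        private final long id;
        private final String entityName;

        SystemCodeTracking(long id, String entityName) {
            this.id = id;
            this.entityName = entityName;
        }

        long getId() { return id; }
        String getEntityName() { return entityName; }
    }

    // Groups the elements of one window by entity name and sums their ids.
    // LinkedHashMap keeps the keys in first-seen order.
    static Map<String, Long> sumIdsByEntity(List<SystemCodeTracking> window) {
        return window.stream().collect(Collectors.groupingBy(
                SystemCodeTracking::getEntityName,
                LinkedHashMap::new,
                Collectors.summingLong(SystemCodeTracking::getId)));
    }

    public static void main(String[] args) {
        // The sample data from the question.
        List<SystemCodeTracking> window = Arrays.asList(
                new SystemCodeTracking(1, "table1"),
                new SystemCodeTracking(2, "table2"),
                new SystemCodeTracking(3, "table3"),
                new SystemCodeTracking(4, "table1"),
                new SystemCodeTracking(5, "table3"));
        // prints {table1=5, table2=2, table3=8}
        System.out.println(sumIdsByEntity(window));
    }
}
```

In the streaming version, each substream emits one such sum per window, so the same key can appear again in a later window with a new partial sum.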
