分组依据缓冲区 - Reactive Extensions [英] Group by with buffer - Reactive Extensions

查看:76
本文介绍了分组依据缓冲区 - Reactive Extensions的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

堆栈交换问题 -
http://stackoverflow.com/questions/16464721/reactive-extensions-group-by-unique-bufferwindow-till-time-span-with-cancellati

Problem from stack exchange - http://stackoverflow.com/questions/16464721/reactive-extensions-group-by-unique-bufferwindow-till-time-span-with-cancellati

我有一个热门事件流来自以下类型-Event {

I have a Hot stream of events coming of following type -Event {

string parentName,string name;国家; //其1或2即有效或无效

string parentName,string name; int state ; // its 1 or 2 ie active or unactive

}

如果在这2分钟内,我需要为每位父母缓冲2分钟的事件,对于给定的父级,我收到状态= 2的孩子的任何事件,这个缓冲区应该取消并且应该输出0否则我得到事件recvd的计数。

I need to buffer event per parent for 2 minutes, if during this 2 minute , i recv any event for child with state =2 for a given parent , this buffer should cancel and should output 0 otherwise i get the count of the events recvd .

我知道我必须使用GroupBy进行分区,然后缓冲然后计数,但我无法想到我创建Buffer的方式,每个父母都是唯一的,虽然我使用Distinct但这并没有解决问题,因为我只是不想创建
缓冲区,直到父级激活(一旦父级缓冲区被取消或2分钟结束,父级缓冲区可以再次创建)所以我理解我需要创建一个自定义缓冲区来检查创建缓冲区的条件,但是我怎么做这个
通过反应式扩展。

I know I have to use GroupBy to partition, and then buffer and then count but i am unable to think of a way by which i create Buffer which is unique per parent, i though of using Distinct but this doesnt solve the problem, for i only dont want to create buffer till the parent is active (as once the parent's buffer gets cancelled or 2 minutes is over, the parent buffer can be created again) So I understand I need to create a custom buffer which checks the condition for creating buffer, but how do i do this via reactive extensions.

提供的解决方案无效,任何其他想法

The solution provided there doesnt work, any other idea

推荐答案

假设每个组的两分钟缓冲区应该在看到该组的第一个事件后立即打开,并在两分钟后关闭或看到零状态,那么我认为这有效:

Assuming that a two minute buffer for each group should open as soon as the first event for that group is seen, and close after two minutes or a zero state is seen, then I think this works:

public static IObservable<EventCount> EventCountByParent(this IObservable<Event> source, IScheduler scheduler)
{
    return Observable.Create<EventCount>(observer => source.GroupByUntil(
        evt => GetParent(evt.Name),
        evt => evt,
        group =>
        @group.Where(evt => evt.State == 2)
                .Merge(Observable.Timer(TimeSpan.FromMinutes(2), scheduler).Select(_ => Event.Null)))
                .SelectMany(
                    go =>
                    go.Aggregate(0, (acc, evt) => (evt.State == 2 ? 0 : acc + 1))
                    .Select(count => new EventCount(go.Key, count))).Subscribe(observer));
}

将EventCount设为:

With EventCount as:

    public class EventCount
    {
        private readonly string _name;
        private readonly int _count;

        public EventCount(string name, int count)
        {
            _name = name;
            _count = count;
        }
    }

事件为:

    public class Event
    {
        public static Event Null = new Event(string.Empty, 0);

        private readonly string _name;
        private readonly int _state;

        public Event(string name, int state)
        {
            _name = name;
            _state = state;
        }

        public string Name { get { return _name;  } }
        public int State { get { return _state; } }
    }





这篇关于分组依据缓冲区 - Reactive Extensions的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆