Rx 运算符到不同的序列 [英] Rx operator to distinct sequences

查看:28
本文介绍了Rx 运算符到不同的序列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

重要:有关结果的描述和更多详细信息,请查看我的回答

IMPORTANT: for a description of the results and some more details, please have a look also to my answer

我需要对通常被复制的一系列对象/事件进行分组和过滤,并以 TimeSpan 间隔缓冲它们.我试着用大理石图更好地解释它:

I need to group and filter a sequence of objects/events that usually are replicated, buffering them with a TimeSpan interval. I try to explain it better with sort of marble diagrams:

X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z

会产生

X---Y---Z---X---Y---Z

其中 X、Y 和 Z 是不同的事件类型,'---' 表示间隔.此外,我还想通过一个关键属性来区分它适用于所有类型,因为它们有一个共同的基类:

where X, Y and Z are different event types, and '---' means the interval. Additionally, I would also like to distinct by a key property that it is available on all types because they have a common base class:

X, Y, Z : A

并且 A 包含一个属性 Key.使用符号 X.a 表示 X.Key = a,最终样本将是:

and A contains a property Key. Using the notation X.a meaning X.Key = a, A final sample would be:

X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c

会产生

X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b

有人可以帮我组合所需的 Linq 运算符(可能是 DistinctUntilChanged 和 Buffer)来实现这种行为吗?谢谢

Can anybody help me putting together the required Linq operators (probably DistinctUntilChanged and Buffer) to achieve this behavior? Thanks

更新 18.08.12:

根据要求,我尝试给出更好的解释.我们有设备收集事件并将事件发送到网络服务.这些设备有一个旧的逻辑(由于向后兼容,我们不能改变它)并且它们不断地发送一个事件,直到它们收到一个确认;在确认之后,他们发送队列中的下一个事件,依此类推.事件包含单元的网络地址和其他一些区分每个设备队列中事件的属性.事件如下所示:

as requested, I try to give a better explanation. We have devices collecting and sending events to a web service. These devices have an old logic (and we can't change it due to backward compatibility) and they continuously send an event until they receive an acknowledge; after the acknowledge, they send the next event in their queue, and so on. Events contain the network address of the unit and some other properties distinguishing events in the queue for each device. An event looks like this:

class Event
{
    public string NetworkAddress { get; }

    public string EventCode { get; }

    public string AdditionalAttribute { get; }
}

目标是每 5 秒处理一次从所有设备接收到的区分事件,将信息存储在数据库中(这就是我们不想批量执行的原因)并将 ack 发送到设备.让我们举一个只有两个设备和一些事件的例子:

The goal is that of processing every 5 seconds the distinguished events received from all devices, storing information in the database (that's why we don't want to do it in batches) and sending the ack to the device. Let's make an example with only two devices and some events:

Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'

Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'

Pn are the operations done by our server, explained later

可能的弹珠图(输入流+输出流):

Possible marble diagram (input streams + output stream):

Device 'a'          : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b'          : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...

Time                : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-

P1: Server stores and acknowledges [a1] and [b1]
P2: "      "      "   "            [b2]
P3: "      "      "   "            [a2] and [b3]
P4: "      "      "   "            [a3] and [b4]

最后我认为这可能是基本运算符的简单组合,但我是 Rx 的新手,我有点困惑,因为似乎有很多运算符(或运算符的组合)可以得到相同的输出流.

At the end I think it is probably a simple combination of basic operators, but I'm new to Rx and I'm a bit confused since it seems that there are lots of operators (or combinations of operators) to get the same output stream.

12 年 8 月 19 日更新:

请记住,此代码在服务器上运行,它应该可以运行数天而不会出现内存泄漏...我不确定主题的行为.目前,对于每个事件,我都会调用一个服务上的推送操作,该操作调用我应该在其上构建查询的主题的 OnNext(如果我对主题的使用没有错).

Please keep in mind that this code runs on a server and it should run for days without memory leaks...I'm not sure about the behavior of subjects. At the moment, for each event I call a push operation on a service, which calls the OnNext of a Subject on top of which I should build the query (if I'm not wrong about the usage of subjects).

20.08.12 更新:

当前实现,包括验证测试;这是我尝试过的,似乎与@yamen 的建议相同

Current implementation, including validation test; this is what I tried and it seems the same suggested by @yamen

public interface IEventService
{
    // Persists the events
    void Add(IEnumerable<Event> events);
}

public class Event
{
    public string Description { get; set; }
}

/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
    private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);

    private readonly Subject<EventMessage> subject = new Subject<EventMessage>();

    private readonly IDisposable subscription;

    private readonly object locker = new object();

    private readonly IEventService eventService;

    /// <summary>
    /// Initializes a new instance of the <see cref="EventManager"/> class.
    /// </summary>
    /// <param name="scheduler">The scheduler.</param>
    public EventManager(IEventService eventService, IScheduler scheduler)
    {
        this.eventService = eventService;
        this.subscription = this.CreateQuery(scheduler);
    }

    /// <summary>
    /// Pushes the event.
    /// </summary>
    /// <param name="eventMessage">The event message.</param>
    public void PushEvent(EventMessage eventMessage)
    {
        Contract.Requires(eventMessage != null);
        this.subject.OnNext(eventMessage);
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    /// <filterpriority>2</filterpriority>
    public void Dispose()
    {
        this.Dispose(true);
    }

    private void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Dispose unmanaged resources
        }

        this.subject.Dispose();
        this.subscription.Dispose();
    }

    private IDisposable CreateQuery(IScheduler scheduler)
    {
        var buffered = this.subject
            .DistinctUntilChanged(new EventComparer())
            .Buffer(EventHandlingPeriod, scheduler);

        var query = buffered
            .Subscribe(this.HandleEvents);
        return query;
    }

    private void HandleEvents(IList<EventMessage> eventMessages)
    {
        Contract.Requires(eventMessages != null);
        var events = eventMessages.Select(this.SelectEvent);
        this.eventService.Add(events);
    }

    private Event SelectEvent(EventMessage message)
    {
        return new Event { Description = "evaluated description" };
    }

    private class EventComparer : IEqualityComparer<EventMessage>
    {
        public bool Equals(EventMessage x, EventMessage y)
        {
            return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
        }

        public int GetHashCode(EventMessage obj)
        {
            var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
            return s.GetHashCode();
        }
    }
}

public class EventMessage
{
    public string NetworkAddress { get; set; }

    public byte EventCode { get; set; }

    public byte Attribute { get; set; }

    // Other properties
}

和测试:

public void PushEventTest()
    {
        const string Address1 = "A:2.1.1";
        const string Address2 = "A:2.1.2";

        var eventServiceMock = new Mock<IEventService>();

        var scheduler = new TestScheduler();
        var target = new EventManager(eventServiceMock.Object, scheduler);
        var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
        var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        scheduler.Schedule(() => target.PushEvent(eventMessageA1));
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
        scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));

        scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);

        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());

        scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));

        scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);

        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
    }

此外,我再次指出,软件可以运行数天而没有问题,处理数以千计的消息非常重要.明确地说:测试没有通过当前的实现.

Additionally, I remark again that it is really important that the software could run for days without problems, handling thousands of messages. To make it clear: the test doesn't pass with the current implementation.

推荐答案

我不确定这是否完全符合您的要求,但您可以使用 group 关键字,然后在重新组合它们之前分别操作各种 IObservable.

I'm not sure if this does exactly what you'd like, but you may be to group the elements explicitly using the group keyword, and then to manipulate the various IObservables separately before recombining them.

例如如果我们有类定义,例如

E.g. if we have class definitions such as

class A
{
    public char Key { get; set; }
}

class X : A { }
...

和一个 Subject

Subject<A> subject = new Subject<A>();

然后我们可以写

var buffered =
    from a in subject
    group a by new { Type = a.GetType(), Key = a.Key } into g
    from buffer in g.Buffer(TimeSpan.FromMilliseconds(300))
    where buffer.Any()
    select new
    {
        Count = buffer.Count,
        Type = buffer.First().GetType().Name,
        Key = buffer.First().Key
    };

buffered.Do(Console.WriteLine).Subscribe();

我们可以使用您提供的数据对此进行测试:

We can test this with the data you provided:

subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100);
subject.OnNext(new X { Key = 'b' }); 
Thread.Sleep(100);
subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100);
...
subject.OnCompleted();

要获得您提供的输出:

{ Count = 2, Type = X, Key = a }
{ Count = 1, Type = X, Key = b }
{ Count = 1, Type = Y, Key = b }
{ Count = 1, Type = Y, Key = c }
{ Count = 2, Type = Z, Key = a }
{ Count = 2, Type = Z, Key = c }
{ Count = 1, Type = Z, Key = b }

这篇关于Rx 运算符到不同的序列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆