结合最新从观察到的观测值 [英] Combining latest from an observable of observables

查看:126
本文介绍了结合最新从观察到的观测值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有一组的URI,我监测的可用性。每个URI是要么向上或向下,和新的URI并监控可以加入到该系统中的任何时间:

 公开枚举ConnectionStatus
{
    向上,
    向下
}

公共类WebsiteStatus
{
    公共字符串乌里
    {
        得到;
        组;
    }

    公共ConnectionStatus状态
    {
        得到;
        组;
    }
}

公共类节目
{
    静态无效的主要(字串[] args)
    {
        VAR statusStream =新的受试对象; WebsiteStatus>();
        测试(statusStream);

        Console.WriteLine(完成);
        Console.ReadKey();
    }

    私有静态无效测试(的IObservable< WebsiteStatus> statusStream)
    {
    }
}
 

现在在假设测试()我要被动地确定:

  • 在所有的URI是否关闭(作为布尔
  • 在其中的URI下降(如的IEnumerable<字符串>

所以测试将最终创造出一个可观察到像的IObservable<元组LT;布尔,IEnumerable的<字符串>>> 其中布尔表示所有的URI是否下降和的IEnumerable<字符串> 包含这些URI是

我怎么去呢?我最初的想法是,我需要组由URI,然后结合各组的最新成一个列表,然后我可以执行选择反对。但是,这并没有因这样 CombineLatest 和锻炼。

修改:多亏了马修的答案,我看着RXX,发现它正是时尚我本来期望在实施 CombineLatest 超载RX开箱,但我需要,使其出版即使是只有被组合(默认情况下它在等待至少两源流)单一源流去改变它。另外,我不能证明拉动二进制文件是为了一种方法的一个额外的2MB,所以我必须复制/粘贴到我的项目。这样做,我是能够解决如下:

 私有静态无效测试(的IObservable< WebsiteStatus> statusStream)
{
    statusStream
        .GroupBy(X => x.Uri)
        .CombineLatest()
        。选择(
            X =>
            {
                VAR下来= x.Where(Y => y.Status == ConnectionStatus.Down);
                变种向下计数= down.Count();
                VAR downUris = down.Select(Y => y.Uri).ToList();

                返回新
                {
                    AllDown = x.Count ==向下计数,
                    DownUris = downUris
                };
            })
        .Subscribe(X =>
        {
            Console.WriteLine(来源下来({0}):{1},x.AllDown这是所有的这些:有的还在上升,x.DownUris.Aggregate(,(Y,Z) => Y + =(Z +|)));
        });
}
 

解决方案

最巧妙的方法是使用RXX扩展在这个答案。另一种方法是下面,它只是不断网站是向下/向上的列表。

  VAR下游= statusStream
    .Aggregate< WebsiteStatus,IEnumerable的<字符串>>(新的字符串[0],(向下,newStatus)=>
    {
        如果(newStatus.IsUp)
            返回down.Where(URI =>!URI = newStatus.Uri);
        否则,如果(!down.Contains(newStatus.Uri))
            返回down.Concat(新的String [] {newStatus.Uri});
        其他
            回到下降;
    });

VAR上游= statusStream
    .Aggregate< WebsiteStatus,IEnumerable的<字符串>>(新的字符串[0],(上,newStatus)=>
    {
        如果(!newStatus.IsUp)
            返回up.Where(URI =>!URI = newStatus.Uri);
        否则,如果(!up.Contains(newStatus.Uri))
            返回down.Concat(新的String [] {newStatus.Uri});
        其他
            返回最多;
    });

变种allDown = upStream.Select(高达=>!up.Any());
 

Suppose I have a set of URIs that I am monitoring for availability. Each URI is either "up" or "down", and new URIs to monitor may be added to the system at any time:

public enum ConnectionStatus
{
    Up,
    Down
}

public class WebsiteStatus
{
    public string Uri
    {
        get;
        set;
    }

    public ConnectionStatus Status
    {
        get;
        set;
    }
}

public class Program
{
    static void Main(string[] args)
    {
        var statusStream = new Subject<WebsiteStatus>();
        Test(statusStream);

        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static void Test(IObservable<WebsiteStatus> statusStream)
    {
    }
}

Now suppose in Test() I want to reactively ascertain:

  • whether all URIs are down (as a bool)
  • which URIs are down (as IEnumerable<string>)

So Test would end up creating an observable like IObservable<Tuple<bool, IEnumerable<string>>> where the bool indicates whether all URIs are down and the IEnumerable<string> contains those URIs that are.

How do I go about this? My initial thinking is that I would need to group by the URI, then combine the latest from each group into a list that I could then perform a Select against. However, this did not work out due to the way CombineLatest works.

EDIT: Thanks to Matthew's answer I looked into rxx and found that it implemented a CombineLatest overload in exactly the fashion I would have expected in rx out of the box, except that I needed to change it so that it publishes even when there is only a single source stream being combined (by default it was waiting for a minimum of two source streams). Also, I can't justify pulling in an extra 2MB of binaries for the sake of one method, so I have copy/pasted it into my project. Doing so, I was able to solve as follows:

private static void Test(IObservable<WebsiteStatus> statusStream)
{
    statusStream
        .GroupBy(x => x.Uri)
        .CombineLatest()
        .Select(
            x =>
            {
                var down = x.Where(y => y.Status == ConnectionStatus.Down);
                var downCount = down.Count();
                var downUris = down.Select(y => y.Uri).ToList();

                return new
                {
                    AllDown = x.Count == downCount,
                    DownUris = downUris
                };
            })
        .Subscribe(x =>
        {
            Console.WriteLine("    Sources down ({0}): {1}", x.AllDown ? "that's all of them" : "some are still up", x.DownUris.Aggregate("", (y, z) => y += (z + " | ")));
        });
}

解决方案

The neatest way is to use the Rxx extension in this answer. An alternative is below, it just keeps a list of sites that are down/up.

var downStream = statusStream
    .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (down, newStatus) =>
    {
        if (newStatus.IsUp)
            return down.Where(uri => uri != newStatus.Uri);
        else if (!down.Contains(newStatus.Uri))
            return down.Concat(new string[] { newStatus.Uri });
        else
            return down;
    });

var upStream = statusStream
    .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (up, newStatus) =>
    {
        if (!newStatus.IsUp)
            return up.Where(uri => uri != newStatus.Uri);
        else if (!up.Contains(newStatus.Uri))
            return down.Concat(new string[] { newStatus.Uri });
        else
            return up;
    });

var allDown = upStream.Select(up => !up.Any());

这篇关于结合最新从观察到的观测值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆