结合最新从观察到的观测值 [英] Combining latest from an observable of observables
问题描述
假设我有一组的URI,我监测的可用性。每个URI是要么向上或向下,和新的URI并监控可以加入到该系统中的任何时间:
公开枚举ConnectionStatus
{
向上,
向下
}
公共类WebsiteStatus
{
公共字符串乌里
{
得到;
组;
}
公共ConnectionStatus状态
{
得到;
组;
}
}
公共类节目
{
静态无效的主要(字串[] args)
{
VAR statusStream =新的受试对象; WebsiteStatus>();
测试(statusStream);
Console.WriteLine(完成);
Console.ReadKey();
}
私有静态无效测试(的IObservable< WebsiteStatus> statusStream)
{
}
}
现在在假设测试()
我要被动地确定:
- 在所有的URI是否关闭(作为
布尔
) - 在其中的URI下降(如
的IEnumerable<字符串>
)
所以测试
将最终创造出一个可观察到像的IObservable<元组LT;布尔,IEnumerable的<字符串>>>
其中布尔
表示所有的URI是否下降和的IEnumerable<字符串>
包含这些URI是
我怎么去呢?我最初的想法是,我需要组由URI,然后结合各组的最新成一个列表,然后我可以执行选择
反对。但是,这并没有因这样 CombineLatest
和锻炼。
修改:多亏了马修的答案,我看着RXX,发现它正是时尚我本来期望在实施 CombineLatest
超载RX开箱,但我需要,使其出版即使是只有被组合(默认情况下它在等待至少两源流)单一源流去改变它。另外,我不能证明拉动二进制文件是为了一种方法的一个额外的2MB,所以我必须复制/粘贴到我的项目。这样做,我是能够解决如下:
私有静态无效测试(的IObservable< WebsiteStatus> statusStream)
{
statusStream
.GroupBy(X => x.Uri)
.CombineLatest()
。选择(
X =>
{
VAR下来= x.Where(Y => y.Status == ConnectionStatus.Down);
变种向下计数= down.Count();
VAR downUris = down.Select(Y => y.Uri).ToList();
返回新
{
AllDown = x.Count ==向下计数,
DownUris = downUris
};
})
.Subscribe(X =>
{
Console.WriteLine(来源下来({0}):{1},x.AllDown这是所有的这些:有的还在上升,x.DownUris.Aggregate(,(Y,Z) => Y + =(Z +|)));
});
}
最巧妙的方法是使用RXX扩展在这个答案。另一种方法是下面,它只是不断网站是向下/向上的列表。
VAR下游= statusStream
.Aggregate< WebsiteStatus,IEnumerable的<字符串>>(新的字符串[0],(向下,newStatus)=>
{
如果(newStatus.IsUp)
返回down.Where(URI =>!URI = newStatus.Uri);
否则,如果(!down.Contains(newStatus.Uri))
返回down.Concat(新的String [] {newStatus.Uri});
其他
回到下降;
});
VAR上游= statusStream
.Aggregate< WebsiteStatus,IEnumerable的<字符串>>(新的字符串[0],(上,newStatus)=>
{
如果(!newStatus.IsUp)
返回up.Where(URI =>!URI = newStatus.Uri);
否则,如果(!up.Contains(newStatus.Uri))
返回down.Concat(新的String [] {newStatus.Uri});
其他
返回最多;
});
变种allDown = upStream.Select(高达=>!up.Any());
Suppose I have a set of URIs that I am monitoring for availability. Each URI is either "up" or "down", and new URIs to monitor may be added to the system at any time:
public enum ConnectionStatus
{
Up,
Down
}
public class WebsiteStatus
{
public string Uri
{
get;
set;
}
public ConnectionStatus Status
{
get;
set;
}
}
public class Program
{
static void Main(string[] args)
{
var statusStream = new Subject<WebsiteStatus>();
Test(statusStream);
Console.WriteLine("Done");
Console.ReadKey();
}
private static void Test(IObservable<WebsiteStatus> statusStream)
{
}
}
Now suppose in Test()
I want to reactively ascertain:
- whether all URIs are down (as a
bool
) - which URIs are down (as
IEnumerable<string>
)
So Test
would end up creating an observable like IObservable<Tuple<bool, IEnumerable<string>>>
where the bool
indicates whether all URIs are down and the IEnumerable<string>
contains those URIs that are.
How do I go about this? My initial thinking is that I would need to group by the URI, then combine the latest from each group into a list that I could then perform a Select
against. However, this did not work out due to the way CombineLatest
works.
EDIT: Thanks to Matthew's answer I looked into rxx and found that it implemented a CombineLatest
overload in exactly the fashion I would have expected in rx out of the box, except that I needed to change it so that it publishes even when there is only a single source stream being combined (by default it was waiting for a minimum of two source streams). Also, I can't justify pulling in an extra 2MB of binaries for the sake of one method, so I have copy/pasted it into my project. Doing so, I was able to solve as follows:
private static void Test(IObservable<WebsiteStatus> statusStream)
{
statusStream
.GroupBy(x => x.Uri)
.CombineLatest()
.Select(
x =>
{
var down = x.Where(y => y.Status == ConnectionStatus.Down);
var downCount = down.Count();
var downUris = down.Select(y => y.Uri).ToList();
return new
{
AllDown = x.Count == downCount,
DownUris = downUris
};
})
.Subscribe(x =>
{
Console.WriteLine(" Sources down ({0}): {1}", x.AllDown ? "that's all of them" : "some are still up", x.DownUris.Aggregate("", (y, z) => y += (z + " | ")));
});
}
The neatest way is to use the Rxx extension in this answer. An alternative is below, it just keeps a list of sites that are down/up.
var downStream = statusStream
.Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (down, newStatus) =>
{
if (newStatus.IsUp)
return down.Where(uri => uri != newStatus.Uri);
else if (!down.Contains(newStatus.Uri))
return down.Concat(new string[] { newStatus.Uri });
else
return down;
});
var upStream = statusStream
.Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (up, newStatus) =>
{
if (!newStatus.IsUp)
return up.Where(uri => uri != newStatus.Uri);
else if (!up.Contains(newStatus.Uri))
return down.Concat(new string[] { newStatus.Uri });
else
return up;
});
var allDown = upStream.Select(up => !up.Any());
这篇关于结合最新从观察到的观测值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!