使用无功扩展处理以正确的顺序多个响应 [英] Use reactive extension to handle multiple responses in correct order
问题描述
状况
我有一个,其中一个请求产生了两个响应系统。请求和响应都有相应的观测值:
的IObservable< RequestSent> _要求;
的IObservable< MainResponseReceived> _mainResponses;
的IObservable< SecondResponseReceived> _secondaryResponses;
可以保证 RequestSent
早于 MainResponseReceived
事件发生和 SecondaryResponseReceived
,但反应过来以随机顺序。
我有什么
本来我想处理程序,说明如何处理答复,所以我压缩的观测值:
_requests
.SelectMany(异步请求=>
{
VAR主要= _mainResponses.FirstAsync(M => m.Id == request.Id);
变种次级= _secondaryResponses.FirstAsync(S => s.Id == request.Id);
VAR zippedResponse = main.Zip(二级,(M,S)=>新建MainAndSecondaryResponseReceived {
请求=请求,
主要= M,
二级= S
});
返回等待zippedResponse.FirstAsync(); ;
})
.Subscribe(OnMainAndSecondaryResponseReceived);
我需要什么
现在我还需要办理 MainResponseReceived
无需等待SecondaryResponseRecieved,它必须保证,那OnMainResponseRecieved前 OnMainAndSecondaryResponseReceived $ C $完成C>叫
如何定义两个订阅,请?
测试案例1:
-
RequestSent
发生 -
MainResponseReceived
时 - > OnMainResponseReceived叫 -
SecondaryResponseReceive
D出现 - > OnMainAndSecondaryResponseReceived叫
测试案例2:
-
RequestSent
发生 -
SecondaryResponseReceived
发生 -
MainResponseReceived发生
- > OnMainResponseReceived被称为 - > OnMainAndSecondaryResponseReceived叫
我觉得你是pretty的多少在正确的轨道。我将停止摆弄周围所有的异步的东西 - 那只是使事情复杂化
试试这个查询:
VAR的查询=
_要求
.SelectMany(要求=>
_mainResponses.Where(米=> m.Id == request.Id)。取(1)
。做(米=> OnMainResponseReceived(米))
。拉链(
_secondaryResponses.Where(S => s.Id == request.Id)。取(1),
(M,S)=>新MainAndSecondaryResponseReceived()
{
请求=请求,
主要= M,
二级= S
}));
VAR订阅=
query.Subscribe(X => OnMainAndSecondaryResponseReceived(x)的);
在。做(...)
中的重要缺失的部分在code。它确保了 OnMainResponseReceived
在 OnMainAndSecondaryResponseReceived
被称为无论如果主要或次要响应进来第一。
我测试用:
受试对象; RequestSent> _requestsSubject =新的受试对象; RequestSent>();
受试对象; MainResponseReceived> _mainResponsesSubject =新的受试对象; MainResponseReceived>();
受试对象; SecondResponseReceived> _secondaryResponsesSubject =新的受试对象; SecondResponseReceived>();
的IObservable< RequestSent> _requests = _requestsSubject.AsObservable();
的IObservable< MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
的IObservable< SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();
_requestsSubject.OnNext(新RequestSent(){n = 42});
_mainResponsesSubject.OnNext(新MainResponseReceived(){n = 42});
_secondaryResponsesSubject.OnNext(新SecondResponseReceived(){n = 42});
_requestsSubject.OnNext(新RequestSent(){n = 99});
_mainResponsesSubject.OnNext(新MainResponseReceived(){n = 99});
_secondaryResponsesSubject.OnNext(新SecondResponseReceived(){n = 99});
Situation
I have a system where one request produces two responses. The request and responses have corresponding observables:
IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;
it is guaranteed that RequestSent
event occurs earlier than MainResponseReceived
and SecondaryResponseReceived
but the responses come in random order.
What I have
Originally I wanted handler that handles both responses, so I zipped the observables:
_requests
.SelectMany(async request =>
{
var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);
var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
Request = request,
Main = m,
Secondary = s
});
return await zippedResponse.FirstAsync(); ;
})
.Subscribe(OnMainAndSecondaryResponseReceived);
What I need
Now I need to handle also MainResponseReceived
without waiting for SecondaryResponseRecieved and it must be guaranteed, that the OnMainResponseRecieved completes before OnMainAndSecondaryResponseReceived
is called
How to define the two subscriptions, please?
Test case 1:
RequestSent
occursMainResponseReceived
occurs -> OnMainResponseReceived is calledSecondaryResponseReceive
d occurs -> OnMainAndSecondaryResponseReceived is called
Test case 2:
RequestSent
occursSecondaryResponseReceived
occursMainResponseReceived occurs
-> OnMainResponseReceived is called -> OnMainAndSecondaryResponseReceived is called
I think you're pretty much on the right track. I would stop mucking around with all the Async stuff - that's just making things complicated.
Try this query:
var query =
_requests
.SelectMany(request =>
_mainResponses.Where(m => m.Id == request.Id).Take(1)
.Do(m => OnMainResponseReceived(m))
.Zip(
_secondaryResponses.Where(s => s.Id == request.Id).Take(1),
(m, s) => new MainAndSecondaryResponseReceived()
{
Request = request,
Main = m,
Secondary = s
}));
var subscription =
query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));
The .Do(...)
is the important missing part in your code. It ensures that OnMainResponseReceived
is called before OnMainAndSecondaryResponseReceived
no matter if the main or secondary response comes in first.
I tested this with:
Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();
IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();
_requestsSubject.OnNext(new RequestSent() { Id = 42 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });
_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });
这篇关于使用无功扩展处理以正确的顺序多个响应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!