使用无功扩展处理以正确的顺序多个响应 [英] Use reactive extension to handle multiple responses in correct order

查看:114
本文介绍了使用无功扩展处理以正确的顺序多个响应的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

状况

我有一个,其中一个请求产生了两个响应系统。请求和响应都有相应的观测值:

 的IObservable< RequestSent> _要求;
的IObservable< MainResponseReceived> _mainResponses;
的IObservable< SecondResponseReceived> _secondaryResponses;
 

可以保证 RequestSent 早于 MainResponseReceived 事件发生和 SecondaryResponseReceived ,但反应过来以随机顺序。

我有什么

本来我想处理程序,说明如何处理答复,所以我压缩的观测值:

  _requests
    .SelectMany(异步请求=>
    {
        VAR主要= _mainResponses.FirstAsync(M => m.Id == request.Id);
        变种次级= _secondaryResponses.FirstAsync(S => s.Id == request.Id);

        VAR zippedResponse = main.Zip(二级,(M,S)=>新建MainAndSecondaryResponseReceived {
            请求=请求,
            主要= M,
            二级= S
        });
        返回等待zippedResponse.FirstAsync(); ;
    })
    .Subscribe(OnMainAndSecondaryResponseReceived);
 


我需要什么

现在我还需要办理 MainResponseReceived 无需等待SecondaryResponseRecieved,它必须保证,那OnMainResponseRecieved前 OnMainAndSecondaryResponseReceived

如何定义两个订阅,请?

测试案例1:

  1. RequestSent 发生
  2. MainResponseReceived 时 - > OnMainResponseReceived叫
  3. SecondaryResponseReceive D出现 - > OnMainAndSecondaryResponseReceived叫

测试案例2:

  1. RequestSent 发生
  2. SecondaryResponseReceived 发生
  3. MainResponseReceived发生 - > OnMainResponseReceived被称为 - > OnMainAndSecondaryResponseReceived叫
解决方案

我觉得你是pretty的多少在正确的轨道。我将停止摆弄周围所有的异步的东西 - 那只是使事情复杂化

试试这个查询:

  VAR的查询=
    _要求
        .SelectMany(要求=>
            _mainResponses.Where(米=> m.Id == request.Id)。取(1)
                。做(米=> OnMainResponseReceived(米))
                。拉链(
                    _secondaryResponses.Where(S => s.Id == request.Id)。取(1),
                    (M,S)=>新MainAndSecondaryResponseReceived()
                    {
                        请求=请求,
                        主要= M,
                        二级= S
                    }));

VAR订阅=
    query.Subscribe(X => OnMainAndSecondaryResponseReceived(x)的);
 

。做(...)中的重要缺失的部分在code。它确保了 OnMainResponseReceived OnMainAndSecondaryResponseReceived 被称为无论如果主要或次要响应进来第一。

我测试用:

 受试对象; RequestSent> _requestsSubject =新的受试对象; RequestSent>();
受试对象; MainResponseReceived> _mainResponsesSubject =新的受试对象; MainResponseReceived>();
受试对象; SecondResponseReceived> _secondaryResponsesSubject =新的受试对象; SecondResponseReceived>();

的IObservable< RequestSent> _requests = _requestsSubject.AsObservable();
的IObservable< MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
的IObservable< SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();

_requestsSubject.OnNext(新RequestSent(){n = 42});
_mainResponsesSubject.OnNext(新MainResponseReceived(){n = 42});
_secondaryResponsesSubject.OnNext(新SecondResponseReceived(){n = 42});

_requestsSubject.OnNext(新RequestSent(){n = 99});
_mainResponsesSubject.OnNext(新MainResponseReceived(){n = 99});
_secondaryResponsesSubject.OnNext(新SecondResponseReceived(){n = 99});
 

Situation

I have a system where one request produces two responses. The request and responses have corresponding observables:

IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;

it is guaranteed that RequestSent event occurs earlier than MainResponseReceived and SecondaryResponseReceived but the responses come in random order.

What I have

Originally I wanted handler that handles both responses, so I zipped the observables:

_requests
    .SelectMany(async request =>
    {
        var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
        var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);

        var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
            Request = request,
            Main = m, 
            Secondary = s
        });
        return await zippedResponse.FirstAsync(); ;
    })
    .Subscribe(OnMainAndSecondaryResponseReceived);


What I need

Now I need to handle also MainResponseReceived without waiting for SecondaryResponseRecieved and it must be guaranteed, that the OnMainResponseRecieved completes before OnMainAndSecondaryResponseReceived is called

How to define the two subscriptions, please?

Test case 1:

  1. RequestSent occurs
  2. MainResponseReceived occurs -> OnMainResponseReceived is called
  3. SecondaryResponseReceived occurs -> OnMainAndSecondaryResponseReceived is called

Test case 2:

  1. RequestSent occurs
  2. SecondaryResponseReceived occurs
  3. MainResponseReceived occurs -> OnMainResponseReceived is called -> OnMainAndSecondaryResponseReceived is called

解决方案

I think you're pretty much on the right track. I would stop mucking around with all the Async stuff - that's just making things complicated.

Try this query:

var query =
    _requests
        .SelectMany(request =>
            _mainResponses.Where(m => m.Id == request.Id).Take(1)
                .Do(m => OnMainResponseReceived(m))
                .Zip(
                    _secondaryResponses.Where(s => s.Id == request.Id).Take(1),
                    (m, s) => new MainAndSecondaryResponseReceived()
                    {
                        Request = request,
                        Main = m, 
                        Secondary = s
                    }));

var subscription =
    query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));

The .Do(...) is the important missing part in your code. It ensures that OnMainResponseReceived is called before OnMainAndSecondaryResponseReceived no matter if the main or secondary response comes in first.

I tested this with:

Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();

IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();

_requestsSubject.OnNext(new RequestSent() { Id = 42 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });

_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });

这篇关于使用无功扩展处理以正确的顺序多个响应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆