使用 Rx 轮询网站 [英] Polling a website using Rx

查看:25
本文介绍了使用 Rx 轮询网站的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

只是想了解一下 Rx

我每 2 秒使用 Rx 轮询一次网站

var results = new List();var cx = new WebserviceAPI( ... );var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); });var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe(_ => { cx.StartGetDataAsync(); });

(Web 服务 API 公开了一个 getItemsAsync/getItemsCompleted 事件处理程序类型机制,我正在从中创建一个 observable).

当网站返回时,我将响应的业务部分"解包成一个 IEnumerable 的 DTO

public IObservable>获取数据异步(){var o = Observable.FromEventPattern(h =>_webService.getItemsCompleted += h,h =>_webService.getItemsCompleted -= h);return o.Select(c=> from itm in c.EventArgs.Result.ItemList选择新的 MyDTO(){...});}

我的推理是,鉴于所有数据都在字符串中,将它打包然后放入 IEnumerable 是有意义的......但现在我不确定这是否正确!

如果网站的响应时间超过 2 秒,我发现 MSTest 崩溃了.调试时,产生的错误是

<块引用>

"异步处理时出错.唯一状态多个异步同时操作需要对象出类拔萃"

内部异常

<块引用>

"项目已添加.字典中的键:'System.Object'键正在添加:'System.Object'"

我假设问题是重入问题,因为下一个调用在前一个调用完成数据填充之前开始并返回数据.

所以我不确定是否

  1. 我把这件事放在一起很正确
  2. 我应该以某种方式限制连接以避免重入.
  3. 我应该使用不同的中间数据结构(或机制)而不是 IEnumerable

希望得到一些指导.

编辑 1:所以我改变了网络调用以包含一个唯一的状态对象

public void StartGetDataAsync(){...//是:_webService.getItemsAsync(request);_webService.getItemsAsync(request, Guid.NewGuid());}

并让它发挥作用.但我仍然不确定这是否是正确的方法

编辑 2 - Web 服务信号:我正在使用 webServiceApi 类包装的肥皂网络服务.创建的references.cs包含以下方法

public void getItemsAsync(GetItemsReq request, object userState){if ((this.getItemsOperationCompleted == null)){this.getItemsOperationCompleted = new System.Threading.SendOrPostCallback(this.OngetItemsOperationCompleted);}this.InvokeAsync("getItems", new object[] {request}, this.getItemsOperationCompleted, userState);}私有 System.Threading.SendOrPostCallback getItemsOperationCompleted;公共事件 getItemsCompletedEventHandler getItemsCompleted;公共委托 void getItemsCompletedEventHandler(object sender, getItemsCompletedEventArgs e);公共部分类 getItemsCompletedEventArgs : System.ComponentModel.AsyncCompletedEventArgs{...}私有无效 OngetItemsOperationCompleted(对象 arg){如果 ((this.getItemsCompleted != null)){System.Web.Services.Protocols.InvokeCompletedEventArgs invokeArgs = ((System.Web.Services.Protocols.InvokeCompletedEventArgs)(arg));this.getItemsCompleted(this, new getItemsCompletedEventArgs(invokeArgs.Results, invokeArgs.Error, invokeArgs.Cancelled, invokeArgs.UserState));}}

可能给你太多(或错过了什么)!

谢谢

解决方案

我想我有一个不错的起点.

基本上,我认为您需要抽象出 Web 服务的复杂性并创建一个漂亮的干净函数来获得结果.

尝试这样的事情:

Func>取=rq =>Observable.Create(o =>{var cx = new WebserviceAPI(/* ... */);var state = new object();可变资源 =可观察的.FromEventPattern(h =>cx.getItemsCompleted += h,h =>cx.getItemsCompleted -= h).Where(x => x.EventArgs.UserState == state).采取(1).Select(x => x.EventArgs);var 订阅 = res.Subscribe(o);cx.getItemsAsync(rq, state);退订;});

我个人会更进一步定义一个返回类型,比如 GetItemsReq,它不包括用户状态对象,但与 getItemsCompletedEventArgs 基本相同.

然后您应该能够使用 Observable.Interval 来创建您需要的轮询.

如果您的 Web 服务实现了 IDisposable,那么您应该在上述函数中添加一个 Observable.Using 调用,以便在 Web 服务完成时正确处理它.

如果这有帮助,请告诉我.

just trying to get my head around Rx

I am using Rx to poll a website every 2 seconds

var results = new List<MyDTO>();
var cx = new WebserviceAPI( ... );
var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); });
var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe( _ => { cx.StartGetDataAsync(); });

(The webservice API exposes a getItemsAsync/getItemsCompleted event handler type mechanism from which I am creating an observable).

When the web site returns, I am unpacking the "business part of" the response into an IEnumerable of DTOs

public IObservable<IEnumerable<MyDTO>> GetDataAsync()
{
    var o = Observable.FromEventPattern<getItemsCompletedEventHandler,getItemsCompletedEventArgs>(
        h => _webService.getItemsCompleted += h,
        h => _webService.getItemsCompleted -= h);

    return o.Select(c=> from itm in c.EventArgs.Result.ItemList
                        select new MyDTO()
                        {
                           ...
                        });
}

My reasoning being that given that all the data was just there in the string, it made sense just to pack it up there an then into an IEnumerable ... but now I'm not sure if that is right!

If the website takes longer than 2 secs to respond I am finding that MSTest is crashing out. When debugging, the error being generated is

"There was an error during asynchronous processing. Unique state object is required for multiple asynchronous simultaneous operations to be outstanding"

with the inner exception

"Item has already been added. Key in dictionary: 'System.Object' Key being added: 'System.Object'"

I am supposing that the problem is one of reentrancy in that the next call is starting and returning data before the previous call has finished populating the data.

So I'm not sure whether

  1. I have put the thing together quite right
  2. I should be throttling the connection in some way so as to avoid re-entrancy.
  3. I should use a different intermediate data structure (or mechanism) instead of an IEnumerable

I would appreciate some guidance.

EDIT 1: So I have changed the web call to include a unique state object

public void StartGetDataAsync()
{
    ...
    //  was: _webService.getItemsAsync(request);
    _webService.getItemsAsync(request, Guid.NewGuid());
}

and made it work. But I am still unsure if that is the right way to do it

EDIT 2 - Web service sigs: I'm consuming a soap web service which the webServiceApi class wraps. The references.cs created contains the following methods

public void getItemsAsync(GetItemsReq request, object userState) 
{
    if ((this.getItemsOperationCompleted == null)) 
    {
        this.getItemsOperationCompleted = new System.Threading.SendOrPostCallback(this.OngetItemsOperationCompleted);
    }
    this.InvokeAsync("getItems", new object[] {
                    request}, this.getItemsOperationCompleted, userState);
}

private System.Threading.SendOrPostCallback getItemsOperationCompleted;

public event getItemsCompletedEventHandler getItemsCompleted;

public delegate void getItemsCompletedEventHandler(object sender, getItemsCompletedEventArgs e);

public partial class getItemsCompletedEventArgs : System.ComponentModel.AsyncCompletedEventArgs 
{
    ...
}

private void OngetItemsOperationCompleted(object arg) 
{
    if ((this.getItemsCompleted != null)) 
    {
        System.Web.Services.Protocols.InvokeCompletedEventArgs invokeArgs = ((System.Web.Services.Protocols.InvokeCompletedEventArgs)(arg));
        this.getItemsCompleted(this, new getItemsCompletedEventArgs(invokeArgs.Results, invokeArgs.Error, invokeArgs.Cancelled, invokeArgs.UserState));
    }
 }

Probably given you too much (or missed something)!

Thx

解决方案

I think I've got a decent starting point for you.

Basically I think you need to abstract away the complexity of the web service and create a nice clean function to get your results.

Try something like this:

Func<GetItemsReq, IObservable<getItemsCompletedEventArgs>> fetch =
    rq =>
        Observable.Create<getItemsCompletedEventArgs>(o =>
        {
            var cx = new WebserviceAPI(/* ... */);
            var state = new object();
            var res =
                Observable
                    .FromEventPattern<
                        getItemsCompletedEventHandler,
                        getItemsCompletedEventArgs>(
                        h => cx.getItemsCompleted += h,
                        h => cx.getItemsCompleted -= h)
                    .Where(x => x.EventArgs.UserState == state)
                    .Take(1)
                    .Select(x => x.EventArgs);
            var subscription = res.Subscribe(o);
            cx.getItemsAsync(rq, state);
            return subscription;
        });

Personally I would go one step further and define a return type, say GetItemsReq, that doesn't include the user state object, but is basically the same as getItemsCompletedEventArgs.

You should then be able to use Observable.Interval to create the polling that you need.

If your web service implements IDisposable then you should add an Observable.Using call into the above function to correctly dispose of the web service when it is complete.

Let me know if this helps.

这篇关于使用 Rx 轮询网站的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆