如何使用C#任务并行库和IProducerConsumerCollection实现通用回调? [英] How to implement generic callbacks using the C# Task Parallel Library and IProducerConsumerCollection?

查看:472
本文介绍了如何使用C#任务并行库和IProducerConsumerCollection实现通用回调?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个组件,可以向基于Web的API提交请求,但是必须限制这些请求,以免违反API的数据限制.这意味着所有请求都必须通过队列以控制其提交速率,但是它们可以(并且应该)并发执行以实现最大吞吐量.每个请求都必须在将来的某个时候将某些数据返回到调用代码.

I have a component that submits requests to a web-based API, but these requests must be throttled so as not to contravene the API's data limits. This means that all requests must pass through a queue to control the rate at which they are submitted, but they can (and should) execute concurrently to achieve maximum throughput. Each request must return some data to the calling code at some point in the future when it completes.

我正在努力创建一个不错的模型来处理数据返回.

I'm struggling to create a nice model to handle the return of data.

使用BlockingCollection我不能仅仅从Schedule方法返回Task<TResult>,因为入队和出队过程在缓冲区的两端.因此,我创建了一个RequestItem<TResult>类型,其中包含形式为Action<Task<TResult>>的回调.

Using a BlockingCollection I can't just return a Task<TResult> from the Schedule method, because the enqueuing and dequeuing processes are at either ends of the buffer. So instead I create a RequestItem<TResult> type that contains a callback of the form Action<Task<TResult>>.

这个想法是,一旦一个项目从队列中被拉出,就可以通过启动的任务来调用回调,但是到那时我已经失去了泛型类型参数,而我只剩下了反射和各种各样的麻烦. (如果可能的话).

The idea is that once an item has been pulled from the queue the callback can be invoked with the started task, but I've lost the generic type parameters by that point and I'm left using reflection and all kinds of nastiness (if it's even possible).

例如:

public class RequestScheduler 
{
    private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();

    public RequestScheduler()
    {
        this.Start();
    }

    // This can't return Task<TResult>, so returns void.
    // Instead RequestItem is generic but this poses problems when adding to the queue
    public void Schedule<TResult>(RequestItem<TResult> request)
    {
        _queue.Add(request);
    }

    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                // I want to be able to use the original type parameters here
                // is there a nice way without reflection?
                // ProcessItem submits an HttpWebRequest
                Task.Factory.StartNew(() => ProcessItem(item))
                   .ContinueWith(t => { item.Callback(t); });
            }
        });
    }

    public void Stop()
    {
        _queue.CompleteAdding();
    }
}

public class RequestItem<TResult> : IRequestItem
{
    public IOperation<TResult> Operation { get; set; }
    public Action<Task<TResult>> Callback { get; set; }
}

当请求从缓冲区中拉出并提交给API时,如何继续缓冲我的请求,但向客户端返回Task<TResult>?

How can I continue to buffer my requests but return a Task<TResult> to the client when the request is pulled from the buffer and submitted to the API?

推荐答案

首先,您可以Schedule()返回Task<TResult>,您只需要使用

First, you can return Task<TResult> from Schedule(), you just need to use TaskCompletionSource for that.

第二,要解决通用性问题,可以将其全部隐藏在(非通用)Action中.在Schedule()中,使用完全满足您需要的lambda创建一个动作.然后,消费循环将执行该操作,而无需知道其中的内容.

Second, to get around the genericity issue, you can hide all of it inside (non-generic) Actions. In Schedule(), create an action using a lambda that does exactly what you need. The consuming loop will then execute that action, it doesn't need to know what's inside.

第三,我不明白为什么要在循环的每次迭代中开始一个新的Task.首先,这意味着您实际上不会受到任何限制.

Third, I don't understand why are you starting a new Task in each iteration of the loop. For one, it means you won't actually get any throttling.

经过这些修改,代码可能如下所示:

With these modifications, the code could look like this:

public class RequestScheduler
{
    private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>();

    public RequestScheduler()
    {
        this.Start();
    }

    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var action in m_queue.GetConsumingEnumerable())
            {
                action();
            }
        }, TaskCreationOptions.LongRunning);
    }

    public Task<TResult> Schedule<TResult>(IOperation<TResult> operation)
    {
        var tcs = new TaskCompletionSource<TResult>();

        Action action = () =>
        {
            try
            {
                tcs.SetResult(ProcessItem(operation));
            }
            catch (Exception e)
            {
                tcs.SetException(e);
            }
        };

        m_queue.Add(action);

        return tcs.Task;
    }

    private T ProcessItem<T>(IOperation<T> operation)
    {
        // whatever
    }
}

这篇关于如何使用C#任务并行库和IProducerConsumerCollection实现通用回调?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆