使用响应式扩展的并行缓存 [英] Parallel Cache using Reactive extensions

查看:68
本文介绍了使用响应式扩展的并行缓存的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一个并行缓存.要求是有 n 个数据收集器需要立即触发.这些数据收集器中的每一个都将到达边界层(称为服务层)并检索数据.但是,由于这是在同一请求(WCF)中,因此如果两个数据收集器需要在服务层上调用相同的方法,则我不希望第二个请求等待第一个请求完成.

I'm looking to build a parallel cache. The requirement is that there will be n number of datacollectors that need to be fired at once. Each of these data collectors will hit a boundary layer (call this the service layer) and retrieve data. However, since this is within the same request (WCF), if 2 data collectors need to invoke the same method on the service layer, I don't want the second request to wait for the first one to complete.

这需要对构建数据收集器的开发人员透明地构建(使用Unity Interception插入此缓存方面).

This needs to be build transparently to developers building the data collectors (Using Unity Interception to insert this caching aspect).

这是流程的样子.反应性扩展会适合这种设计吗?我过去从未与Rx合作,也不想在开发10天后碰壁.否则,异步,等待和事件的组合在这里也可能会很好地发挥作用.

here is what the flow would look like. Would Reactive extensions be the correct fit for this kind of a design? I haven't worked with Rx in the past, and do not want to hit a brick wall 10 days into development. Otherwise, a combination of async, await and events might also serve well here.

我使用Rx实现了这一点-在多线程上下文中运行良好.有趣的一点是尝试添加而不是tryGet. (这是一个Unity Interception CallHandler)

I implemented this using Rx - works well in a multi threaded context. The interesting bit was to try add instead of tryGet. (This is an Unity Interception CallHandler)

 /// <summary>
    /// Intercepts the calls and tries to retrieve from the cache
    /// </summary>
    class CacheCallHandler : ICallHandler
    {

        [Dependency]
        public ICache RequestCache { get; set; }

        public IMethodReturn Invoke(IMethodInvocation input, GetNextHandlerDelegate getNext)
        {
            IMethodReturn mesg = null;

            string cacheKey = CacheKeyGenerator.GetCacheKey(input);

            //create the task to retrieve the data
            var task = new Task<IMethodReturn>(() =>
            {
                return getNext()(input, getNext);
            });

            //make it observable
            var observableItem = task.ToObservable();

            //try to add it to the cache
            //we need to do this in the order of Add and then try to get, otherwise multiple thread might enter the same area
            if (RequestCache.TryAdd(cacheKey, observableItem))
            {
                //if the add succeeed, it means that we are responsible to starting this task
                task.Start();
            }
            else
            {
                if ( RequestCache.TryGetValue(cacheKey, out observableItem) )
                {
                    //do nothing, the observable item is already updated with the requried reference
                }
                else
                {
                    throw new CacheHandlerException("Could not add to cache AND could not retrieve from cache either. Something's wrong", input);
                }
            }

            //observe the return 
            if ( observableItem != null )
                mesg = observableItem.FirstOrDefault();

            if (mesg == null)
                throw new CacheHandlerException("Not return value found. this should not happen", input);

            return mesg;
        }


        /// <summary>
        /// Should always be the first to execute on the boundary
        /// </summary>
        public int Order
        {
            get { return 1; }
            set { ; }
        }
    }

推荐答案

是的,Rx非常适合此操作.

Yes, Rx is an excellent fit for this.

我建议您考虑实现以下字典来支持密钥缓存:

I suggest you look at implementing the following dictionary to back your Key Cache:

Dictionary<K, AsyncSubject<V>>

您的异步获取数据部分只需要订阅主题以将其填充到结果中即可.

Your fetch data asynchronously part just needs to subscribe to the subject to populate it with the results.

这篇关于使用响应式扩展的并行缓存的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆