通用code处理Azure的表并发冲突? [英] Generic code for handling Azure Tables concurrency conflicts?

查看:155
本文介绍了通用code处理Azure的表并发冲突?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我期待在做一些更新到Azure存储表。我想正确使用乐观并发机制。好像你需要做的是这样的:


  1. 加载行更新,可能重试失败

  2. 更新应用到行

  3. 保存一行,可能重试网络错误

    1. 如果有一个并发冲突,重新装载数据(可能重试失败的),并试图再次保存(可能重试失败)


有处理这一一些通用类或code样?我可以code它,但我不得不想象有人已经发明了这个特殊的轮子。


解决方案

如果有人发明了这个轮他们不说话,等我走了,(重新)发明了它自己。这是故意非常通用的,更多的是骨架比成品。它基本上只是我上面提到的算法。调用者在代表做数据的实际加载,更新和保存线。有建于基本的重试逻辑,但我会建议重写这些功能的东西更加强劲。

我相信这将表或BLOB的,单实体或批工作,虽然我只实际上单一实体表更新尝试过。

任何意见,建议,改进等将AP preciated。

使用系统;
使用System.Collections.Generic;
使用System.Linq的;
使用System.Text;
使用的System.Threading;
使用System.Data.Services.Client;
使用Microsoft.WindowsAzure.StorageClient;
使用System.Net;命名空间SepiaLabs.Azure
{
    ///<总结>
    ///尝试同时使用开放式并发到更新写入存储。
    ///实现一个基本的状态机。数据将获取(含重试),然后突变,然后更新(有重试,并有可能重新获取和放大器; remutating)。
    ///客户可以通过与相关信息的状态对象。例如,一个TableServiceContext对象。
    ///< /总结>
    ///<&言论GT;
    ///这个对象本身所实现的一个非常基本的重试策略。
    ///客户可能要继承它,并覆盖ShouldRetryRetrieval()和ShouldRetryPersist()函数来实现更高级的重试策略。
    ///
    ///这个类特意避免检查,如果该行是更新前present。这让来电者可以抛出自定义异常,或试图插入行,而不是(UPSERT的风格互动)
    ///< /言论>
    ///< typeparam NAME =行类型>数据的类型将被读取和更新。虽然它被称为ROWTYPE为清楚起见,你可以操纵行的集合< / typeparam>
    ///< typeparam NAME =StateObjectType>该类型用户提供的状态对象&LT的; / typeparam>
    公共类AzureDataUpdate< ROWTYPE,StateObjectType>
        其中,行类型:类
    {
        ///<总结>
        ///函数来检索将被更新的数据。
        ///这个函数会被调用至少一次。它也将被称为并发更新冲突发生的任何时间。
        ///< /总结>
        公共委托ROWTYPE DataRetriever(StateObjectType stateObj);        ///<总结>
        ///功能应用到数据所需的更改。
        ///这将在每次DataRetriever函数被调用时后调用。
        ///如果您使用的是带有MergeOption。preserveChanges设置TableServiceContext,该功能可以是一个空操作的第一个电话后
        ///< /总结>
        公共委托无效DataMutator(ROWTYPE数据,StateObjectType stateObj);        ///<总结>
        ///功能坚持修改后的数据。该可称为多次。
        ///< /总结>
        ///< PARAM NAME =数据>< /参数>
        ///< PARAM NAME =stateObj>< /参数>
        公共委托无效DataPersister(ROWTYPE数据,StateObjectType stateObj);        公共DataRetriever RetrieverFunction {搞定;组; }
        公共DataMutator MutatorFunction {搞定;组; }
        公共DataPersister PersisterFunction {搞定;组; }        公共AzureDataUpdate()
        {
        }        公共AzureDataUpdate(DataRetriever retrievalFunc,DataMutator mutatorFunc,DataPersister persisterFunc)
        {
            this.RetrieverFunction = retrievalFunc;
            this.MutatorFunction = mutatorFunc;
            this.PersisterFunction = persisterFunc;
        }        公共ROWTYPE执行(StateObjectType userState)
        {
            如果(RetrieverFunction == NULL)
            {
                抛出新的InvalidOperationException异常(必须提供在执行前数据检索功能);
            }
            否则,如果(MutatorFunction == NULL)
            {
                抛出新的InvalidOperationException异常(必须提供在执行前一个数据增变函数);
            }
            否则,如果(PersisterFunction == NULL)
            {
                抛出新的InvalidOperationException异常(必须提供在执行前一个数据的持留函数);
            }            //检索和修改数据
            ROWTYPE数据= this.DoRetrieve(userState);            //调用函数突变。
            MutatorFunction(数据,userState);            //保存变更
            INT attemptNumber = 1;
            而(真)
            {
                布尔是preconditionFailedResponse = FALSE;                尝试
                {
                    PersisterFunction(数据,userState);
                    返回的数据; //返回突变数据
                }
                赶上(DataServiceRequestException dsre)
                {
                    DataServiceResponse RESP = dsre.Response;                    INT状态code = -1;
                    如果(resp.IsBatchResponse)
                    {
                        状态code = resp.BatchStatus code;
                    }
                    否则如果(resp.Any())
                    {
                        状态code = resp.Fi​​rst()状态code。
                    }                    为preconditionFailedResponse =(状态code ==(INT)的HTTPStatus code preconditionFailed);
                    如果(!ShouldRetryPersist(attemptNumber,dsre是preconditionFailedResponse,userState))
                    {
                        扔;
                    }
                }
                赶上(DataServiceClientException dsce)
                {
                    为preconditionFailedResponse =(dsce.Status code ==(INT)的HTTPStatus code preconditionFailed);
                    如果(!ShouldRetryPersist(attemptNumber,dsce是preconditionFailedResponse,userState))
                    {
                        扔;
                    }
                }
                赶上(StorageClientException SCE)
                {
                    为preconditionFailedResponse =(sce.Status code ==的HTTPStatus code preconditionFailed);
                    如果(!ShouldRetryPersist(attemptNumber,SCE,是preconditionFailedResponse,userState))
                    {
                        扔;
                    }
                }
                赶上(异常前)
                {
                    如果(!ShouldRetryPersist(attemptNumber,当然,假的,userState))
                    {
                        扔;
                    }
                }                如果(是preconditionFailedResponse)
                {
                    //重新访存数据,重新申请增变
                    数据= DoRetrieve(userState);
                    MutatorFunction(数据,userState);
                }                attemptNumber ++;
            }
        }        ///<总结>
        ///检索数据进行更新,可能与试
        ///< /总结>
        ///< PARAM NAME =userState>作为这个操作与LT的UserState; /参数>
        私人ROWTYPE DoRetrieve(StateObjectType userState)
        {
            INT attemptNumber = 1;            而(真)
            {
                尝试
                {
                    返回RetrieverFunction(userState);
                }
                赶上(异常前)
                {
                    如果(!ShouldRetryRetrieval(attemptNumber,恩,userState))
                    {
                        扔;
                    }
                }                attemptNumber ++;
            }
        }        ///<总结>
        ///确定数据检索是否应该重试。
        ///实现一个简单的,不变等待时间的策略。用户可以覆盖,以提供一个更复杂的实施。
        ///< /总结>
        ///< PARAM NAME =attemptNumber>什么号码的尝试是这样的。 < /参数>
        ///< PARAM NAME =EX>被抓获&LT除外; /参数>
        ///< PARAM NAME =userState>作为这个操作与LT用户提供的状态对象; /参数>
        ///<退货和GT;真到再次尝试检索,假中止检索和失败更新尝试< /回报>
        受保护的虚拟BOOL ShouldRetryRetrieval(INT attemptNumber,异常前,StateObjectType userState)
        {
            //简单,基本重试策略 - 尝试3次,睡眠,每次1000msec
            如果(attemptNumber 3;)
            {
                Thread.sleep代码(1000);
                返回true;
            }
            其他
            {
                返回false;
            }
        }        ///<总结>
        ///确定数据更新是否应该重试。如果< paramref名=是preconditionFailed/> param是真实的,
        ///然后检索和突变过程将重复以及
        ///实现一个简单的,不变等待时间的策略。用户可以覆盖,以提供一个更复杂的实施。
        ///< /总结>
        ///< PARAM NAME =attemptNumber>什么号码的尝试是这样的。 < /参数>
        ///< PARAM NAME =EX>被抓获&LT除外; /参数>
        ///< PARAM NAME =userState>作为这个操作与LT用户提供的状态对象; /参数>
        ///< PARAM NAME =是preconditionFailedResponse>指示异常是否是preconditionFailed响应。也就是说,乐观并发失败< /参数>
        ///<退货和GT;真到再次尝试更新,虚假中止检索和失败更新尝试< /回报>
        受保护的虚拟BOOL ShouldRetryPersist(INT attemptNumber,异常前,布尔是preconditionFailedResponse,StateObjectType userState)
        {
            如果(是preconditionFailedResponse)
            {
                返回true; //立即重试
            }
            其他
            {
                //其他故障,等待重试
                //简单,基本重试策略 - 尝试3次,睡眠,每次1000msec
                如果(attemptNumber 3;)
                {
                    Thread.sleep代码(1000);
                    返回true;
                }
                其他
                {
                    返回false;
                }
            }
        }
    }
}

I'm looking at doing some updates into Azure Storage Tables. I want to use the optimistic concurrency mechanism properly. It seems like you'd need to do something like:

  1. Load row to update, possibly retrying failures
  2. Apply updates to row
  3. Save row, possibly retrying network errors

    1. If there is a concurrency conflict, reload the data (possibly retrying failures) and attempt to save again (possible retrying failures)

Is there some generic class or code sample that handles this? I can code it up, but I have to imagine someone has already invented this particular wheel.

解决方案

If someone invented this wheel they're not talking, so I went off and (re)invented it myself. This is intentionally very generic, more of a skeleton than a finished product. It's basically just the algorithm I outlined above. The caller has to wire in delegates to do the actual loading, updating and saving of the data. There is basic retry logic built in, but I would recommend overriding those functions with something more robust.

I believe this will work with tables or BLOBs, and single entities or batches, though I've only actually tried it with single-entity table updates.

Any comments, suggestions, improvements, etc would be appreciated.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Data.Services.Client;
using Microsoft.WindowsAzure.StorageClient;
using System.Net;

namespace SepiaLabs.Azure
{
    /// <summary>
    /// Attempt to write an update to storage while using optimistic concurrency.
    /// Implements a basic state machine. Data will be fetched (with retries), then mutated, then updated (with retries, and possibly refetching & remutating). 
    /// Clients may pass in a state object with relevant information. eg, a TableServiceContext object.
    /// </summary>
    /// <remarks>
    /// This object natively implements a very basic retry strategy. 
    /// Clients may want to subclass it and override the ShouldRetryRetrieval() and ShouldRetryPersist() functions to implement more advanced retry strategies. 
    /// 
    /// This class intentionally avoids checking if the row is present before updating it. This is so callers may throw custom exceptions, or attempt to insert the row instead ("upsert" style interaction)
    /// </remarks>
    /// <typeparam name="RowType">The type of data that will be read and updated. Though it is called RowType for clarity, you could manipulate a collection of rows.</typeparam>
    /// <typeparam name="StateObjectType">The type of the user-supplied state object</typeparam>
    public class AzureDataUpdate<RowType, StateObjectType>
        where RowType : class
    {
        /// <summary>
        /// Function to retrieve the data that will be updated. 
        /// This function will be called at least once. It will also be called any time a concurrency update conflict occurs. 
        /// </summary>
        public delegate RowType DataRetriever(StateObjectType stateObj);

        /// <summary>
        /// Function to apply the desired changes to the data.
        /// This will be called after each time the DataRetriever function is called. 
        /// If you are using a TableServiceContext with MergeOption.PreserveChanges set, this function can be a no-op after the first call
        /// </summary>
        public delegate void DataMutator(RowType data, StateObjectType stateObj);

        /// <summary>
        /// Function to persist the modified data. The may be called multiple times. 
        /// </summary>
        /// <param name="data"></param>
        /// <param name="stateObj"></param>
        public delegate void DataPersister(RowType data, StateObjectType stateObj);

        public DataRetriever RetrieverFunction { get; set; }
        public DataMutator MutatorFunction { get; set; }
        public DataPersister PersisterFunction { get; set; }

        public AzureDataUpdate()
        {
        }

        public AzureDataUpdate(DataRetriever retrievalFunc, DataMutator mutatorFunc, DataPersister persisterFunc)
        {
            this.RetrieverFunction = retrievalFunc;
            this.MutatorFunction = mutatorFunc;
            this.PersisterFunction = persisterFunc;
        }

        public RowType Execute(StateObjectType userState)
        {
            if (RetrieverFunction == null)
            {
                throw new InvalidOperationException("Must provide a data retriever function before executing");
            }
            else if (MutatorFunction == null)
            {
                throw new InvalidOperationException("Must provide a data mutator function before executing");
            }
            else if (PersisterFunction == null)
            {
                throw new InvalidOperationException("Must provide a data persister function before executing");
            }

            //Retrieve and modify data
            RowType data = this.DoRetrieve(userState);

            //Call the mutator function. 
            MutatorFunction(data, userState);

            //persist changes
            int attemptNumber = 1;
            while (true)
            {
                bool isPreconditionFailedResponse = false;

                try
                {
                    PersisterFunction(data, userState);
                    return data; //return the mutated data
                }
                catch (DataServiceRequestException dsre)
                {
                    DataServiceResponse resp = dsre.Response;

                    int statusCode = -1;
                    if (resp.IsBatchResponse)
                    {
                        statusCode = resp.BatchStatusCode;
                    }
                    else if (resp.Any())
                    {
                        statusCode = resp.First().StatusCode;
                    }

                    isPreconditionFailedResponse = (statusCode == (int)HttpStatusCode.PreconditionFailed);
                    if (!ShouldRetryPersist(attemptNumber, dsre, isPreconditionFailedResponse, userState))
                    {
                        throw;
                    }
                }
                catch (DataServiceClientException dsce)
                {
                    isPreconditionFailedResponse = (dsce.StatusCode == (int)HttpStatusCode.PreconditionFailed);
                    if (!ShouldRetryPersist(attemptNumber, dsce, isPreconditionFailedResponse, userState))
                    {
                        throw;
                    }
                }
                catch (StorageClientException sce)
                {
                    isPreconditionFailedResponse = (sce.StatusCode == HttpStatusCode.PreconditionFailed);
                    if (!ShouldRetryPersist(attemptNumber, sce, isPreconditionFailedResponse, userState))
                    {
                        throw;
                    }
                }
                catch (Exception ex)
                {
                    if (!ShouldRetryPersist(attemptNumber, ex, false, userState))
                    {
                        throw;
                    }
                }

                if (isPreconditionFailedResponse)
                {
                    //Refetch the data, re-apply the mutator
                    data = DoRetrieve(userState);
                    MutatorFunction(data, userState);
                }

                attemptNumber++;
            }
        }

        /// <summary>
        /// Retrieve the data to be updated, possibly with retries
        /// </summary>
        /// <param name="userState">The UserState for this operation</param>
        private RowType DoRetrieve(StateObjectType userState)
        {
            int attemptNumber = 1;

            while (true)
            {
                try
                {
                    return RetrieverFunction(userState);
                }
                catch (Exception ex)
                {
                    if (!ShouldRetryRetrieval(attemptNumber, ex, userState))
                    {
                        throw;
                    }
                }

                attemptNumber++;
            }
        }

        /// <summary>
        /// Determine whether a data retrieval should be retried. 
        /// Implements a simplistic, constant wait time strategy. Users may override to provide a more complex implementation.
        /// </summary>
        /// <param name="attemptNumber">What number attempt is this. </param>
        /// <param name="ex">The exception that was caught</param>
        /// <param name="userState">The user-supplied state object for this operation</param>
        /// <returns>True to attempt the retrieval again, false to abort the retrieval and fail the update attempt</returns>
        protected virtual bool ShouldRetryRetrieval(int attemptNumber, Exception ex, StateObjectType userState)
        {
            //Simple, basic retry strategy - try 3 times, sleep for 1000msec each time
            if (attemptNumber < 3)
            {
                Thread.Sleep(1000); 
                return true;
            }
            else
            {
                return false;
            }
        }

        /// <summary>
        /// Determine whether a data update should be retried. If the <paramref name="isPreconditionFailed"/> param is true, 
        /// then the retrieval and mutation process will be repeated as well
        /// Implements a simplistic, constant wait time strategy. Users may override to provide a more complex implementation.
        /// </summary>
        /// <param name="attemptNumber">What number attempt is this. </param>
        /// <param name="ex">The exception that was caught</param>
        /// <param name="userState">The user-supplied state object for this operation</param>
        /// <param name="isPreconditionFailedResponse">Indicates whether the exception is a PreconditionFailed response. ie, an optimistic concurrency failure</param>
        /// <returns>True to attempt the update again, false to abort the retrieval and fail the update attempt</returns>
        protected virtual bool ShouldRetryPersist(int attemptNumber, Exception ex, bool isPreconditionFailedResponse, StateObjectType userState)
        {
            if (isPreconditionFailedResponse)
            {
                return true; //retry immediately
            }
            else
            {
                //For other failures, wait to retry
                //Simple, basic retry strategy - try 3 times, sleep for 1000msec each time
                if (attemptNumber < 3)
                {
                    Thread.Sleep(1000);
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }
    }
}

这篇关于通用code处理Azure的表并发冲突?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆