Generic class for performing mass-parallel queries. Feedback?

Question

I don't understand why, but there appears to be no mechanism in the client library for running many queries in parallel against Windows Azure Table Storage. I've created a generic class that can save considerable time, and you're welcome to use it however you wish. I would appreciate it, however, if you could pick it apart and give feedback on how to improve it.

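using System;
using System.Collections.Generic;
using System.Data.Services.Client;          // DataServiceQuery<T>
using System.Linq;                          // IQueryable<T>, Single()
using System.Threading;                     // ManualResetEvent, Interlocked
using Microsoft.WindowsAzure.StorageClient; // CloudTableQuery<T> (Azure SDK 1.x)

public class AsyncDataQuery<T> where T : new()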
{
    public AsyncDataQuery(bool preserve_order)
    {
        m_preserve_order = preserve_order;
        this.Queries = new List<CloudTableQuery<T>>(1000);
    }

    public void AddQuery(IQueryable<T> query)
    {
        var data_query = (DataServiceQuery<T>)query;
        var uri = data_query.RequestUri; // required

        this.Queries.Add(new CloudTableQuery<T>(data_query));
    }

    /// <summary>
    /// Blocks until every query has completed; the queries themselves still run in parallel.
    /// </summary>
    public List<T> Execute()
    {
        this.BeginAsync();
        return this.EndAsync();
    }

    public void BeginAsync()
    {
        if (m_preserve_order)
        {
            this.Items = new List<T>(Queries.Count);
            for (var i = 0; i < Queries.Count; i++)
            {
                this.Items.Add(new T());
            }
        }
        else
        {
            this.Items = new List<T>(Queries.Count * 2);
        }

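        // Reset per-run state so the instance can be executed again. With no
        // queries, no callback would ever signal, so start signaled in that case.
        m_completed = 0;
        m_wait = new ManualResetEvent(Queries.Count == 0);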

        for (var i = 0; i < Queries.Count; i++)
        {
            var query = Queries[i];
            query.BeginExecuteSegmented(callback, i);
        }
    }

    public List<T> EndAsync()
    {
        m_wait.WaitOne();
        m_wait.Dispose();

        return this.Items;
    }

    private List<T> Items { get; set; }
    private List<CloudTableQuery<T>> Queries { get; set; }

    private bool m_preserve_order;
    private ManualResetEvent m_wait;
    private int m_completed = 0;
    private object m_lock = new object();

    private void callback(IAsyncResult ar)
    {
        int i = (int)ar.AsyncState;
        CloudTableQuery<T> query = Queries[i];
        var response = query.EndExecuteSegmented(ar);
        if (m_preserve_order)
        { // preserve ordering only supports one result per query
            lock (m_lock)
            {
                this.Items[i] = response.Results.Single();
            }
        }
        else
        { // add any number of items
            lock (m_lock)
            {
                this.Items.AddRange(response.Results);
            }
        }
        if (response.HasMoreResults)
        { // more data to pull
            query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
            return;
        }
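        // Interlocked.Increment already stores the new value; assigning its
        // result back to m_completed races with other callbacks and can lose
        // a count, leaving m_wait unsignaled forever.
        if (Interlocked.Increment(ref m_completed) == Queries.Count)
        {
            m_wait.Set();
        }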
    }
}

Solution

Guess I'm late to the party. I would add three things:

  1. ManualResetEvent is IDisposable, so you need to make sure it gets disposed somewhere.
  2. Error handling: if one of the queries fails, it will probably fail the whole thing. You should probably retry failed requests. Alternatively, you could return the values you did get back, along with some indication of which queries failed, so that the caller can retry those queries.
  3. Client-side timeouts: there are none. This isn't a problem as long as the server side times out for you, but if that ever fails (e.g., because of network issues), the client will hang forever.
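
The three points above can be handled in one place. What follows is a minimal sketch, not the answerer's Bitbucket code: each query is modelled as a hypothetical `Func<List<T>>` run on the thread pool (a stand-in for the Azure table calls, not the StorageClient API), a failure is recorded per query instead of escaping a callback, the wait is bounded by a client-side timeout, and the `ManualResetEvent` is disposed on every path by a `using` block. `QueryOutcome<T>` and `ParallelFanOut.Run` are names invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Outcome of one query: its rows, the exception it threw, or a timeout marker.
public sealed class QueryOutcome<T>
{
    public List<T> Results { get; }
    public Exception Error { get; }
    public bool TimedOut { get; }

    private QueryOutcome(List<T> results, Exception error, bool timedOut)
    {
        Results = results;
        Error = error;
        TimedOut = timedOut;
    }

    public static QueryOutcome<T> Success(List<T> rows) => new QueryOutcome<T>(rows, null, false);
    public static QueryOutcome<T> Failure(Exception ex) => new QueryOutcome<T>(null, ex, false);
    public static QueryOutcome<T> Expired() => new QueryOutcome<T>(null, null, true);
}

public static class ParallelFanOut
{
    // Runs every query concurrently on the thread pool and waits at most `timeout`.
    // Each Func<List<T>> is a hypothetical stand-in for one table query.
    public static QueryOutcome<T>[] Run<T>(IReadOnlyList<Func<List<T>>> queries, TimeSpan timeout)
    {
        var slots = new QueryOutcome<T>[queries.Count]; // null = still pending
        int remaining = queries.Count;

        // Point 1: the event is disposed on every path, including a timeout.
        using (var done = new ManualResetEvent(queries.Count == 0))
        {
            for (int i = 0; i < queries.Count; i++)
            {
                int slot = i; // copy so each work item sees its own index
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    QueryOutcome<T> outcome;
                    try { outcome = QueryOutcome<T>.Success(queries[slot]()); }
                    catch (Exception ex) { outcome = QueryOutcome<T>.Failure(ex); } // point 2
                    // Publish unless the caller already marked this slot as timed out.
                    Interlocked.CompareExchange(ref slots[slot], outcome, null);

                    if (Interlocked.Decrement(ref remaining) == 0)
                    {
                        try { done.Set(); }
                        catch (ObjectDisposedException) { } // caller timed out and disposed it
                    }
                });
            }
            done.WaitOne(timeout); // point 3: bounded wait instead of WaitOne()
        }

        // Whatever is still pending did not finish in time.
        for (int i = 0; i < slots.Length; i++)
            Interlocked.CompareExchange(ref slots[i], QueryOutcome<T>.Expired(), null);
        return slots;
    }
}
```

A timed-out query keeps running in the background; this sketch does not cancel it. Because it may still finish later, each slot is claimed with `Interlocked.CompareExchange`: whichever of the late callback or the timeout sweep gets there first wins, so the returned array no longer changes after `Run` returns, and the late callback's `Set()` on the already-disposed event is swallowed explicitly.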

Also, I think this is actually a better approach than the Task Parallel Library. I tried a Task-per-query approach before this; the code was more awkward, and it tended to leave a lot of threads active. I still haven't tested your code extensively, but at first blush it seems to work better.

Update

I've put some work into a more-or-less complete rewrite of the code above. My rewrite removes all locking, supports client-side timeouts for hung transactions (rare, but it does happen, and it can really ruin your day), and adds some exception-handling logic. There is a full solution with tests up on Bitbucket. The most relevant code lives in one file (https://bitbucket.org/breischl/sepialabs.azure/src/0f92178ab98e/Sepialabs.Azure/ParallelTableQuery.cs), though it does require some helpers that are in other parts of the project.
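
The Bitbucket rewrite isn't reproduced here. As one illustration of how the locks can be dropped (not necessarily how the rewrite does it), here is a minimal sketch that gives each query its own result list. Because the pages of a single query are fetched one after another, each list only ever has one writer at a time, and concatenating the lists in query order also lifts the question's one-result-per-query limit on `preserve_order`. `PageFetcher<T>` and `OrderedParallelQuery<T>` are hypothetical names; the delegate only mimics the continuation-token shape of the segmented table API. Error handling and timeouts are left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical paged query: takes a continuation token (null for the first
// page) and returns one page of rows plus the next token (null when done).
public delegate (List<T> Rows, string Next) PageFetcher<T>(string token);

public sealed class OrderedParallelQuery<T>
{
    private readonly IReadOnlyList<PageFetcher<T>> _queries;
    private readonly List<T>[] _buckets; // one result list per query
    private readonly ManualResetEvent _done;
    private int _remaining;

    private OrderedParallelQuery(IReadOnlyList<PageFetcher<T>> queries)
    {
        _queries = queries;
        _buckets = new List<T>[queries.Count];
        for (int i = 0; i < _buckets.Length; i++) _buckets[i] = new List<T>();
        _remaining = queries.Count;
        _done = new ManualResetEvent(queries.Count == 0); // nothing to wait for
    }

    // Runs all queries concurrently and returns their rows concatenated in
    // query order, with any number of rows per query.
    public static List<T> Run(IReadOnlyList<PageFetcher<T>> queries)
    {
        var run = new OrderedParallelQuery<T>(queries);
        using (run._done)
        {
            for (int i = 0; i < queries.Count; i++)
            {
                int slot = i; // copy so each work item gets its own index
                ThreadPool.QueueUserWorkItem(_ => run.FetchPage(slot, null));
            }
            run._done.WaitOne();
        }
        return run._buckets.SelectMany(rows => rows).ToList();
    }

    private void FetchPage(int slot, string token)
    {
        var (rows, next) = _queries[slot](token);
        // Pages of one query are fetched strictly one after another, so only
        // one thread writes _buckets[slot] at a time: no lock needed.
        _buckets[slot].AddRange(rows);
        if (next != null)
        {
            // Chain the next page, like BeginExecuteSegmented(token, callback, i).
            ThreadPool.QueueUserWorkItem(_ => FetchPage(slot, next));
            return;
        }
        if (Interlocked.Decrement(ref _remaining) == 0)
            _done.Set();
    }
}
```

Pages are chained by queuing a new work item per page, much as the question's callback chains `BeginExecuteSegmented(response.ContinuationToken, ...)`, rather than by looping on one thread. The only shared counter is `_remaining`, which is updated with `Interlocked`.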
