Protocol errors, "no more data" errors, "Zero length response" errors while using ServiceStack.Redis in a high volume scenario

Problem description

It would really help if someone could tell me whether there are known issues with PooledRedisClientManager under high-volume scenarios.

I am using a singleton client manager whose GetClient() is called by multiple WCF threads thousands of times a minute, and each thread can read, update, or insert into a Redis collection (I am using a Redis hash collection).

Intermittently I see these errors, and they usually go away on retries.

All GetClient() calls are within using statements.

Thanks, Rb

Here are the errors I see in the logs:

Error 1

ServiceStack.Redis.RedisResponseException: Unknown reply on integer response:         123"Key":"7c3699524bcc457ab377ad1af17eb046","Value":"9527cb78-2e32-4695-ad33-7991f92eb3a2"}, sPort: 64493, LastCommand: HEXISTS urn:xxxxxxxxxxxxxxxx "118fdc26117244819eb712a82b8e86fd"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.HashContainsEntry(String hashId, String key)
at ServiceStack.Redis.Generic.RedisTypedClient`1.HashContainsEntry[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.ContainsKey(TKey key)

Error 2
ServiceStack.Redis.RedisResponseException: No more data, sPort: 65005, LastCommand: HSET urn:xxxxxxxxxxxxxxxxx "9ced6120a876405faccf5cb043e70807" {"ID":"9ced6120a87...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)

Error 3

ServiceStack.Redis.RedisResponseException: Protocol error: expected '$', got ' ', sPort: 64993, LastCommand: HGET urn:xxxxxxxxxxxxxxxxxxxx "705befa18af74f61aafff50b4282de19"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ParseSingleLine(String r)
at ServiceStack.Redis.Generic.RedisTypedClient`1.GetValueFromHash[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.get_Item(TKey key)

Error 4

ServiceStack.Redis.RedisResponseException: Protocol error: invalid multibulk length, sPort: 65154, LastCommand: HSET urn:xxxxxxxxxxxxxx "39a5023eee374b28acbe5f63561c6211" {"ID":"39a5023eee3...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)

Code:

Basically, I created a wrapper, RedisCachedCollection, around RedisHash. This is to support existing code that was using .NET Lists and Dictionaries.

   public class RedisCachedCollection<TKey, TValue> : CacheCollectionBase<TKey, TValue>, IEnumerable<TValue>
  {
    private string _collectionKey;
    private string _collectionLock;
    private IRedisTypedClient<TValue> _redisTypedClient = null;
    private int _locktimeout;
    private Func<TValue, TKey> _idAction;

    public RedisCachedCollection(string collectionKey, int locktimeoutsecs = 5)
    {
        _collectionKey = string.Format("urn:{0}:{1}", "XXXXX", collectionKey);
        _collectionLock = string.Format("{0}+lock", _collectionKey);
        _locktimeout = locktimeoutsecs;
    }


    private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
    {
        _redisTypedClient = redis.As<TValue>();
        return _redisTypedClient.GetHash<TKey>(_collectionKey);
    }
    public override void Add(TValue obj)
    {
        TKey Id = GetUniqueIdAction(obj);

        RetryAction((redis) =>
        {
            GetCollection(redis).Add(Id, obj);
        });
    }

    public override bool Remove(TValue obj)
    {
        TKey Id = GetUniqueIdAction(obj);
        TKey defaultv = default(TKey);

        return RetryAction<bool>((redis) =>
        {
            if (!Id.Equals(defaultv))
            {
                {
                    return GetCollection(redis).Remove(Id);
                }
            }
            return false;
        });

    }

    public override TValue this[TKey id]
    {
        get
        {
            return RetryAction<TValue>((redis) =>
            {
                if (GetCollection(redis).ContainsKey(id))
                    return GetCollection(redis)[id];
                return default(TValue);
            });                
        }
        set
        {
            RetryAction((redis) =>
            {
                GetCollection(redis)[id] = value;
            });                
        }
    }
    public override int Count
    {
        get
        {
            return RetryAction<int>((redis) =>
            {
                return GetCollection(redis).Count;
            });
        }
    }

    public IEnumerable<TValue> Where(Func<TValue, bool> predicate)
    {
        return RetryAction<IEnumerable<TValue>>((redis) =>
        {
            return GetCollection(redis).Values.Where(predicate);
        });
    }

    public bool Any(Func<TValue, bool> predicate)
    {
        return RetryAction<bool>((redis) =>
        {
            return GetCollection(redis).Values.Any(predicate);
        });
    }


    public override IEnumerator<TValue> GetEnumerator()
    {
        return RetryAction<IEnumerator<TValue>>((redis) =>
        {
            return GetCollection(redis).Values.GetEnumerator();
        });
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return RetryAction<System.Collections.IEnumerator>((redis) =>
        {
            return ((System.Collections.IEnumerable)GetCollection(redis).Values).GetEnumerator();
        });           

    }


    public override void Clear()
    {
        RetryAction((redis) =>
        {
            GetCollection(redis).Clear();
        });
    }

    public override bool Contains(TValue obj)
    {
        TKey Id = GetUniqueIdAction(obj);
        return RetryAction<bool>((redis) =>
        {
            return GetCollection(redis).ContainsKey(Id);
        });
    }

    public override bool ContainsKey(TKey obj)
    {
        return RetryAction<bool>((redis) =>
        {
            return GetCollection(redis).ContainsKey(obj);
        });
    }



    public override void CopyTo(TValue[] array, int arrayIndex)
    {
        RetryAction((redis) =>
        {
            GetCollection(redis).Values.CopyTo(array, arrayIndex);
        });
    }

    public override bool IsReadOnly
    {
        get 
        {
            return RetryAction<bool>((redis) =>
            {
                return GetCollection(redis).IsReadOnly;
            });            
        }
    }

    public override Func<TValue, TKey> GetUniqueIdAction
    {
        get
        {
            return _idAction;
        }
        set
        {
            _idAction = value;
        }
    }
    private object _synclock = new object();

    public override IDisposable Lock
    {
        get
        {
            lock (_synclock)
            {
                try
                {
                    return new CacheTransaction(_collectionLock, _locktimeout);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    throw;
                }
            }


        }
    }
    private Dictionary<int, IRedisClient> _redisconnectionpool = new Dictionary<int, IRedisClient>();

    public IRedisClient RedisConnection
    {
        get
        {
                return RedisClientManager.Instance.GetClient();
        }
    }
    private void RetryAction(Action<IRedisClient> action)
    {
        int i = 0;

        while (true)
        {
            try
            {
                using (var redis = RedisConnection)
                {
                    action(redis);
                    return;
                }
            }
            catch (Exception ex)
            {

                if (i++ < 3)
                {

                    continue;
                }
                throw;
            }
        }
    }

    private TOut RetryAction<TOut>(Func<IRedisClient, TOut> action)
    {
        int i = 0;

        while (true)
        {
            try
            {
                using (var redis = RedisConnection)
                {
                    TOut result = action(redis);
                    return result;
                }
            }
            catch (Exception ex)
            {

                if (i++ < 3)
                {

                    continue;
                }

                throw;
            }
        }
    }
}

}

Recommended answer

I've added a stress test with your HashCollection code above, using as much of it as I could get to compile, and got it to run the API calls (shown in the stack traces above) concurrently in 64 threads:

clientsManager = new PooledRedisClientManager(ipAddress);
redisCollection = new RedisCachedCollection<string, string>(
    clientsManager, "Thread: " + Thread.CurrentThread.ManagedThreadId);

var StartedAt = DateTime.UtcNow;
Interlocked.Increment(ref running);

"Starting HashCollectionStressTests with {0} threads".Print(noOfThreads);
var threads = new List<Thread>();
for (int i = 0; i < noOfThreads; i++)
{
    threads.Add(new Thread(WorkerLoop));
}
threads.ForEach(t => t.Start());

"Press Enter to Stop...".Print();
Console.ReadLine();

Interlocked.Decrement(ref running);

"Writes: {0}, Reads: {1}".Print(writeCount, readCount);
"{0} EndedAt: {1}".Print(GetType().Name, DateTime.UtcNow.ToLongTimeString());
"{0} TimeTaken: {1}s".Print(GetType().Name,(DateTime.UtcNow-StartedAt).TotalSeconds);

Here's the WorkerLoop:

public void WorkerLoop()
{
    while (Interlocked.CompareExchange(ref running, 0, 0) > 0)
    {
        redisCollection.ContainsKey("key");
        Interlocked.Increment(ref readCount);

        redisCollection["key"] = "value " + readCount;
        Interlocked.Increment(ref writeCount);

        var value = redisCollection["key"];
        Interlocked.Increment(ref readCount);

        if (value == null)
            Console.WriteLine("value == null");
    }
}

I've also modified your RetryAction API to log and throw immediately, so I can detect the first exception thrown:

private void RetryAction(Action<IRedisClient> action)
{
    try
    {
        using (var redis = RedisConnection)
        {
            action(redis);
            return;
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
        throw;
    }
}

I've run this stress test against both a local and a networked redis-server instance and have yet to see an exception. The last run, after letting it go for nearly 40 minutes, produced this output:

Starting HashCollectionStressTests with 64 threads
Press Enter to Stop...

Writes: 876755, Reads: 1753518
HashCollectionStressTests EndedAt: 2:10:01 AM
HashCollectionStressTests TimeTaken: 2292.985048s

This basically shows it executed 2.6M+ hash collection API calls (876,755 writes plus 1,753,518 reads) concurrently without any exception.

Unfortunately, I can't determine what issue you're running into without being able to reproduce it. I did find it strange that you're keeping a non-thread-safe instance reference to _redisTypedClient around:

private IRedisTypedClient<TValue> _redisTypedClient = null;

Which gets populated here:

private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
{
    _redisTypedClient = redis.As<TValue>();
    return _redisTypedClient.GetHash<TKey>(_collectionKey);
}

This isn't necessary, as it could have been a local variable. As the code provided was incomplete (i.e. it doesn't compile), I'm not sure whether this instance is being used by other API calls invoked from multiple threads.
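To make the local-variable point concrete, here is a minimal, self-contained sketch of the general hazard. It is not a confirmed cause of the errors in the question, since the stress test above did not reproduce them. FakeConnection and FakeTypedClient are hypothetical stand-ins invented for this illustration, not ServiceStack.Redis types, and Thread.Yield() is there only to widen the race window. The sketch contrasts a field written by every caller with a variable scoped to the call:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for a pooled connection and its typed wrapper.
// These are NOT ServiceStack.Redis types; they only mimic the shape of the pattern.
public sealed class FakeConnection
{
    public int Id { get; }
    public FakeConnection(int id) { Id = id; }
    public FakeTypedClient AsTyped() { return new FakeTypedClient(this); }
}

public sealed class FakeTypedClient
{
    public FakeConnection Connection { get; }
    public FakeTypedClient(FakeConnection connection) { Connection = connection; }
}

public sealed class SharedFieldCollection
{
    // Mirrors the question: a single field overwritten by every caller.
    private FakeTypedClient _typedClient;

    public FakeTypedClient GetCollection(FakeConnection connection)
    {
        _typedClient = connection.AsTyped();
        Thread.Yield();      // demonstration only: give other threads a chance to run
        return _typedClient; // may now wrap a connection owned by another thread
    }
}

public sealed class LocalVariableCollection
{
    public FakeTypedClient GetCollection(FakeConnection connection)
    {
        var typedClient = connection.AsTyped(); // visible to this call only
        Thread.Yield();
        return typedClient;
    }
}

public static class TypedClientDemo
{
    // Counts calls whose result wraps a connection other than the caller's own.
    public static int CountMismatches(
        Func<FakeConnection, FakeTypedClient> getCollection, int workers, int iterations)
    {
        int mismatches = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = workers };
        Parallel.For(0, workers, options, w =>
        {
            var connection = new FakeConnection(w); // each worker owns one connection
            for (int i = 0; i < iterations; i++)
            {
                if (getCollection(connection).Connection != connection)
                    Interlocked.Increment(ref mismatches);
            }
        });
        return mismatches;
    }

    public static void Main()
    {
        // The shared-field count depends on thread timing and can differ between runs.
        Console.WriteLine("Shared field mismatches:   " +
            CountMismatches(new SharedFieldCollection().GetCollection, 8, 100000));
        // The local-variable count is always 0: nothing is shared between calls.
        Console.WriteLine("Local variable mismatches: " +
            CountMismatches(new LocalVariableCollection().GetCollection, 8, 100000));
    }
}
```

In the question's class, the equivalent change would be to drop the _redisTypedClient field and assign the result of redis.As<TValue>() to a local variable inside GetCollection before calling GetHash<TKey>(_collectionKey) on it.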

If you could put together a repro that shows the problem, that would help in identifying it. A stand-alone example would also help in seeing how the code is used.
