Using SignalR with Redis messagebus failover using BookSleeve's ConnectionUtils.Connect()

Question

I am trying to create a Redis message bus failover scenario with a SignalR app.

At first, we tried a simple hardware load-balancer failover, that simply monitored two Redis servers. The SignalR application pointed to the singular HLB endpoint. I then failed one server, but was unable to successfully get any messages through on the second Redis server without recycling the SignalR app pool. Presumably this is because it needs to issue the setup commands to the new Redis message bus.

As of SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus uses Booksleeve's RedisConnection() to connect to a single Redis for pub/sub.

I created a new class, RedisMessageBusCluster() that uses Booksleeve's ConnectionUtils.Connect() to connect to one in a cluster of Redis servers.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP:  Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;


            // Start the connection. TODO: this Open() could be removed because the connection is already opened, but _connectTask is still used later on
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }


        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index for this message (masking the sign bit avoids the OverflowException that Math.Abs throws for int.MinValue)
                int index = (group.Key.GetHashCode() & int.MaxValue) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                                   .Then((id, k) =>
                                   {
                                       var message = new RedisMessage(id, group.ToArray());

                                       return _connection.Publish(k, message.GetBytes());
                                   }, key)
                                   .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                                   .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }                
            }

            base.Dispose(disposing);
        }
    }
}

Booksleeve has its own mechanism for determining a master and will automatically fail over to another server. I am now testing this with SignalR.Chat.

In web.config, I set the list of available servers:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Then in Application_Start():

        // Redis cluster server list
        string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

        List<string> eventKeys = new List<string>();
        eventKeys.Add("SignalR.Redis.FailoverTest");
        GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

I added two additional methods to Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}

Now the problem: if I keep several breakpoints enabled until after a user name has been added, and then disable all breakpoints, the application works as expected. However, with the breakpoints disabled from the beginning, there seems to be a race condition that fails during the connection process.

So, in RedisMessageBusCluster():

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

I tried adding both a Task.Wait and even an additional Sleep() (not shown above) to wait for the connection, but I still get errors.

The recurring error seems to be in Booksleeve.MessageQueue.cs ~ln 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---



public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }

This is where the closed-queue exception is thrown.

I foresee another issue: since the Redis connection is made in Application_Start(), there may be some issues in "reconnecting" to another server. I think the current approach is valid when using the singular RedisConnection(), where there is only one connection to choose from. With the introduction of ConnectionUtils.Connect(), however, I'd like to hear from @dfowler or the other SignalR guys how this scenario is handled in SignalR.

Recommended answer

The SignalR team has now implemented support for a custom connection factory with StackExchange.Redis, the successor to BookSleeve, which supports redundant Redis connections via ConnectionMultiplexer.

The initial problem encountered was that in spite of creating my own extension methods in BookSleeve to accept a collection of servers, fail-over was not possible.

Now, with the evolution of BookSleeve to StackExchange.Redis, we can configure a collection of servers/ports right in the Connect initialization.

The new implementation is much simpler than the road I was going down in creating a UseRedisCluster method, and the back-end plumbing now supports true fail-over:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");
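To watch what the multiplexer does when a server drops, something like the following might help. It is only a rough sketch, not the SignalR implementation: the host names are the placeholders from the line above, the channel name is borrowed from the question, and setting AbortOnConnectFail to false is my own assumption about what you would want in a failover test (it lets the multiplexer keep retrying instead of throwing when no endpoint is reachable at startup).

```csharp
using System;
using StackExchange.Redis;

public static class FailoverSketch
{
    public static void Main()
    {
        // Placeholder hosts; replace with your own servers.
        var options = ConfigurationOptions.Parse(
            "redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

        // Assumption: keep retrying in the background rather than failing at startup.
        options.AbortOnConnectFail = false;

        using (var conn = ConnectionMultiplexer.Connect(options))
        {
            // Log endpoint state changes so a failover test is visible.
            conn.ConnectionFailed += (sender, e) =>
                Console.WriteLine("Lost {0} ({1}): {2}", e.EndPoint, e.ConnectionType, e.FailureType);
            conn.ConnectionRestored += (sender, e) =>
                Console.WriteLine("Restored {0} ({1})", e.EndPoint, e.ConnectionType);

            // Channel name reused from the question purely for illustration.
            var channel = new RedisChannel("SignalR.Redis.FailoverTest", RedisChannel.PatternMode.Literal);
            ISubscriber sub = conn.GetSubscriber();
            sub.Subscribe(channel, (ch, value) => Console.WriteLine("{0}: {1}", ch, value));
            sub.Publish(channel, "hello");

            Console.WriteLine("Stop one Redis server, then press Enter to exit.");
            Console.ReadLine();
        }
    }
}
```

Running this against two or three test servers and stopping one of them should show whether the events fire in your environment before you involve SignalR at all.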

StackExchange.Redis also allows for additional manual configuration as outlined in the Automatic and Manual Configuration section of the documentation:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};
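The options object still has to be handed to the multiplexer. A minimal sketch of that step, assuming a ConfigurationOptions instance built like the one above (the class and method names here are made up for illustration), which also lists each endpoint and whether it is currently connected:

```csharp
using System;
using System.Net;
using StackExchange.Redis;

public static class ManualConfigSketch
{
    // Hypothetical helper: connects with the supplied options and reports endpoint status.
    public static ConnectionMultiplexer ConnectAndReport(ConfigurationOptions config)
    {
        // Assumption: tolerate an endpoint being down when the application starts.
        config.AbortOnConnectFail = false;

        ConnectionMultiplexer conn = ConnectionMultiplexer.Connect(config);

        foreach (EndPoint endpoint in conn.GetEndPoints())
        {
            IServer server = conn.GetServer(endpoint);
            Console.WriteLine("{0}: connected={1}", endpoint, server.IsConnected);
        }

        return conn;
    }
}
```

The multiplexer is meant to be created once and shared, so in a web application the returned instance would normally be stored somewhere long-lived rather than created per request.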

In essence, the ability to initialize our SignalR scale-out environment with a collection of servers now solves the initial problem.
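For the SignalR side, a sketch of what the wiring might look like. This assumes SignalR 2.2 or later with the Microsoft.AspNet.SignalR.Redis package, where, as far as I know, RedisScaleoutConfiguration accepts a StackExchange.Redis connection string. The host names and event key are placeholders, so check the overloads available in the version you have installed.

```csharp
using Microsoft.AspNet.SignalR;
using Owin;

public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        // Placeholder servers and event key; the connection string uses the
        // same comma-separated format as ConnectionMultiplexer.Connect.
        var config = new RedisScaleoutConfiguration(
            "redisServer1:6380,redisServer2:6380,redisServer3:6380",
            "SignalR.Redis.FailoverTest");

        // Register the Redis backplane before mapping the hubs.
        GlobalHost.DependencyResolver.UseRedis(config);

        app.MapSignalR();
    }
}
```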
