SignalR and Redis message bus failover using BookSleeve's ConnectionUtils.Connect()

I am trying to create a Redis message bus failover scenario with a SignalR application.

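First, we tried a simple hardware load balancer (HLB) failover that just monitored two Redis servers, with the SignalR application pointed at the single HLB endpoint. I then took one server down, but no messages could be delivered through the second Redis server until the SignalR application pool was recycled. Presumably this is because the setup commands have to be issued again against the new Redis message bus.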

As of SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus uses BookSleeve's RedisConnection() to connect to a single Redis server for pub/sub.

I created a new class, RedisMessageBusCluster, that uses BookSleeve's ConnectionUtils.Connect() to connect to one server in a cluster of Redis servers.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP: Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // Uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;

            // Start the connection. TODO: this Open() could be removed because the connection
            // is already open, but _connectTask is used later on.
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in Redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack, but it seems like Subscribe returns before the actual
                // subscription is properly set up in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }

        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index we're going to use for this message.
                // Mask off the sign bit: Math.Abs(int.MinValue) would throw OverflowException.
                int index = (group.Key.GetHashCode() & int.MaxValue) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                    .Then((id, k) =>
                    {
                        var message = new RedisMessage(id, group.ToArray());

                        return _connection.Publish(k, message.GetBytes());
                    }, key)
                    .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                    .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // TODO: Should we auto reconnect?
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // TODO: How do we bubble errors?
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }
            }

            base.Dispose(disposing);
        }
    }
}
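For comparison, the recursive `SendImpl` chain above publishes one source group at a time. It starts the next INCR + PUBLISH only after the previous one completes, and it faults the overall task (via `ContinueWithNotComplete`) if a step fails. The sketch below expresses the same flow linearly with async/await and assumes a C# 6 / .NET 4.6 or later toolchain. `Msg` and the `publish` delegate are hypothetical stand-ins for SignalR's `Message` and BookSleeve's increment-then-publish pair. The channel index masks off the sign bit instead of calling `Math.Abs`, because `Math.Abs(int.MinValue)` throws `OverflowException`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for SignalR's Message: only Source matters for routing.
public sealed class Msg
{
    public Msg(string source, string body) { Source = source; Body = body; }
    public string Source { get; }
    public string Body { get; }
}

public static class ScaleoutSendSketch
{
    // Maps a source (connection id) onto one of keyCount channels.
    // Masking the sign bit keeps the result non-negative without Math.Abs overflow.
    public static int ChannelIndex(string source, int keyCount)
    {
        return (source.GetHashCode() & int.MaxValue) % keyCount;
    }

    // Publishes each source group in order, awaiting one before starting the next.
    // The publish delegate is hypothetical; in the bus above it would be INCR + PUBLISH.
    public static async Task SendAllAsync(
        IEnumerable<Msg> messages,
        string[] keys,
        Func<string, IReadOnlyList<Msg>, Task> publish)
    {
        foreach (var group in messages.GroupBy(m => m.Source))
        {
            string key = keys[ChannelIndex(group.Key, keys.Length)];
            await publish(key, group.ToList());
        }
    }
}
```

An exception from `publish` propagates out of `SendAllAsync`, so the returned task faults in the same way the `TaskCompletionSource` does in the original.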

BookSleeve has its own mechanism for determining the master and will automatically fail over to another server. I am now testing this with SignalR.Chat.

In web.config, I set the list of available servers:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>
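If you want to validate `redis.serverList` before passing it to `ConnectionUtils.Connect()`, a minimal sketch for splitting it into host/port pairs is shown below. `ServerListParser` is a hypothetical helper and is not part of BookSleeve or SignalR. Two assumptions apply: an entry without a port falls back to 6379 (Redis's default port), and IPv6 literals are not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper: not part of BookSleeve or SignalR.
public static class ServerListParser
{
    private const int DefaultRedisPort = 6379;

    // Splits "host1:port1,host2:port2" into (host, port) pairs.
    // Blank entries are skipped; entries without a port use DefaultRedisPort.
    public static List<KeyValuePair<string, int>> Parse(string serverList)
    {
        if (serverList == null) throw new ArgumentNullException(nameof(serverList));

        var endpoints = new List<KeyValuePair<string, int>>();
        foreach (string raw in serverList.Split(','))
        {
            string entry = raw.Trim();
            if (entry.Length == 0) continue;

            int colon = entry.LastIndexOf(':');
            if (colon < 0)
            {
                endpoints.Add(new KeyValuePair<string, int>(entry, DefaultRedisPort));
                continue;
            }

            string host = entry.Substring(0, colon);
            int port;
            bool ok = host.Length > 0
                && int.TryParse(entry.Substring(colon + 1), NumberStyles.None, CultureInfo.InvariantCulture, out port)
                && port >= 1 && port <= 65535;
            if (!ok)
            {
                throw new FormatException("Invalid Redis endpoint: '" + entry + "'");
            }

            endpoints.Add(new KeyValuePair<string, int>(host, int.Parse(entry.Substring(colon + 1), CultureInfo.InvariantCulture)));
        }
        return endpoints;
    }
}
```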

Then, in Application_Start():

// Redis cluster server list
string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

List<string> eventKeys = new List<string>();
eventKeys.Add("SignalR.Redis.FailoverTest");
GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

I added two extra methods to Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}
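`UseRedisCluster` wraps construction in `Lazy<T>` using the factory-only constructor. That constructor's default thread-safety mode is `LazyThreadSafetyMode.ExecutionAndPublication`, which has two documented consequences that are relevant here:

- The bus, and with it the Redis connection and subscription, is built once, on the first resolve of `IMessageBus` rather than in `Application_Start()`.
- If the factory throws, that exception is cached and rethrown on every later access to `Value`. A failed first connection attempt would therefore not be retried through that `Lazy` instance.

The self-contained demonstration below shows both behaviors. `FakeBus` is a hypothetical stand-in for `RedisMessageBusCluster`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LazyBusDemo
{
    // Hypothetical stand-in for RedisMessageBusCluster: only counts constructions.
    public sealed class FakeBus
    {
        public static int Constructed;

        public FakeBus()
        {
            Interlocked.Increment(ref Constructed);
            Thread.Sleep(50); // simulate a slow connect so concurrent callers overlap
        }
    }

    // Resolves the bus from several threads at once; returns how many instances were built.
    public static int ResolveConcurrently(int callers)
    {
        FakeBus.Constructed = 0;

        // Same shape as UseRedisCluster: factory-only constructor => ExecutionAndPublication.
        var bus = new Lazy<FakeBus>(() => new FakeBus());
        Func<FakeBus> resolve = () => bus.Value;

        var tasks = new Task<FakeBus>[callers];
        for (int i = 0; i < callers; i++)
        {
            tasks[i] = Task.Run(resolve);
        }
        Task.WaitAll(tasks);

        return FakeBus.Constructed;
    }

    // Accesses a Lazy whose factory always throws; returns how often the factory actually ran.
    public static int FactoryRunsAfterRepeatedFailures(int accesses)
    {
        int attempts = 0;
        var bus = new Lazy<FakeBus>(() =>
        {
            attempts++;
            throw new InvalidOperationException("connect failed");
        });

        for (int i = 0; i < accesses; i++)
        {
            try
            {
                GC.KeepAlive(bus.Value);
            }
            catch (InvalidOperationException)
            {
                // Expected: the cached exception is rethrown on every access.
            }
        }
        return attempts;
    }
}
```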

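The problem now is this. If I run with several breakpoints enabled and only disable them all after a user name has been added, the application works as expected. If the breakpoints are disabled from the start, however, there seems to be a race condition somewhere in the connection process that makes it fail.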

So, in the RedisMessageBusCluster() constructor:

// Start the connection
_connectTask = _connection.Open().Then(() =>
{
    // Create a subscription channel in Redis
    _channel = _connection.GetOpenSubscriberChannel();

    // Subscribe to the registered connections
    _channel.Subscribe(_keys, OnMessage);

    // Dirty hack, but it seems like Subscribe returns before the actual
    // subscription is properly set up in some cases
    while (_channel.SubscriptionCount == 0)
    {
        Thread.Sleep(500);
    }
});

I tried adding a Task.Wait() and even an extra Sleep() (not shown above). They do wait, but I still get the errors.
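Separately from the error, the `while (_channel.SubscriptionCount == 0) Thread.Sleep(500);` loop has no exit if the subscription never completes, which would leave `_connectTask` pending forever without an error. The sketch below is a bounded alternative that gives up after a timeout. `WaitHelpers.WaitUntil` is a hypothetical helper, and any timeout or poll interval you pass it is an assumption to tune.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: polls a condition but stops after a timeout instead of spinning forever.
public static class WaitHelpers
{
    // Returns true as soon as condition() is true, or false once timeout has elapsed.
    public static bool WaitUntil(Func<bool> condition, TimeSpan timeout, TimeSpan pollInterval)
    {
        if (condition == null) throw new ArgumentNullException(nameof(condition));

        var stopwatch = Stopwatch.StartNew();
        while (!condition())
        {
            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }
            Thread.Sleep(pollInterval);
        }
        return true;
    }
}
```

Inside the `_connectTask` continuation, you could throw (for example a `TimeoutException`) when `WaitUntil(() => _channel.SubscriptionCount > 0, ...)` returns false. The failure would then surface through `_connectTask` instead of hanging every `Send`.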

The recurring error appears to originate in BookSleeve's MessageQueue.cs, around line 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
--- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---

The corresponding code in BookSleeve's MessageQueue.cs:

public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }
        // ... (remainder of method omitted)
    }
}

That is where the "The queue is closed" exception is thrown.

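I also foresee another problem. Because the Redis connection is established in Application_Start(), there may be issues with "reconnecting" to another server. I think this worked with the single RedisConnection(), since there was only one connection to choose from. With ConnectionUtils.Connect() in the picture, however, I would like to hear from @dfowler or the other SignalR team members how SignalR is meant to handle this scenario.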
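One way to approach the reconnect question is to stop treating the connection as fixed for the lifetime of the application. When an operation fails because the connection is closed (BookSleeve reports this as `InvalidOperationException: The queue is closed`, as in the trace above), you reconnect and retry. The self-contained sketch below shows that pattern under stated assumptions:

- `ReconnectingClient<TConn>`, its `connect` factory, and the single-retry policy are hypothetical.
- With BookSleeve, the factory might wrap `ConnectionUtils.Connect(serverList)`, but this sketch does not call BookSleeve.
- It assumes C# 6 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper (not a BookSleeve or SignalR type): owns the current connection
// and discards it when an operation reports that it is no longer usable.
public sealed class ReconnectingClient<TConn> where TConn : class
{
    private readonly Func<Task<TConn>> _connect;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private TConn _current;

    public ReconnectingClient(Func<Task<TConn>> connect)
    {
        if (connect == null) throw new ArgumentNullException(nameof(connect));
        _connect = connect;
    }

    // Number of times a failed connection was discarded.
    public int DiscardedConnections { get; private set; }

    // Returns the current connection, creating one if there is none.
    private async Task<TConn> GetAsync()
    {
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            if (_current == null)
            {
                _current = await _connect().ConfigureAwait(false);
            }
            return _current;
        }
        finally
        {
            _gate.Release();
        }
    }

    // Forgets the failed connection unless another caller has already replaced it.
    private async Task DiscardAsync(TConn failed)
    {
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            if (ReferenceEquals(_current, failed))
            {
                _current = null;
                DiscardedConnections++;
            }
        }
        finally
        {
            _gate.Release();
        }
    }

    // Runs the operation; on InvalidOperationException, gets a fresh connection and retries once.
    public async Task<T> ExecuteAsync<T>(Func<TConn, Task<T>> operation)
    {
        TConn conn = await GetAsync().ConfigureAwait(false);
        try
        {
            return await operation(conn).ConfigureAwait(false);
        }
        catch (InvalidOperationException)
        {
            await DiscardAsync(conn).ConfigureAwait(false);
        }

        TConn fresh = await GetAsync().ConfigureAwait(false);
        return await operation(fresh).ConfigureAwait(false);
    }
}
```

This is only a sketch, with several caveats:

- Catching every `InvalidOperationException` is broad.
- The old connection is not closed here.
- A real message bus would also have to re-create the subscriber channel and resubscribe to its keys.
- Retrying the INCR + PUBLISH pair from `SendImpl` is not idempotent. A retry after a partial failure could increment the counter twice or publish a message twice.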


The SignalR team has now implemented support for a custom connection factory with StackExchange.Redis, the successor to BookSleeve, which supports redundant Redis connections via ConnectionMultiplexer.

The initial problem was that, despite my writing my own extension methods on top of BookSleeve to accept a collection of servers, failover was not possible.

Now that BookSleeve has evolved into StackExchange.Redis, we can configure a collection of servers/ports directly in the Connect initialization.

The new implementation is much simpler than the road I was going down with a custom UseRedisCluster method, and the back-end plumbing now supports true failover:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis also allows for additional manual configuration as outlined in the Automatic and Manual Configuration section of the documentation:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};

In essence, the ability to initialize our SignalR scale-out environment with a collection of servers now solves the initial problem.