如何编写一个可伸缩的基于 TCP/IP 的服务器

我正在编写一个新的 Windows 服务应用程序,该应用程序接受长时间运行的 TCP/IP 连接(例如,这不像 HTTP 那样有许多短连接,而是一个客户端连接并保持连接数小时、数天甚至数周)。

我正在寻找设计网络架构的最佳方法。我需要为服务启动至少一个线程。我正在考虑使用异步 API (BeginRecieve 等) ,因为我不知道在任何给定的时间(可能有数百个)我将连接多少个客户机。我当然不想为每个连接启动一个线程。

数据将主要从我的服务器流向客户端,但有时会有一些命令从客户端发送。这主要是一个监视应用程序,其中我的服务器定期向客户端发送状态数据。

使其尽可能可伸缩的最佳方法是什么? 基本工作流?

明确地说,我正在寻找基于.NET 的解决方案(如果可能的话,C # ,但任何.NET 语言都可以)。

我需要一个解决方案的工作示例,要么作为我可以下载的东西的指针,要么作为一个简短的示例。一定是的。基于 NET 和 Windows 的(任何。NET 语言)。

71070 次浏览

您可以尝试使用一个名为 自适应通信环境(ACE)的框架,它是用于网络服务器的通用 C + + 框架。这是一个非常坚实,成熟的产品,旨在支持高可靠性,大批量的应用,直至电信级。

该框架处理相当广泛的并发模型,并且可能有一个适合您的应用程序开箱即用的模型。这将使系统更容易调试,因为大多数讨厌的并发问题已经得到解决。这里的权衡是框架是用 C + + 编写的,并不是最温暖和松散的代码库。另一方面,你可以得到测试,工业级的网络基础设施和高度可扩展的架构。

我会使用 SEDA或轻量级线程库(二郎或更新的 Linux)。见 服务器端的 NTPL 可伸缩性)。如果您的通信不是:)异步编码是非常麻烦的

好吧。NET 套接字似乎提供了 Select ()-这是最好的处理输入。对于输出,我有一个套接字编写器线程池来监听工作队列,接受套接字描述符/对象作为工作项的一部分,所以您不需要每个套接字一个线程。

明确地说,我正在寻找基于.NET 的解决方案(如果可能的话,C # ,但任何.NET 语言都可以)

如果纯粹使用。NET.GC 暂停会阻碍延迟。

我需要为服务启动至少一个线程。我正在考虑使用异步 API (BeginReceive 等)因为我不知道在任何给定的时间我会连接多少个客户端(可能有数百个)。我当然不想为每个连接启动一个线程。

重叠 I/O 通常被认为是 Windows 最快的网络通信 API。我不知道这是不是和你的异步 API 一样。不要使用 选择,因为每个调用都需要检查每个打开的套接字,而不是对活动套接字进行回调。

吸毒。NET 的集成异步 I/O (BeginRead等)是一个好主意,如果你可以得到所有的细节。当您正确地设置您的套接字/文件句柄时,它将使用操作系统的底层 IOCP 实现,允许您的操作在不使用任何线程的情况下完成(或者,在最糟糕的情况下,使用一个我认为来自内核的 I/O 线程池的线程,而不是。NET 的线程池,这有助于缓解线程池拥塞。)

主要的问题是要确保在非阻塞模式下打开套接字/文件。大多数默认的便利函数(如 File.OpenRead)不这样做,因此您需要编写自己的函数。

另一个主要问题是错误处理——在编写异步 I/O 代码时正确处理错误比在同步代码中要困难得多。即使您不直接使用线程,也很容易出现竞态条件和死锁,因此您需要注意这一点。

如果可能的话,您应该尝试并使用一个方便的库来简化进行可伸缩的异步 I/O 的过程。

微软的 并发协调运行时就是一个例子。NET 库的设计,以减轻这种编程的困难。它看起来很棒,但由于我还没有使用过它,我不能评论它的规模。

对于需要执行异步网络或磁盘 I/O 的个人项目,我使用一组。NET 并发/I/O 工具,我在过去一年中构建的,称为 平方,任务。它的灵感来自于像 Imvu.Task扭曲这样的库,我在做网络 I/O 的库中包含了一些 例子,我也在我写的一些应用程序中使用过它——最大的公开发布的是 NDexer(用于无线磁盘 I/O)。这个库是根据我使用 imvu.task 的经验编写的,并且有一组相当全面的单元测试,因此我强烈建议您尝试使用它。如果你有任何问题,我很乐意提供一些帮助。

在我看来,根据我使用异步/无线程 I/O 而不是线程的经验,在。NET 平台,只要你准备好应付学习曲线。它允许您避免由于 Thread 对象的成本而带来的可伸缩性麻烦,在许多情况下,您可以通过谨慎使用诸如 未来和承诺之类的并发原语来完全避免使用锁和互斥锁。

考虑只使用 周转基金 net TCP 绑定和发布/订阅模式。WCF 将允许您(大多数情况下)专注于您的领域,而不是管道..。

在 IDesign 的下载部分有很多 WCF 示例,甚至还有一个发布/订阅框架,它可能很有用: http://www.idesign.net

您可以在 C10K 问题页中找到一个很好的技术概述。

我的一些解决方案中运行了这样一个服务器。这里有一个非常详细的解释,不同的方式做到这一点。网址: 使用.NET 中的高性能套接字接近终端

最近,我一直在寻找改进我们代码的方法,并将研究这一点: “ 版本3.5中的 Socket 性能增强”,其中特别包括“供使用异步网络 I/O 以实现最高性能的应用程序使用”。

”这些增强的主要特性是避免了在大容量异步套接字 I/O 期间对象的重复分配和同步。当前由 Socket 类为异步套接字 I/O 实现的 Begin/End 设计模式要求为每个异步套接字操作分配 System.IAsyncResult 对象。”

你可以继续阅读,如果你按照链接。我个人明天将测试他们的示例代码,以便对照我所得到的代码进行基准测试。

在这里 ,您可以使用新的3.5 SocketAsyncEventArgs 找到客户机和服务器的工作代码,因此您可以在几分钟内测试它并查看代码。这是一种简单的方法,但它是启动更大规模实现的基础。同样 这个的文章从近两年前在 MSDN 杂志是一个有趣的阅读。

我以前也写过类似的东西。我多年前的研究表明,使用 异步的套接字编写您自己的套接字实现是最好的选择。这意味着客户端实际上不需要做任何事情,只需要相对较少的资源。任何确实发生的事情都由。NET 线程池。

我将其编写为一个类,用于管理服务器的所有连接。

我只是使用一个列表来保存所有的客户端连接,但是如果您需要更快地查找更大的列表,您可以按照自己的意愿编写它。

private List<xConnection> _sockets;

此外,您还需要套接字实际监听传入的连接。

private System.Net.Sockets.Socket _serverSocket;

Start 方法实际上启动服务器套接字并开始监听任何传入的连接。

public bool Start()
{
System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
System.Net.IPEndPoint serverEndPoint;
try
{
serverEndPoint = new System.Net.IPEndPoint(localhost.AddressList[0], _port);
}
catch (System.ArgumentOutOfRangeException e)
{
throw new ArgumentOutOfRangeException("Port number entered would seem to be invalid, should be between 1024 and 65000", e);
}
try
{
_serverSocket = new System.Net.Sockets.Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
}
catch (System.Net.Sockets.SocketException e)
{
throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
}
try
{
_serverSocket.Bind(serverEndPoint);
_serverSocket.Listen(_backlog);
}
catch (Exception e)
{
throw new ApplicationException("An error occurred while binding socket. Check inner exception", e);
}
try
{
//warning, only call this once, this is a bug in .net 2.0 that breaks if
// you're running multiple asynch accepts, this bug may be fixed, but
// it was a major pain in the rear previously, so make sure there is only one
//BeginAccept running
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
catch (Exception e)
{
throw new ApplicationException("An error occurred starting listeners. Check inner exception", e);
}
return true;
}

我只想指出异常处理代码看起来很糟糕,但原因是我有异常抑制代码在那里,所以任何异常将被抑制,并返回 false,如果配置选项被设置,但我想删除它为了简洁起见。

_ serverSocket.上面的 BeginAccept (new AsyncCallback (eptCallback)) ,_ serverSocket)实际上设置了我们的服务器套接字,以便在用户连接时调用 eptCallback 方法。此方法从。NET 线程池,如果有许多阻塞操作,它会自动处理创建额外的工作线程。这将最佳地处理服务器上的任何负载。

    private void acceptCallback(IAsyncResult result)
{
xConnection conn = new xConnection();
try
{
//Finish accepting the connection
System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
conn = new xConnection();
conn.socket = s.EndAccept(result);
conn.buffer = new byte[_bufferSize];
lock (_sockets)
{
_sockets.Add(conn);
}
//Queue receiving of data from the connection
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
//Queue the accept of the next incoming connection
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
catch (SocketException e)
{
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
//Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
catch (Exception e)
{
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
//Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
}

上面的代码实际上刚刚完成了接受进来的连接,对 BeginReceive进行排队,这是一个回调,当客户端发送数据时将运行,然后对下一个 acceptCallback进行排队,后者将接受进来的下一个客户端连接。

BeginReceive方法调用告诉套接字从客户端接收数据时应该做什么。对于 BeginReceive,您需要给它一个字节数组,这是它在客户端发送数据时复制数据的位置。ReceiveCallback方法将被调用,这是我们处理接收数据的方式。

private void ReceiveCallback(IAsyncResult result)
{
//get our connection from the callback
xConnection conn = (xConnection)result.AsyncState;
//catch any errors, we'd better not have any
try
{
//Grab our buffer and count the number of bytes receives
int bytesRead = conn.socket.EndReceive(result);
//make sure we've read something, if we haven't it supposadly means that the client disconnected
if (bytesRead > 0)
{
//put whatever you want to do when you receive data here


//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
}
else
{
//Callback run but no data, close the connection
//supposadly means a disconnect
//and we still have to close the socket, even though we throw the event later
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
}
catch (SocketException e)
{
//Something went terribly wrong
//which shouldn't have happened
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
}
}

编辑: 在这个模式中,我忘了提到在这个代码区域:

//put whatever you want to do when you receive data here


//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);

通常,在任何您想要的代码中,我都会将数据包重新组装成消息,然后在线程池中创建它们作为作业。这样,在运行任何消息处理代码时,来自客户端的下一个块的 BeginRecept 都不会延迟。

接受回调通过调用结束接收完成对数据套接字的读取。这将填充 start 接收函数中提供的缓冲区。一旦你在我留下注释的地方做了你想做的事情,我们调用下一个 BeginReceive方法,如果客户端发送更多的数据,它将再次运行回调。

现在真正棘手的部分来了: 当客户端发送数据时,您的接收回调可能只会被部分消息调用。重组会变得非常非常复杂。我用我自己的方法创建了一种专有协议来实现这一点。我省略了,但如果你要求,我可以加进去。这个处理程序实际上是我写过的最复杂的代码。

public bool Send(byte[] message, xConnection conn)
{
if (conn != null && conn.socket.Connected)
{
lock (conn.socket)
{
//we use a blocking mode send, no async on the outgoing
//since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
conn.socket.Send(bytes, bytes.Length, SocketFlags.None);
}
}
else
return false;
return true;
}

上面的 send 方法实际上使用同步 Send调用。对于我来说,由于消息大小和应用程序的多线程特性,这样做没有问题。如果要发送到每个客户端,只需循环通过 _ sockets 列表即可。

上面引用的 xConnection 类基本上是一个简单的包装器,用于包含字节缓冲区的套接字,在我的实现中还有一些额外的东西。

public class xConnection : xBase
{
public byte[] buffer;
public System.Net.Sockets.Socket socket;
}

这里还有一些包含在内的 using作为参考,因为当它们没有包含在内的时候我总是感到很恼火。

using System.Net.Sockets;

希望这能有帮助。它可能不是最干净的代码,但它工作。还有一些代码的细微差别,您应该对更改感到厌倦。首先,在任何时候只调用一个 BeginAccept。以前有个很烦人的。这是几年前的事了,所以我不记得细节了。

此外,在 ReceiveCallback代码中,我们在排队下一个接收之前处理从套接字接收的任何内容。这意味着对于单个套接字,我们实际上在任何时间点只在 ReceiveCallback中出现过一次,而且我们不需要使用线程同步。但是,如果在提取数据后立即重新排序这个函数以调用下一个接收函数(这可能会快一些) ,则需要确保正确地同步线程。

还有,我破解了很多代码,但留下了正在发生的事情的本质。这对你的设计来说应该是个好的开始。如果你还有其他问题,请留言。

我建议读 ACE上的这些书,

获得模式的想法,使您能够创建一个高效的服务器。

虽然 ACE 是用 C + + 实现的,但是这些书涵盖了很多可以在任何编程语言中使用的有用模式。

我想知道一件事:

我绝对不想开始一个 每个连接的螺纹。

为什么?至少从 Windows2000开始,Windows 可以处理应用程序中的数百个线程。我已经完成了,如果线程不需要同步的话,使用它真的很容易。特别是考虑到您要执行大量 I/O 操作(因此您不受 CPU 限制,并且在磁盘或网络通信上会阻塞大量线程) ,我不理解这种限制。

您是否测试过多线程方法并发现它缺少某些东西?您是否还打算为每个线程建立一个数据库连接(这会杀死数据库服务器,因此这是一个坏主意,但通过3层设计可以很容易地解决这个问题)。你担心你会有成千上万的客户,而不是成百上千的,然后你就真的有问题了吗?(如果我有32 + GB 的内存,我会尝试使用1000个线程,甚至10000个线程——再说一次,考虑到你不受 CPU 限制,线程切换时间应该是完全不相关的。)

这里是代码-看看这是如何运行,转到 http://mdpopescu.blogspot.com/2009/05/multi-threaded-server.html并点击图片。

服务器类:

  public class Server
{
private static readonly TcpListener listener = new TcpListener(IPAddress.Any, 9999);


public Server()
{
listener.Start();
Console.WriteLine("Started.");


while (true)
{
Console.WriteLine("Waiting for connection...");


var client = listener.AcceptTcpClient();
Console.WriteLine("Connected!");


// each connection has its own thread
new Thread(ServeData).Start(client);
}
}


private static void ServeData(object clientSocket)
{
Console.WriteLine("Started thread " + Thread.CurrentThread.ManagedThreadId);


var rnd = new Random();
try
{
var client = (TcpClient) clientSocket;
var stream = client.GetStream();
while (true)
{
if (rnd.NextDouble() < 0.1)
{
var msg = Encoding.ASCII.GetBytes("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
stream.Write(msg, 0, msg.Length);


Console.WriteLine("Status update from thread " + Thread.CurrentThread.ManagedThreadId);
}


// wait until the next update - I made the wait time so small 'cause I was bored :)
Thread.Sleep(new TimeSpan(0, 0, rnd.Next(1, 5)));
}
}
catch (SocketException e)
{
Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
}
}
}

服务器主程序:

namespace ManyThreadsServer
{
internal class Program
{
private static void Main(string[] args)
{
new Server();
}
}
}

客户类别:

  public class Client
{
public Client()
{
var client = new TcpClient();
client.Connect(IPAddress.Loopback, 9999);


var msg = new byte[1024];


var stream = client.GetStream();
try
{
while (true)
{
int i;
while ((i = stream.Read(msg, 0, msg.Length)) != 0)
{
var data = Encoding.ASCII.GetString(msg, 0, i);
Console.WriteLine("Received: {0}", data);
}
}
}
catch (SocketException e)
{
Console.WriteLine("Socket exception in thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, e);
}
}
}

客户端主程序:

using System;
using System.Threading;


namespace ManyThreadsClient
{
internal class Program
{
private static void Main(string[] args)
{
// first argument is the number of threads
for (var i = 0; i < Int32.Parse(args[0]); i++)
new Thread(RunClient).Start();
}


private static void RunClient()
{
new Client();
}
}
}

在 C # 中进行网络操作的方法有很多种。它们都使用不同的机制,因此在高并发性方面存在严重的性能问题。Begin * 操作是其中之一,许多人经常错误地认为它是最快/最快的联网方式。

为了解决这些问题,他们引入了 异步方法集: From MSDN,SocketAsyncEventArgs 类-

SocketAsyncEventArgs 类是 System 增强集的一部分。网。插座。.::.Socket 类,它提供了一种可供专门的高性能套接字应用程序使用的可选异步模式。此类专门为需要高性能的网络服务器应用程序设计。应用程序可以专门使用增强的异步模式,也可以只在目标热点区域使用(例如,在接收大量数据时)。

这些增强的主要特性是避免了在大容量异步套接字 I/O 期间对象的重复分配和同步。系统当前实现的 Begin/End 设计模式。网。插座。.::.套接字类需要一个系统。.::.为每个异步套接字操作分配 IAsyncResult 对象。

在表面之下,* Async API 使用 I/O 完成端口,这是执行网络操作的最快方法,参见 Windows Sockets 2.0: 使用完成端口编写可伸缩的 Winsock 应用程序

为了帮助您解决这个问题,我将包含使用 * Async API 编写的 telnet 服务器的源代码。我只包括相关的部分。还要注意的是,我没有内联处理数据,而是选择将其推送到在单独线程上处理的无锁(无等待)队列上。请注意,我没有包括相应的 Pool 类,它只是一个简单的池,如果它是空的,它将创建一个新对象; 而 Buffer 类只是一个自我扩展的缓冲区,除非你接收到不确定数量的数据,否则并不真正需要它。

public class Telnet
{
private readonly Pool<SocketAsyncEventArgs> m_EventArgsPool;
private Socket m_ListenSocket;


/// <summary>
/// This event fires when a connection has been established.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> Connected;


/// <summary>
/// This event fires when a connection has been shutdown.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> Disconnected;


/// <summary>
/// This event fires when data is received on the socket.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> DataReceived;


/// <summary>
/// This event fires when data is finished sending on the socket.
/// </summary>
public event EventHandler<SocketAsyncEventArgs> DataSent;


/// <summary>
/// This event fires when a line has been received.
/// </summary>
public event EventHandler<LineReceivedEventArgs> LineReceived;


/// <summary>
/// Specifies the port to listen on.
/// </summary>
[DefaultValue(23)]
public int ListenPort { get; set; }


/// <summary>
/// Constructor for Telnet class.
/// </summary>
public Telnet()
{
m_EventArgsPool = new Pool<SocketAsyncEventArgs>();
ListenPort = 23;
}


/// <summary>
/// Starts the telnet server listening and accepting data.
/// </summary>
public void Start()
{
IPEndPoint endpoint = new IPEndPoint(0, ListenPort);
m_ListenSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);


m_ListenSocket.Bind(endpoint);
m_ListenSocket.Listen(100);


//
// Post Accept
//
StartAccept(null);
}


/// <summary>
/// Not Yet Implemented. Should shutdown all connections gracefully.
/// </summary>
public void Stop()
{
//throw (new NotImplementedException());
}


//
// ACCEPT
//


/// <summary>
/// Posts a requests for Accepting a connection. If it is being called from the completion of
/// an AcceptAsync call, then the AcceptSocket is cleared since it will create a new one for
/// the new user.
/// </summary>
/// <param name="e">null if posted from startup, otherwise a <b>SocketAsyncEventArgs</b> for reuse.</param>
private void StartAccept(SocketAsyncEventArgs e)
{
if (e == null)
{
e = m_EventArgsPool.Pop();
e.Completed += Accept_Completed;
}
else
{
e.AcceptSocket = null;
}


if (m_ListenSocket.AcceptAsync(e) == false)
{
Accept_Completed(this, e);
}
}


/// <summary>
/// Completion callback routine for the AcceptAsync post. This will verify that the Accept occured
/// and then setup a Receive chain to begin receiving data.
/// </summary>
/// <param name="sender">object which posted the AcceptAsync</param>
/// <param name="e">Information about the Accept call.</param>
private void Accept_Completed(object sender, SocketAsyncEventArgs e)
{
//
// Socket Options
//
e.AcceptSocket.NoDelay = true;


//
// Create and setup a new connection object for this user
//
Connection connection = new Connection(this, e.AcceptSocket);


//
// Tell the client that we will be echo'ing data sent
//
DisableEcho(connection);


//
// Post the first receive
//
SocketAsyncEventArgs args = m_EventArgsPool.Pop();
args.UserToken = connection;


//
// Connect Event
//
if (Connected != null)
{
Connected(this, args);
}


args.Completed += Receive_Completed;
PostReceive(args);


//
// Post another accept
//
StartAccept(e);
}


//
// RECEIVE
//


/// <summary>
/// Post an asynchronous receive on the socket.
/// </summary>
/// <param name="e">Used to store information about the Receive call.</param>
private void PostReceive(SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;


if (connection != null)
{
connection.ReceiveBuffer.EnsureCapacity(64);
e.SetBuffer(connection.ReceiveBuffer.DataBuffer, connection.ReceiveBuffer.Count, connection.ReceiveBuffer.Remaining);


if (connection.Socket.ReceiveAsync(e) == false)
{
Receive_Completed(this, e);
}
}
}


/// <summary>
/// Receive completion callback. Should verify the connection, and then notify any event listeners
/// that data has been received. For now it is always expected that the data will be handled by the
/// listeners and thus the buffer is cleared after every call.
/// </summary>
/// <param name="sender">object which posted the ReceiveAsync</param>
/// <param name="e">Information about the Receive call.</param>
private void Receive_Completed(object sender, SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;


if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || connection == null)
{
Disconnect(e);
return;
}


connection.ReceiveBuffer.UpdateCount(e.BytesTransferred);


OnDataReceived(e);


HandleCommand(e);
Echo(e);


OnLineReceived(connection);


PostReceive(e);
}


/// <summary>
/// Handles Event of Data being Received.
/// </summary>
/// <param name="e">Information about the received data.</param>
protected void OnDataReceived(SocketAsyncEventArgs e)
{
if (DataReceived != null)
{
DataReceived(this, e);
}
}


/// <summary>
/// Handles Event of a Line being Received.
/// </summary>
/// <param name="connection">User connection.</param>
protected void OnLineReceived(Connection connection)
{
if (LineReceived != null)
{
int index = 0;
int start = 0;


while ((index = connection.ReceiveBuffer.IndexOf('\n', index)) != -1)
{
string s = connection.ReceiveBuffer.GetString(start, index - start - 1);
s = s.Backspace();


LineReceivedEventArgs args = new LineReceivedEventArgs(connection, s);
Delegate[] delegates = LineReceived.GetInvocationList();


foreach (Delegate d in delegates)
{
d.DynamicInvoke(new object[] { this, args });


if (args.Handled == true)
{
break;
}
}


if (args.Handled == false)
{
connection.CommandBuffer.Enqueue(s);
}


start = index;
index++;
}


if (start > 0)
{
connection.ReceiveBuffer.Reset(0, start + 1);
}
}
}


//
// SEND
//


/// <summary>
/// Overloaded. Sends a string over the telnet socket.
/// </summary>
/// <param name="connection">Connection to send data on.</param>
/// <param name="s">Data to send.</param>
/// <returns>true if the data was sent successfully.</returns>
public bool Send(Connection connection, string s)
{
if (String.IsNullOrEmpty(s) == false)
{
return Send(connection, Encoding.Default.GetBytes(s));
}


return false;
}


/// <summary>
/// Overloaded. Sends an array of data to the client.
/// </summary>
/// <param name="connection">Connection to send data on.</param>
/// <param name="data">Data to send.</param>
/// <returns>true if the data was sent successfully.</returns>
public bool Send(Connection connection, byte[] data)
{
return Send(connection, data, 0, data.Length);
}


public bool Send(Connection connection, char c)
{
return Send(connection, new byte[] { (byte)c }, 0, 1);
}


/// <summary>
/// Sends an array of data to the client.
/// </summary>
/// <param name="connection">Connection to send data on.</param>
/// <param name="data">Data to send.</param>
/// <param name="offset">Starting offset of date in the buffer.</param>
/// <param name="length">Amount of data in bytes to send.</param>
/// <returns></returns>
public bool Send(Connection connection, byte[] data, int offset, int length)
{
bool status = true;


if (connection.Socket == null || connection.Socket.Connected == false)
{
return false;
}


SocketAsyncEventArgs args = m_EventArgsPool.Pop();
args.UserToken = connection;
args.Completed += Send_Completed;
args.SetBuffer(data, offset, length);


try
{
if (connection.Socket.SendAsync(args) == false)
{
Send_Completed(this, args);
}
}
catch (ObjectDisposedException)
{
//
// return the SocketAsyncEventArgs back to the pool and return as the
// socket has been shutdown and disposed of
//
m_EventArgsPool.Push(args);
status = false;
}


return status;
}


/// <summary>
/// Sends a command telling the client that the server WILL echo data.
/// </summary>
/// <param name="connection">Connection to disable echo on.</param>
public void DisableEcho(Connection connection)
{
byte[] b = new byte[] { 255, 251, 1 };
Send(connection, b);
}


/// <summary>
/// Completion callback for SendAsync.
/// </summary>
/// <param name="sender">object which initiated the SendAsync</param>
/// <param name="e">Information about the SendAsync call.</param>
private void Send_Completed(object sender, SocketAsyncEventArgs e)
{
e.Completed -= Send_Completed;
m_EventArgsPool.Push(e);
}


/// <summary>
/// Handles a Telnet command.
/// </summary>
/// <param name="e">Information about the data received.</param>
private void HandleCommand(SocketAsyncEventArgs e)
{
Connection c = e.UserToken as Connection;


if (c == null || e.BytesTransferred < 3)
{
return;
}


for (int i = 0; i < e.BytesTransferred; i += 3)
{
if (e.BytesTransferred - i < 3)
{
break;
}


if (e.Buffer[i] == (int)TelnetCommand.IAC)
{
TelnetCommand command = (TelnetCommand)e.Buffer[i + 1];
TelnetOption option = (TelnetOption)e.Buffer[i + 2];


switch (command)
{
case TelnetCommand.DO:
if (option == TelnetOption.Echo)
{
// ECHO
}
break;
case TelnetCommand.WILL:
if (option == TelnetOption.Echo)
{
// ECHO
}
break;
}


c.ReceiveBuffer.Remove(i, 3);
}
}
}


/// <summary>
/// Echoes data back to the client.
/// </summary>
/// <param name="e">Information about the received data to be echoed.</param>
private void Echo(SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;


if (connection == null)
{
return;
}


//
// backspacing would cause the cursor to proceed beyond the beginning of the input line
// so prevent this
//
string bs = connection.ReceiveBuffer.ToString();


if (bs.CountAfterBackspace() < 0)
{
return;
}


//
// find the starting offset (first non-backspace character)
//
int i = 0;


for (i = 0; i < connection.ReceiveBuffer.Count; i++)
{
if (connection.ReceiveBuffer[i] != '\b')
{
break;
}
}


string s = Encoding.Default.GetString(e.Buffer, Math.Max(e.Offset, i), e.BytesTransferred);


if (connection.Secure)
{
s = s.ReplaceNot("\r\n\b".ToCharArray(), '*');
}


s = s.Replace("\b", "\b \b");


Send(connection, s);
}


//
// DISCONNECT
//


/// <summary>
/// Disconnects a socket.
/// </summary>
/// <remarks>
/// It is expected that this disconnect is always posted by a failed receive call. Calling the public
/// version of this method will cause the next posted receive to fail and this will cleanup properly.
/// It is not advised to call this method directly.
/// </remarks>
/// <param name="e">Information about the socket to be disconnected.</param>
private void Disconnect(SocketAsyncEventArgs e)
{
Connection connection = e.UserToken as Connection;


if (connection == null)
{
throw (new ArgumentNullException("e.UserToken"));
}


try
{
connection.Socket.Shutdown(SocketShutdown.Both);
}
catch
{
}


connection.Socket.Close();


if (Disconnected != null)
{
Disconnected(this, e);
}


e.Completed -= Receive_Completed;
m_EventArgsPool.Push(e);
}


/// <summary>
/// Marks a specific connection for graceful shutdown. The next receive or send to be posted
/// will fail and close the connection.
/// </summary>
/// <param name="connection"></param>
public void Disconnect(Connection connection)
{
try
{
connection.Socket.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
}
}


/// <summary>
/// Telnet command codes.
/// </summary>
internal enum TelnetCommand
{
SE = 240,
NOP = 241,
DM = 242,
BRK = 243,
IP = 244,
AO = 245,
AYT = 246,
EC = 247,
EL = 248,
GA = 249,
SB = 250,
WILL = 251,
WONT = 252,
DO = 253,
DONT = 254,
IAC = 255
}


/// <summary>
/// Telnet command options.
/// </summary>
internal enum TelnetOption
{
Echo = 1,
SuppressGoAhead = 3,
Status = 5,
TimingMark = 6,
TerminalType = 24,
WindowSize = 31,
TerminalSpeed = 32,
RemoteFlowControl = 33,
LineMode = 34,
EnvironmentVariables = 36
}
}

中添加的 AcceptAsync/ConnectAsync/ReceiveAsync/SendAsync 方法。NET 3.5.我已经做了一个基准,他们大约35% 的速度(响应时间和比特率)与100个用户不断发送和接收数据。

您已经通过上面的代码示例获得了大部分答案。使用异步 I/O 操作绝对是解决这个问题的方法。异步 I/O 是 Win32内部设计的扩展方式。您可以获得的最佳性能是使用 完成端口实现的,将您的套接字绑定到完成端口,并且有一个线程池等待完成端口的完成。常见的做法是让每个 CPU (核心)有2-4个线程等待完成。我强烈推荐阅读 Windows 性能团队的 Rick Vicik 的这三篇文章:

  1. 性能设计应用-第1部分
  2. 性能设计应用-第2部分
  3. 性能设计应用-第3部分

上述文章主要涉及本地 Windows API,但对于任何试图掌握可伸缩性和性能的人来说,这些文章是必读的。他们也有一些管理方面的简报。

第二件你需要做的事情是确保你过了 提高.NET 应用程序的性能和可伸缩性的书,这是可用的在线。您将在第5章中找到与线程、异步调用和锁的使用有关的有效建议。但是真正的精华在第17章,您将在那里找到关于优化线程池的实用指南。我的应用程序出现了一些严重的问题,直到我按照本章的建议调整了 maxIothread/maxWorkerThreads。

你说你想做一个纯 TCP 服务器,所以我的下一点是假的。如果你发现自己走投无路,使用 WebRequest 类及其派生类,请注意,有一条龙守卫着那扇门: ServicePointManager。这是一个配置类,它在生活中只有一个目的: 破坏您的性能。确保从人为强制的 ServicePoint 中释放服务器。ConnectionLimit 或者您的应用程序将永远无法伸缩(我让您自己发现默认值是什么...)。您还可以重新考虑在 HTTP 请求中发送 Expect100ContineHeader 的默认策略。

现在介绍一下核心套接字管理 API,发送端的工作相当简单,但接收端的工作要复杂得多。为了实现高吞吐量和高规模,必须确保套接字不受流控制,因为您没有为接收提交缓冲区。对于高性能来说,你应该提前发布3-4个缓冲区,一旦你得到了一个新的缓冲区(之前处理你得到的缓冲区) ,这样你就可以确保套接字总是有地方存放来自网络的数据。您将看到为什么您可能不能很快实现这一点。

当您使用 BeginRead/BeginWrite API 并开始认真工作之后,您将意识到需要对流量进行安全保护,即 NTLM/Kerberos 身份验证和流量加密,或者至少是流量篡改保护。这样做的方法是使用内置的 System。网。保安。协商流(如果需要跨不同的域,可以使用 SslStream)。这意味着您将不依赖于直接套接字异步操作,而是依赖于 AuthenticatedStream 异步操作。一旦您获得了一个套接字(从客户端的连接或从服务器的接受) ,您就在套接字上创建了一个流,并通过调用 BeginAuthenticateAsClient 或 BeginAuthenticateAsServer 提交它进行身份验证。在认证完成后(至少你的安全从原生 InitiateSecurityContext/AcceptSecurityContext 疯狂...) ,你将通过检查 RemoteIdentity 属性你的验证流和做任何 前十字韧带验证你的产品必须支持你的授权。

之后,您将使用 BeginWrite 发送消息,并使用 BeginRead 接收消息。这就是我之前提到的问题,您不能发布多个接收缓冲区,因为 AuthenticateStream 类不支持这一点。BeginRead 操作在内部管理所有 I/O,直到您收到整个帧。否则,它无法处理消息身份验证(对帧进行解密并在帧上验证签名)。尽管根据我的经验,AuthenticatedStream 类完成的工作相当不错,应该没有任何问题。也就是说,你应该能够饱和1 Gbit/s 的网络只有4-5% 的 CPU。AuthenticatedStream 类还会对您施加特定于协议的帧大小限制(SSL 为16k,Kerberos 为12k)。

这应该能让你走上正轨。我不打算在这里发布代码,这里有一个 MSDN 上完美的例子。我已经做了许多这样的项目,我能够规模约1000个用户连接没有问题。除此之外,还需要修改注册表项,以允许内核使用更多的套接字句柄。并确保您部署在 服务器操作系统上,即 WindowsServer2003,而不是 Windows XP 或 Windows Vista(即客户端操作系统) ,这会产生很大的差异。

顺便说一句,如果您在服务器或文件 I/O 上有数据库操作,那么请确保您也使用异步风格,否则您将立即耗尽线程池。对于 SQLServer 连接,请确保将“异步处理 = 真”添加到连接字符串中。

曾经有一个关于可伸缩 TCP/IP 使用的很好的讨论。作者: Chris Mullins of Coversant。不幸的是,他的博客似乎已经从原来的位置消失了,所以我将尝试从记忆中拼凑出他的建议(他的一些有用的评论出现在这个帖子: C + + vs C # : 开发一个高度可伸缩的 IOCP 服务器)

首先,请注意,在 Socket类上使用 Begin/EndAsync方法都使用 I/O 完成端口(IOCP)来提供可伸缩性。这对可伸缩性的影响要大得多(如果使用正确,请参见下文) ,而不是实际选择哪种方法来实现解决方案。

Chris Mullins 的文章是基于使用 Begin/End,这是我个人有经验的一个。请注意,Chris 在此基础上提出了一个解决方案,在32位机器上的并发客户端连接扩展到10,000秒,而在64位平台上的并发客户端连接扩展到100,000秒,并且有足够的内存。从我自己的经验与这种技术(虽然没有接近这种负荷) ,我没有理由怀疑这些指示性的数字。

IOCP 与每个连接线程或“选择”原语

之所以要使用一种在底层使用 IOCP 的机制,是因为它使用了一个非常低级的 Windows 线程池,在 I/O 通道上有实际数据之前不会唤醒任何线程(注意,IOCP 也可以用于文件 I/O)。这样做的好处是,Windows 不必切换到一个线程,只是发现还没有数据,所以这将减少上下文切换的数量,您的服务器将不得不作出的最低要求。

上下文切换肯定会扼杀“每个连接一个线程”的机制,尽管如果只处理几十个连接,这是一个可行的解决方案。然而,这种机制并不是想象中的“可扩展”的。

使用 IOCP 时的重要注意事项

记忆

首先也是最重要的是要了解 IOCP 很容易在以下情况下导致内存问题。NET 如果你的实现太天真。每个 IOCPBeginReceive调用都将导致“钉住”您正在读入的缓冲区。有关这个问题的详细解释,请参阅: 云锦的博客: OutOfMemory 异常与钉住

幸运的是,这个问题是可以避免的,但它需要一点权衡。建议的解决方案是在应用程序启动(或接近启动)时分配至少90KB 左右的大型 byte[]缓冲区(截至。NET 2,所需的大小可能会在以后的版本中更大)。这样做的原因是,大型内存分配自动结束于非压缩内存段(大型对象堆大型对象堆) ,该段被有效地自动固定。通过在启动时分配一个大的缓冲区,你可以确保这块不可移动的内存位于一个相对较低的地址,它不会阻碍并导致碎片化。

然后,您可以使用偏移量将这个大型缓冲区分割为需要读取某些数据的每个连接的单独区域。这就是需要权衡的地方; 因为这个缓冲区需要预先分配,所以你必须决定每个连接需要多少缓冲区空间,以及你想要扩展到的连接数上限是多少(或者,你可以实现一个抽象,在你需要的时候可以分配额外的固定缓冲区)。

最简单的解决方案是在此缓冲区内以唯一偏移量为每个连接分配一个字节。然后,您可以调用 BeginReceive来读取一个字节,并根据所获得的回调执行剩余的读取操作。

正在处理

当你从 Begin调用中得到回调时,意识到回调中的代码将在低级别 IOCP 线程上执行是非常重要的。在这个回调中,完全可以避免冗长的操作,这就是 至关重要。使用这些线程进行复杂的处理将扼杀您的可伸缩性,就像使用“每个连接一个线程”一样有效。

建议的解决方案是仅使用回调来对工作项进行排队,以处理传入的数据,这些数据将在其他线程上执行。避免回调中任何潜在的阻塞操作,以便 IOCP 线程能够尽快返回到它的池。进去。NET 4.0我建议最简单的解决方案是生成一个 Task,给它一个对客户端套接字的引用,以及 BeginReceive调用已经读取的第一个字节的副本。然后,这个任务负责从表示您正在处理的请求的套接字中读取所有数据,执行它,然后再次发出一个新的 BeginReceive调用,为 IOCP 对套接字进行排队。Pre.NET 4.0,您可以使用 ThreadPool,或者创建自己的线程化工作队列实现。

摘要

基本上,我建议对此解决方案使用 Kevin 的样本代码,并添加以下警告:

  • 确保传递给 BeginReceive的缓冲区已经“固定”了
  • 确保您传递给 BeginReceive的回调只是将任务排队以处理传入数据的实际处理

当您这样做的时候,我毫不怀疑您可以复制 Chris 的结果,以扩展到潜在的成千上万的同时客户端(当然,只要有合适的硬件和您自己的处理代码的高效实现;)

您可以使用 推送框架开源框架进行高性能服务器开发。它构建于 IOCP之上,适用于推送场景和消息广播。

我用的是 凯文的解决方案,但他说这个解决方案缺乏重新组装消息的代码。开发人员可以使用此代码重新组装消息:

private static void ReceiveCallback(IAsyncResult asyncResult )
{
ClientInfo cInfo = (ClientInfo)asyncResult.AsyncState;


cInfo.BytesReceived += cInfo.Soket.EndReceive(asyncResult);
if (cInfo.RcvBuffer == null)
{
// First 2 byte is lenght
if (cInfo.BytesReceived >= 2)
{
//this calculation depends on format which your client use for lenght info
byte[] len = new byte[ 2 ] ;
len[0] = cInfo.LengthBuffer[1];
len[1] = cInfo.LengthBuffer[0];
UInt16 length = BitConverter.ToUInt16( len , 0);


// buffering and nulling is very important
cInfo.RcvBuffer = new byte[length];
cInfo.BytesReceived = 0;


}
}
else
{
if (cInfo.BytesReceived == cInfo.RcvBuffer.Length)
{
//Put your code here, use bytes comes from  "cInfo.RcvBuffer"


//Send Response but don't use async send , otherwise your code will not work ( RcvBuffer will be null prematurely and it will ruin your code)


int sendLenghts = cInfo.Soket.Send( sendBack, sendBack.Length, SocketFlags.None);


// buffering and nulling is very important
//Important , set RcvBuffer to null because code will decide to get data or 2 bte lenght according to RcvBuffer's value(null or initialized)
cInfo.RcvBuffer = null;
cInfo.BytesReceived = 0;
}
}


ContinueReading(cInfo);
}


private static void ContinueReading(ClientInfo cInfo)
{
try
{
if (cInfo.RcvBuffer != null)
{
cInfo.Soket.BeginReceive(cInfo.RcvBuffer, cInfo.BytesReceived, cInfo.RcvBuffer.Length - cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);
}
else
{
cInfo.Soket.BeginReceive(cInfo.LengthBuffer, cInfo.BytesReceived, cInfo.LengthBuffer.Length - cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);
}
}
catch (SocketException se)
{
//Handle exception and  Close socket here, use your own code
return;
}
catch (Exception ex)
{
//Handle exception and  Close socket here, use your own code
return;
}
}


class ClientInfo
{
private const int BUFSIZE = 1024 ; // Max size of buffer , depends on solution
private const int BUFLENSIZE = 2; // lenght of lenght , depends on solution
public int BytesReceived = 0 ;
public byte[] RcvBuffer { get; set; }
public byte[] LengthBuffer { get; set; }


public Socket Soket { get; set; }


public ClientInfo(Socket clntSock)
{
Soket = clntSock;
RcvBuffer = null;
LengthBuffer = new byte[ BUFLENSIZE ];
}


}


public static void AcceptCallback(IAsyncResult asyncResult)
{


Socket servSock = (Socket)asyncResult.AsyncState;
Socket clntSock = null;


try
{


clntSock = servSock.EndAccept(asyncResult);


ClientInfo cInfo = new ClientInfo(clntSock);


Receive( cInfo );


}
catch (SocketException se)
{
clntSock.Close();
}
}
private static void Receive(ClientInfo cInfo )
{
try
{
if (cInfo.RcvBuffer == null)
{
cInfo.Soket.BeginReceive(cInfo.LengthBuffer, 0, 2, SocketFlags.None, ReceiveCallback, cInfo);


}
else
{
cInfo.Soket.BeginReceive(cInfo.RcvBuffer, 0, cInfo.BytesReceived, SocketFlags.None, ReceiveCallback, cInfo);


}


}
catch (SocketException se)
{
return;
}
catch (Exception ex)
{
return;
}


}

若要让用户复制粘贴接受的答案,可以重写接受回调方法,删除 _ serverSocket. BeginAccept (新的异步回调(接受回调) ,_ serverSocket) ; 把它放在 finally {}子句中,这样:

private void acceptCallback(IAsyncResult result)
{
xConnection conn = new xConnection();
try
{
//Finish accepting the connection
System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
conn = new xConnection();
conn.socket = s.EndAccept(result);
conn.buffer = new byte[_bufferSize];
lock (_sockets)
{
_sockets.Add(conn);
}
//Queue recieving of data from the connection
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
}
catch (SocketException e)
{
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
}
catch (Exception e)
{
if (conn.socket != null)
{
conn.socket.Close();
lock (_sockets)
{
_sockets.Remove(conn);
}
}
}
finally
{
//Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
_serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
}
}

您甚至可以删除第一个 catch,因为它的内容是相同的,但它是一个模板方法,您应该使用类型化异常来更好地处理异常,并了解是什么导致了错误,所以只需用一些有用的代码来实现这些 catch。