What is the async/await equivalent of a ThreadPool server?

Problem Description

I am working on a TCP server that looks something like this, using synchronous APIs and the thread pool:

TcpListener listener;

void Serve()
{
    while (true)
    {
        // blocks until a client connects
        var client = listener.AcceptTcpClient();
        // hand the connection off to a pool thread
        ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
        // or alternatively: new Thread(HandleConnection).Start(client)
    }
}

Assuming my goal is to handle as many concurrent connections as possible with the lowest resource usage, it seems this will quickly be limited by the number of available threads. I suspect that by using non-blocking Task APIs, I would be able to handle much more with fewer resources.

My initial impression is something like:

async Task Serve()
{
    while (true)
    {
        var client = await listener.AcceptTcpClientAsync();
        HandleConnectionAsync(client); // fire and forget?
    }
}

But it strikes me that this could cause bottlenecks. Perhaps HandleConnectionAsync will take an unusually long time to hit the first await, and will stop the main accept loop from proceeding. Will this only use one thread ever, or will the runtime magically run things on multiple threads as it sees fit?

Is there a way to combine these two approaches so that my server will use exactly the number of threads it needs for the number of actively running tasks, but so that it will not block threads unnecessarily on IO operations?

Is there an idiomatic way to maximize throughput in a situation like this?

Recommended Answer

I'd let the Framework manage the threading and wouldn't create any extra threads unless profiling suggests I need to, especially if the calls inside HandleConnectionAsync are mostly IO-bound.

Anyway, if you want to release the calling thread (the dispatcher) at the beginning of HandleConnectionAsync, there's a very easy solution: you can jump onto a new ThreadPool thread with await Task.Yield(). That works if your server runs in an execution environment that does not have any synchronization context installed on the initial thread (a console app, a WCF service), which is normally the case for a TCP server.

The following illustrates this (the code is originally from here). Note that the main while loop doesn't create any threads explicitly:

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                await task;
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending tasks
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on a pool thread

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        await new Program().StartListener();
    }
}

Alternatively, the code might look like the version below, without await Task.Yield(). Note that I pass an async lambda to Task.Run, because I still want to benefit from async APIs inside HandleConnectionAsync and use await in there:

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}

Updated, based upon a comment: if this is going to be library code, the execution environment is indeed unknown and may have a non-default synchronization context. In this case, I'd rather run the main server loop on a pool thread (which is free of any synchronization context):

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            if (task.IsFaulted)
                await task;
        }
    });
}

This way, all child tasks created inside StartListener wouldn't be affected by the synchronization context of the client code. So, I wouldn't have to call Task.ConfigureAwait(false) anywhere explicitly.
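For comparison, here is a rough sketch of mine (not part of the original answer; the method name HandleConnectionNoContextAsync is made up) of what the handler might look like without the Task.Run wrapper, where each await explicitly opts out of the caller's synchronization context via ConfigureAwait(false):

// Illustrative only: a handler written to run correctly under an unknown
// synchronization context without offloading to Task.Run. Every await opts out
// of the captured context, so continuations resume on pool threads.
private static async Task HandleConnectionNoContextAsync(TcpClient tcpClient)
{
    using (var networkStream = tcpClient.GetStream())
    {
        var buffer = new byte[4096];
        var byteCount = await networkStream
            .ReadAsync(buffer, 0, buffer.Length)
            .ConfigureAwait(false);
        var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
        Console.WriteLine("[Server] Client wrote {0}", request);

        var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
        await networkStream
            .WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length)
            .ConfigureAwait(false);
    }
}

Wrapping the whole loop in Task.Run avoids having to remember to do this on every awaited call.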

Updated in 2020: someone just asked a good question off-site:

I was wondering what is the reason for using a lock here? This is not necessary for exception handling. My understanding is that a lock is used because List is not thread safe, therefore the real question is why add the tasks to a list (and incur the cost of a lock under load).

Since Task.Run is perfectly able to keep track of the tasks it started, my thinking is that in this specific example the lock is useless, however you put it there because in a real program, having the tasks in a list allows us to for example, iterate currently running tasks and terminate the tasks cleanly if the program receives a termination signal from the operating system.

Indeed, in a real-life scenario we almost always want to keep track of the tasks we start with Task.Run (or any other Task objects which are "in-flight"), for a few reasons:

  • To track task exceptions, which otherwise might be silently swallowed if they go unobserved elsewhere.
  • To be able to wait asynchronously for the completion of all pending tasks (e.g., consider a Start/Stop UI button, or handling a start/stop request inside a headless Windows service); see the sketch after this list.
  • To be able to control (and throttle/limit) the number of tasks we allow to be in-flight simultaneously.
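As an illustration of the last two points, here is a sketch of mine (not from the original answer; the member names _throttle and WaitForPendingConnectionsAsync and the cap of 100 are made up) showing how the tracked _connections list could be drained on shutdown, and how a SemaphoreSlim could cap the number of in-flight connections:

// Hypothetical additions to the Program class above.
// Requires: using System.Threading;

// Limit how many connections are handled concurrently (the cap of 100 is arbitrary).
private readonly SemaphoreSlim _throttle = new SemaphoreSlim(100);

// Accept loop variant that waits for a free slot before accepting the next client.
private async Task StartThrottledListener()
{
    var tcpListener = TcpListener.Create(8000);
    tcpListener.Start();
    while (true)
    {
        await _throttle.WaitAsync();                  // wait for a free slot
        var tcpClient = await tcpListener.AcceptTcpClientAsync();
        var task = HandleThrottledConnectionAsync(tcpClient);
        if (task.IsFaulted)
            await task;
    }
}

private async Task HandleThrottledConnectionAsync(TcpClient tcpClient)
{
    try
    {
        await StartHandleConnectionAsync(tcpClient);  // tracked in _connections as before
    }
    finally
    {
        _throttle.Release();                          // free the slot for the next client
    }
}

// On shutdown, snapshot the tracked tasks under the lock and await them all,
// so the server stops only after in-flight connections have completed.
private async Task WaitForPendingConnectionsAsync()
{
    Task[] pending;
    lock (_lock)
        pending = _connections.ToArray();

    await Task.WhenAll(pending);
}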

There are better mechanisms to handle real-life concurrency workflows (e.g., the TPL Dataflow Library), but I did include the task list and the lock here on purpose, even in this simple example. It might be tempting to use a fire-and-forget approach instead, but that's almost never a good idea. In my own experience, when I did want fire-and-forget, I used async void methods for that (check this).
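For completeness, a fire-and-forget wrapper along those lines might look like this sketch of mine (the method name is made up); the key point is that an async void method has to observe its own exceptions, because nothing awaits it:

// An async void method cannot be awaited, so it must catch its own exceptions;
// otherwise an unhandled exception would be rethrown on the thread pool and
// could bring down the process.
private async void HandleConnectionFireAndForget(TcpClient tcpClient)
{
    try
    {
        await HandleConnectionAsync(tcpClient);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString()); // last-chance logging
    }
}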
