使用asyncio时无法使用os.fork()将多个进程绑定到一个套接字服务器 [英] Could not use os.fork() bind several process to one socket server when using asyncio

查看:135
本文介绍了使用asyncio时无法使用os.fork()将多个进程绑定到一个套接字服务器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们都知道使用asyncio可以大大提高套接字服务器的性能,而且如果我们可以利用cpu中的所有内核(可能通过多处理模块或os.fork()等),显然事情会变得更加糟糕 >

我现在正在尝试构建一个多核套接字服务器演示,其中一个异步套接字服务器在每个内核上侦听并将所有绑定到一个端口.只需创建一个异步服务器,然后使用os.fork(),就可以使进程竞争性地工作.

但是,当我尝试进行分叉时,单核精细代码会遇到一些麻烦.在epoll选择器模块中注册来自不同进程的相同文件描述符似乎有些问题.

我在下面显示一些代码,有人可以帮我吗?


这是使用asyncio的简单,逻辑清晰的回显服务器代码:

 import os
import asyncio #,uvloop
from socket import *

# hendler sends back incoming message directly
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET ,SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
    sock.bind(('',25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client ,addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop ,client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

 

在我尝试进行分叉,套接字启动后以及服务器开始服务之前,它都可以正常工作. (此逻辑在基于同步的线程代码中正常工作.)


当我尝试此操作时:

 loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop.create_task(serving(loop, sock))
loop.run_forever()
 

从理论上讲,分叉的进程是否在同一个套接字上?并在同一事件循环中运行?然后就可以了吗?

但是我收到这些错误消息:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python版本3.7.3,

我对发生的事情完全感到困惑.

有人可以帮忙吗?谢谢

解决方案

根据跟踪器问题 ,不支持派生一个现有的asyncio事件循环并尝试在多个进程中使用它.但是,根据 Yury的评论,可以在开始处理之前通过分叉来实现多处理.循环,因此在每个孩子中运行完全独立的异步循环.

您的代码实际上确认了这种可能性:虽然create_serverasync def,但它不会等待任何东西,也不会使用loop参数.因此,我们可以通过以下方式来实现Yury的方法:将create_server设为常规函数,删除loop参数,并在os.fork()之前调用它,然后仅在派生之后运行事件循环:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()

We all know that using asyncio substantially improves the performance of a socket server, and obviously things get even more awesome if we could take advantage of all cores in our cpu (maybe via multiprocessing module or os.fork() etc.)

I'm now trying to build a multicore socket server demo, with a asynchronous socket server listening on each core and all binding to one port. Simply by creating a async server and then use os.fork(), let processes work competitively.

However the single-core-fine code runs into some trouble when I'm trying to fork. Seems like there's some problem with registering same filedescriptors from different processes in epoll selector module.

I'm showing some code below, can anyone help me out?


Here's a simple, logically clear code of echo server using asyncio:

import os
import asyncio #,uvloop
from socket import *

# hendler sends back incoming message directly
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET ,SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
    sock.bind(('',25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client ,addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop ,client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

It works fine until I'm trying to fork, after the socket was bounl and before server starts serving. (This logic works fine in synchronous -- threading based code.)


When I'm trying this:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop.create_task(serving(loop, sock))
loop.run_forever()

Theoretically forked process are bounl to a same socket? And run in a same event loop? then work just fine?

However I'm getting these error messages:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python version 3.7.3,

I'm totally confused about what's going on.

Could anybody help? thanks

解决方案

According to the tracker issue, it is not supported to fork an existing asyncio event loop and attempt to use it from multiple processes. However, according to Yury's comment on the same issue, multi-processing can be implemented by forking before starting a loop, therefore running fully independent asyncio loops in each child.

Your code actually confirms this possibility: while create_server is async def, it doesn't await anything, nor does it use the loop argument. So we can implement Yury's approach by by making create_server a regular function, removing the loop argument, and calling it before os.fork(), and only running event loops after forking:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()

这篇关于使用asyncio时无法使用os.fork()将多个进程绑定到一个套接字服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆