Tornado TCP 服务器/客户端进程通信 [英] Tornado TCP Server / Client process communication

查看:32
本文介绍了Tornado TCP 服务器/客户端进程通信的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想在多个 Tornado 进程之间建立通信,每个进程都充当网络服务器,即使用 tornado.web.RequestHandler.这个想法是我想要在进程之间建立一个完全网状的网络.我有 4 个进程,我想使用 tornado.tcpservertornado.tcpclient 在它们之间建立持续的永久通信:

I want to setup communication between a number of Tornado processes each acting as web-servers I.e. using tornado.web.RequestHandler. The idea is that i want a fully meshed network between the processes. I have 4 processes and I want to establish an ongoing permanent communication between them using the tornado.tcpserver and tornado.tcpclient:

T1---T2
| \  /| 
|  \/ |
| / \ |
T3---T4

我是 TCP 编程的新手,但是在我在 tornado 文档中看到的示例中:http://www.tornadoweb.org/en/stable/iostream.htmltornado.iostream.IOStream 类的 Implementations 下,一旦建立了套接字,所有通信就完成了,然后套接字关闭.该示例通过带有回调的块驱动代码,每个块都执行其通信职责.

I'm new to TCP programming however in the example I've seen in the tornado documentation: http://www.tornadoweb.org/en/stable/iostream.html Under Implementations for class tornado.iostream.IOStream once a socket is established all the communication is done and then then socket is closed. The example drives the code through blocks with callbacks each performing their duty of the communication.

但是,是否可以打开 TCP 连接并使 BaseIOStream.read_until_close() 空闲并仅在客户端写入服务器时调用?

However is it possible to open a TCP connection and have the BaseIOStream.read_until_close() idle and called only when the client writes to the server?

换句话说,客户端和服务器保持连接,当客户端写入服务器时,它会以某种方式中断 Tornado IOLoop 以调用 read()?

In other words the client and server stay connected and when the client writes to the server it somehow interrupts the Tornado IOLoop to call the read()?

或者是我的想法被误导了,这样做的方法是每次我需要进程进行通信时,我都会建立一个新的 TCP 连接,完成工作然后终止连接?似乎每次建立这个新连接都会包含很多开销,而不是让连接保持打开状态......

Or is my thinking misguided and the way to do this is every time I need the processes to communicate I establish a new TCP connection, do the work and then kill the connection? It just seems like establishing this new connection every time would contain a lot of overhead rather than leaving the connection open...

推荐答案

这是一个基本的实现.(我不能保证它的生产质量!)将它保存到一个文件并执行这样的操作,每个在不同的终端窗口中:

Here's a basic implementation. (I can't promise it's production-quality!) Save it to a file and execute something like this, each in a different terminal window:

> python myscript.py 10001 10002 10003
> python myscript.py 10002 10003 10001
> python myscript.py 10003 10001 10002

第一个参数是监听端口,其余参数是其他服务器的端口.

The first argument is the listening port, the remaining arguments are the ports of the other servers.

import argparse
import logging
import os
import random
import socket
import struct

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, StreamClosedError
from tornado.tcpclient import TCPClient
from tornado.tcpserver import TCPServer
from tornado.options import options as tornado_options


parser = argparse.ArgumentParser()
parser.add_argument("port", type=int, help="port to listen on")
parser.add_argument("peers", type=int, nargs="+", help="peers' ports")
opts = parser.parse_args()

# This is just to configure Tornado logging.
tornado_options.parse_command_line()
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)

# Cache this struct definition; important optimization.
int_struct = struct.Struct("<i")
_UNPACK_INT = int_struct.unpack
_PACK_INT = int_struct.pack

tcp_client = TCPClient()


@gen.coroutine
def client(port):
    while True:
        try:
            stream = yield tcp_client.connect('localhost', port)
            logging.info("Connected to %d", port)

            # Set TCP_NODELAY / disable Nagle's Algorithm.
            stream.set_nodelay(True)

            while True:
                msg = ("Hello from port %d" % opts.port).encode()
                length = _PACK_INT(len(msg))
                yield stream.write(length + msg)
                yield gen.sleep(random.random() * 10)

        except StreamClosedError as exc:
            logger.error("Error connecting to %d: %s", port, exc)
            yield gen.sleep(5)


loop = IOLoop.current()

for peer in opts.peers:
    loop.spawn_callback(client, peer)


class MyServer(TCPServer):
    @gen.coroutine
    def handle_stream(self, stream, address):
        logging.info("Connection from peer")
        try:
            while True:
                # Read 4 bytes.
                header = yield stream.read_bytes(4)

                # Convert from network order to int.
                length = _UNPACK_INT(header)[0]

                msg = yield stream.read_bytes(length)
                logger.info('"%s"' % msg.decode())

                del msg  # Dereference msg in case it's big.

        except StreamClosedError:
            logger.error("%s disconnected", address)


server = MyServer()
server.listen(opts.port)

loop.start()

请注意,我们没有调用 read_until_close,因此我们需要某种方式来知道何时完全接收到消息.我在每条消息的开头使用一个 32 位整数来执行此操作,该整数对消息其余部分的长度进行编码.

Notice that we don't call read_until_close, so we need some way to know when a message is completely received. I do this with a 32-bit integer at the beginning of each message which encodes the length of the rest of the message.

您问过,当客户端写入服务器时,它会以某种方式中断 Tornado IOLoop 以调用 read()?"这就是 Tornado 的 IOLoop 的用途,这就是我们所说的异步":许多 Tornado 协程或回调可以等待网络事件,当它们等待的事件发生时,IOLoop 将它们唤醒.这就是你在上面代码中看到yield"的地方发生的事情.

You asked, "when the client writes to the server it somehow interrupts the Tornado IOLoop to call the read()?" This is what Tornado's IOLoop is for, and it's what we mean by "async": many Tornado coroutines or callbacks can wait for network events, and the IOLoop wakes them when the events they're awaiting occur. That's what's happening wherever you see "yield" in the code above.

这篇关于Tornado TCP 服务器/客户端进程通信的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆