Python 中的 ZeroMQ N 到 N 异步模式 [英] ZeroMQ N to N async pattern in Python

查看:42
本文介绍了Python 中的 ZeroMQ N 到 N 异步模式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

类似于问题"ZeroMQ 中的 N 到 N 异步模式?",但遗憾的是该问题从未得到包含可运行代码的答案.

Similar to the question "N to N async pattern in ZeroMQ?", which unfortunately never received an answer with working code.

我正在尝试按照指南中的描述实施 Pub-Sub 网络:http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem(N-proxy-N 风格的小型消息代理).不幸的是,该指南没有提供任何代码示例.

I'm trying to implement Pub-Sub network as described in the guide: http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem (a small message broker in the style of N-proxy-N). Unfortunately, the guide doesn't provide any code examples.

我尝试使用 PyZMQ 实现一个 Hello World 示例,我想我已经很接近了,但是遇到了一些不知道如何处理的错误.很抱歉使用了 asyncio(比起线程,我更习惯用它).

I've tried to implement a Hello World example using PyZMQ. I think I'm close, but I'm facing some errors I don't know how to handle. Sorry for the use of asyncio (I'm more comfortable with it than with threads).

"""Example using zmq to create a PubSub node_topic similar to a ROS topic"""
# Copyright (c) Stef van der Struijk <stefstruijk@protonmail.ch>.
# This example is in the public domain (CC-0)
# http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem

import asyncio
import zmq.asyncio
from zmq.asyncio import Context
import traceback
import logging

# N-proxy-M pattern: N publishers connect to the proxy's XSUB socket, and the proxy forwards their messages through its XPUB socket to M subscribers
class PubSubTopic:
    """Single-process demo of an XSUB/XPUB proxy with two publishers and two subscribers.

    Constructing an instance starts everything: ``__init__`` schedules the proxy,
    two publishers ('en' and 'jp') and two subscribers ('en' and 'jp') on the
    asyncio event loop and does not return until they have all finished.

    NOTE(review): this is the listing the question is about. The tracebacks
    reported with it show that it does not work as written; see the notes on
    ``xpub_xsub_proxy`` and ``sub_hello_world``.
    """

    def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
        """Print the library versions, build both endpoints and run all coroutines.

        Args:
            address: Host part of both TCP endpoints.
            port1: Port of the endpoint the proxy's XSUB socket binds to
                (publishers connect here).
            port2: Port of the endpoint the proxy's XPUB socket binds to
                (subscribers connect here).
        """
        # get ZeroMQ version
        print("Current libzmq version is %s" % zmq.zmq_version())
        print("Current  pyzmq version is %s" % zmq.pyzmq_version())

        # shared zmq.asyncio context used by every socket created below
        self.context = Context.instance()
        # 2 sockets, because we can only bind once to a socket (as opposed to connect)
        self.url1 = "tcp://{}:{}".format(address, port1)
        self.url2 = "tcp://{}:{}".format(address, port2)

        # start proxy, pubs and subs async; demonstration purpose only, probably better in separate threads
        # run_until_complete() blocks the constructor until every coroutine has ended
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.xpub_xsub_proxy(),  # comment this line out to reproduce the subscriber error instead
            self.pub_hello_world(),
            self.pub_hello_world(lang='jp'),
            self.sub_hello_world(),
            self.sub_hello_world(lang='jp'),
        ]))

    # N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
    async def xpub_xsub_proxy(self):
        """Bind an XSUB socket to ``url1`` and an XPUB socket to ``url2`` and proxy between them.

        Any exception is caught, its traceback is logged and the coroutine ends.

        NOTE(review): ``zmq.proxy`` is called synchronously (it is not awaited).
        In the reported run the output stops right after "Try: Proxy... CONNECT!"
        and the Ctrl-C traceback ends inside ``zmq.proxy``, so it appears to
        block the event loop and keep the other coroutines from running.
        """
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init proxy")

            # Socket subscribing to publishers
            frontend_pubs = self.context.socket(zmq.XSUB)
            frontend_pubs.bind(self.url1)

            # Socket publishing to subscribers
            backend_subs = self.context.socket(zmq.XPUB)
            backend_subs.bind(self.url2)

            print("Try: Proxy... CONNECT!")
            zmq.proxy(frontend_pubs, backend_subs)
            # only reached once zmq.proxy() has returned
            print("CONNECT successful!")

        except Exception as e:
            print("Error with proxy :(")
            # print(e)
            logging.error(traceback.format_exc())
            print()

    # test case: 2 pubs to 1 topic
    async def pub_hello_world(self, lang='en'):
        """Publish a greeting forever, using ``lang`` as the topic frame.

        'en' publishes "Hello World" every second; any other value publishes
        "Hello Sekai" every two seconds. Any exception is caught, its traceback
        is logged and the coroutine ends.

        Args:
            lang: Topic name; also selects the message text and the interval.
        """
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init pub {}".format(lang))

            # connect, because many publishers - 1 subscriber
            pub = self.context.socket(zmq.PUB)
            pub.connect(self.url1)

            if lang == 'en':
                message = "Hello World"
                sleep = 1
            else:
                message = "Hello Sekai"  # Japanese
                sleep = 2

            # wait for the proxy and the subscribers to be ready
            await asyncio.sleep(.5)

            # keep publishing "Hello World" / "Hello Sekai" messages
            print("Pub {}: Going to pub messages!".format(lang))
            while True:
                # publish the message under topic `lang` ('en' or 'jp' in this demo)
                # multipart: topic, message; async always needs `send_multipart()`?
                await pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
                print("Pub {}: Have send msg".format(lang))

                # slow down message publication
                await asyncio.sleep(sleep)

        except Exception as e:
            print("Error with pub {}".format(lang))
            # print(e)
            logging.error(traceback.format_exc())
            print()

    # test case: 2 subs to 1 topic
    async def sub_hello_world(self, lang='en'):
        """Subscribe to topic ``lang`` on ``url2`` and print every message received.

        Any exception is caught, its traceback is logged and the coroutine ends.

        NOTE(review): in the reported run with the proxy commented out,
        ``recv_multipart`` fails with "ValueError: Invalid file object" raised
        from the event loop's ``add_reader``. Presumably the event loop in use
        does not know how to poll zmq.asyncio sockets with this pyzmq version —
        confirm against the pyzmq asyncio documentation.

        Args:
            lang: Topic to subscribe to.
        """
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init sub {}".format(lang))

            # connect, because many subscribers - 1 (proxy) pub
            sub = self.context.socket(zmq.SUB)
            sub.connect(self.url2)
            # subscribe to topic 'en' or 'jp'
            sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))

            # wait for the proxy to be ready; necessary?
            await asyncio.sleep(.2)

            # keep listening to all published messages, filtered on topic
            print("Sub {}: Going to wait for messages!".format(lang))
            while True:
                # list of frames: [topic, message]
                msg_received = await sub.recv_multipart()
                print("sub {}: {}".format(lang, msg_received))

        except Exception as e:
            print("Error with sub {}".format(lang))
            # print(e)
            logging.error(traceback.format_exc())
            print()

if __name__ == "__main__":
    # Instantiating the node is the whole program: __init__ drives the event loop.
    PubSubTopic()

错误

代理错误

当我不注释掉代理函数时,我得到以下traceback

Errors

Proxy error

When I don't comment out the proxy function, I get the following traceback

python pub_sub_topic.py 
Current libzmq version is 4.2.2
Current  pyzmq version is 16.0.2
Init proxy
Try: Proxy... CONNECT!
^CTraceback (most recent call last):
  File "pub_sub_topic.py", line 139, in <module>
    PubSubTopic()
  File "pub_sub_topic.py", line 43, in __init__
    self.sub_hello_world(lang='jp'),
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1426, in _run_once
    handle._run()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "pub_sub_topic.py", line 62, in xpub_xsub_proxy
    zmq.proxy(frontend_pubs, backend_subs)
  File "zmq/backend/cython/_device.pyx", line 95, in zmq.backend.cython._device.proxy (zmq/backend/cython/_device.c:1824)
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_device.c:1991)
KeyboardInterrupt

订阅者错误

如果我注释掉代理函数(# self.xpub_xsub_proxy(),),我会得到以下回溯

python pub_sub_topic.py 
Current libzmq version is 4.2.2
Current  pyzmq version is 16.0.2
Init sub en
Init sub jp
Init pub en
Init pub jp
Sub en: Going to wait for messages!
Error with sub en
ERROR:root:Traceback (most recent call last):
  File "pub_sub_topic.py", line 128, in sub_hello_world
    msg_received = await sub.recv_multipart()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 170, in recv_multipart
    dict(flags=flags, copy=copy, track=track)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 321, in _add_recv_event
    self._add_io_state(self._READ)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 294, in _add_io_state
    self.io_loop.add_reader(self, self._handle_recv)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 337, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
    key = self._selector.get_key(fd)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
    return mapping[fileobj]
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
    "{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>


Exception ignored in: <bound method Socket.__del__ of <zmq.asyncio.Socket object at 0x7fa90a4a7528>>
Traceback (most recent call last):
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 70, in __del__
    self.close()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 160, in close
    self._clear_io_state()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 316, in _clear_io_state
    self._drop_io_state(self._state)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 303, in _drop_io_state
    self.io_loop.remove_reader(self)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 342, in remove_reader
    return self._remove_reader(fd)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 279, in _remove_reader
    key = self._selector.get_key(fd)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
    return mapping[fileobj]
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
    "{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>
Sub jp: Going to wait for messages!

*snip* Same error as 'Sub en' *snip*

Pub en: Going to pub messages!
Pub en: Have send msg
Pub jp: Going to pub messages!
Pub jp: Have send msg
Pub en: Have send msg
Pub jp: Have send msg
Pub en: Have send msg
^CTraceback (most recent call last):
  File "pub_sub_topic.py", line 139, in <module>
    PubSubTopic()
  File "pub_sub_topic.py", line 43, in __init__
    self.sub_hello_world(lang='jp'),
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1390, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

系统信息

  • Ubuntu 16.04
  • Python 3.6(通过 Anaconda)
  • libzmq 4.2.2 版
  • pyzmq 16.0.2 版

推荐答案

    一如既往,答案在于简单.把它拆分成 3 个脚本之后,我们就不必处理线程和异步编程,因此这应该能帮助到更多人.

    As always, the answer is in simplicity. By separating it into 3 scripts, we don't have to work with threads and asynchronous programming, so this should help more people.

    打开 6 个终端并在每个终端中运行以下命令:

    Open 6 terminals and run the following commands in a terminal each:

    1. python proxy_topic.py # 代理/ROS 主题
    2. python proxy_pub.py # 发布 "Hello World"
    3. python proxy_pub.py jp # 发布 "Hello Sekai"
    4. python proxy_sub.py # 接收所有消息
    5. python proxy_sub.py en # 只接收 "Hello World";非必需
    6. python proxy_sub.py jp # 只接收 "Hello Sekai";非必需
    1. python proxy_topic.py # proxy / ROS topic
    2. python proxy_pub.py # publish "Hello World"
    3. python proxy_pub.py jp # publish "Hello Sekai"
    4. python proxy_sub.py # receive all messages
    5. python proxy_sub.py en # receive "Hello World" only; not necessary
    6. python proxy_sub.py jp # receive "Hello Sekai" only; not necessary

    proxy_topic.py

    import sys
    import zmq
    from zmq import Context
    
    
    class ProxyPub:
        """Stand-alone XSUB/XPUB forwarder that plays the role of the shared topic.

        Publishers connect to ``url1`` (the XSUB side) and subscribers connect to
        ``url2`` (the XPUB side). Constructing an instance binds both endpoints
        and then blocks inside ``zmq.proxy``.

        NOTE: despite its name this class is the proxy, not a publisher; the name
        is kept because the script's entry point instantiates ``ProxyPub``.
        """

        def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
            """Print the library versions, build both endpoints and run the proxy.

            Args:
                address: Host part of both TCP endpoints.
                port1: Port the XSUB socket binds to (publishers connect here).
                port2: Port the XPUB socket binds to (subscribers connect here).
            """
            # get ZeroMQ version
            print("Current libzmq version is %s" % zmq.zmq_version())
            print("Current  pyzmq version is %s" % zmq.pyzmq_version())

            self.context = Context.instance()
            # 2 sockets, because we can only bind once to a socket (as opposed to connect)
            self.url1 = "tcp://{}:{}".format(address, port1)
            self.url2 = "tcp://{}:{}".format(address, port2)

            self.xpub_xsub_proxy()

        # N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
        def xpub_xsub_proxy(self):
            """Bind both proxy sockets and forward messages between them.

            Blocks in ``zmq.proxy`` for as long as the proxy is running. Both
            sockets are closed on the way out, whether that is a failed bind
            (e.g. port already in use), Ctrl-C or any other exception; the
            exception itself still propagates to the caller.
            """
            print("Init proxy")

            # Socket subscribing to publishers
            frontend_pubs = self.context.socket(zmq.XSUB)
            # Socket publishing to subscribers
            backend_subs = self.context.socket(zmq.XPUB)
            try:
                frontend_pubs.bind(self.url1)
                backend_subs.bind(self.url2)

                print("Try: Proxy... CONNECT!")
                zmq.proxy(frontend_pubs, backend_subs)
                # zmq.proxy() blocks while forwarding, so this is only reached
                # once the proxy has stopped
                print("CONNECT successful!")
            finally:
                # linger=0: do not wait for undelivered messages, so the ports
                # are released immediately and interpreter shutdown cannot hang
                frontend_pubs.close(linger=0)
                backend_subs.close(linger=0)
    
    
    if __name__ == '__main__':
        # Echo the raw command line; the proxy is started without any options.
        argv_seen = sys.argv
        print("Arguments given: {}".format(argv_seen))
        # Constructing the object runs the proxy, so this call blocks.
        ProxyPub()
    

    proxy_pub.py

    import sys
    import zmq
    from zmq import Context
    import time
    
    
    class ProxyPub:
        """Publisher that connects to the proxy and sends a greeting forever.

        Constructing an instance starts publishing immediately and only returns
        when the publishing loop is interrupted by an exception (e.g. Ctrl-C).
        """

        def __init__(self, lang='en', address='127.0.0.1', port='5566'):
            """Print the library versions, build the endpoint and start publishing.

            Args:
                lang: Topic to publish under; also selects the message text.
                address: Host of the proxy's publisher-facing endpoint.
                port: Port of the proxy's publisher-facing endpoint.
            """
            # get ZeroMQ version
            print("Current libzmq version is %s" % zmq.zmq_version())
            print("Current  pyzmq version is %s" % zmq.pyzmq_version())

            self.context = Context.instance()
            self.url = "tcp://{}:{}".format(address, port)

            self.pub_hello_world(lang)

        def pub_hello_world(self, lang):
            """Publish ``[topic, message]`` multipart messages in an endless loop.

            'en' publishes "Hello World" every second; any other value publishes
            "Hello Sekai" every two seconds, still using ``lang`` as the topic.
            The socket is closed when the loop is left by an exception.

            Args:
                lang: Topic frame of every message.
            """
            print("Init pub {}".format(lang))

            if lang == 'en':
                message = "Hello World"
                interval = 1
            else:
                message = "Hello Sekai"  # Japanese
                interval = 2

            # The frames never change, so encode them once instead of on every
            # send. utf-8 yields the same bytes as ascii for ASCII topics and
            # does not crash on a non-ASCII topic given on the command line.
            frames = [lang.encode('utf-8'), message.encode('utf-8')]

            # connect, because many publishers - 1 subscriber (the proxy)
            pub = self.context.socket(zmq.PUB)
            try:
                pub.connect(self.url)

                # give the connection to the proxy time to be established
                time.sleep(.5)

                # keep publishing "Hello World" / "Hello Sekai" messages
                print("Pub {}: Going to pub messages!".format(lang))
                while True:
                    # multipart: first frame is the topic (`lang`), second the text
                    pub.send_multipart(frames)
                    print("Pub {}: Have send msg".format(lang))

                    # slow down message publication
                    time.sleep(interval)
            finally:
                # linger=0: drop anything still queued so shutdown cannot hang
                pub.close(linger=0)
    
    
    if __name__ == '__main__':
        # At most one optional CLI argument: the language/topic to publish.
        print("Arguments given: {}".format(sys.argv))
        argc = len(sys.argv)
        if argc == 2:
            ProxyPub(lang=sys.argv[1])
        elif argc == 1:
            ProxyPub()
        else:
            print("Too many arguments")
    

    proxy_sub.py

    import sys
    import zmq
    from zmq import Context
    import time
    
    
    class ProxyPub:
        """Subscriber that connects to the proxy and prints every message received.

        Constructing an instance starts receiving immediately and only returns
        when the receive loop is interrupted by an exception (e.g. Ctrl-C).

        NOTE: despite its name this class is the subscriber; the name is kept
        because the script's entry point instantiates ``ProxyPub``.
        """

        def __init__(self, lang='', address='127.0.0.1', port='5567'):
            """Print the library versions, build the endpoint and start receiving.

            Args:
                lang: Topic prefix to subscribe to; the default ``''`` is an
                    empty prefix, i.e. receive all messages.
                address: Host of the proxy's subscriber-facing endpoint.
                port: Port of the proxy's subscriber-facing endpoint.
            """
            # get ZeroMQ version
            print("Current libzmq version is %s" % zmq.zmq_version())
            print("Current  pyzmq version is %s" % zmq.pyzmq_version())

            self.context = Context.instance()
            self.url = "tcp://{}:{}".format(address, port)

            self.sub_hello_world(lang)

        def sub_hello_world(self, lang):
            """Subscribe to ``lang`` and print received messages in an endless loop.

            The socket is closed when the loop is left by an exception.

            Args:
                lang: Topic prefix used as the subscription filter.
            """
            print("Init sub {}".format(lang))

            # connect, because many subscribers - 1 (proxy) pub
            sub = self.context.socket(zmq.SUB)
            try:
                sub.connect(self.url)
                # Subscribe to the given topic ('en', 'jp', ...); an empty string
                # subscribes to everything. utf-8 yields the same bytes as ascii
                # for ASCII topics and does not crash on a non-ASCII topic.
                sub.setsockopt(zmq.SUBSCRIBE, lang.encode('utf-8'))

                # wait for the proxy to be ready; necessary?
                time.sleep(.2)

                # keep listening to all published messages, filtered on topic
                print("Sub {}: Going to wait for messages!".format(lang))
                while True:
                    # list of frames: [topic, message]
                    msg_received = sub.recv_multipart()
                    print("sub {}: {}".format(lang, msg_received))
            finally:
                # linger=0: nothing to flush on a SUB socket; close immediately
                sub.close(linger=0)
    
    
    if __name__ == '__main__':
        # At most one optional CLI argument: the topic to subscribe to.
        print("Arguments given: {}".format(sys.argv))
        argc = len(sys.argv)
        if argc == 2:
            ProxyPub(lang=sys.argv[1])
        elif argc == 1:
            ProxyPub()
        else:
            print("Too many arguments")
    

    这篇关于Python 中的 ZeroMQ N 到 N 异步模式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆