Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World [英] Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World

查看:62
本文介绍了Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我刚刚开始使用 ZeroMQ,我正在尝试让 Hello World 与 Python 3.6 中的 PyZMQ 和 asyncio 一起工作.我正在尝试将模块的功能与发布/订阅代码分离,因此有以下类设置:

I've just started with ZeroMQ and I'm trying to get a Hello World to work with PyZMQ and asyncio in Python 3.6. I'm trying to de-couple the functionality of a module with the pub/sub code, hence the following class setup:

编辑 1:最小化示例

编辑 2:包含解决方案,请参阅下面的答案以了解操作方法.

Edit 2: Included solution, see answer down for how.

import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # --MOVED-- slow down message publication
            await asyncio.sleep(1) 

            # publish message to topic 'world'
            # async always needs `send_multipart()`
            await pub.send_multipart([b'world', msg.encode('ascii')])  # WRONG: bytes(msg)

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # ERROR: WAITS FOREVER
            print('received: ', msg)

if __name__ == '__main__':
    HelloWorldMessage()

问题

上面的代码只打印了 1 个 Hello World 然后一直等待.如果我按 ctrl+c,则会出现以下错误:

Problem

With the above code only 1 Hello World is printed and then waits forever. If I press ctrl+c, I get the following error:

python helloworld_pubsub.py

Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

版本:libzmq:4.2.3pyzmq:17.0.0Ubuntu 16.04

感谢任何见解.

推荐答案

我的代码有 2 个错误:

There were 2 errors with my code:

  1. 正如@user3666197 所提到的,PUB/SUB 通信原型需要一些时间初始化(见他/她的回答).我不得不将 await asyncio.sleep(1) 移到发布代码上方 (await pub.send_multipart([b'world', msg.encode('ascii')]))
  2. 我对消息进行了错误编码.bytes(msg) --> msg.encode('ascii')
  1. As mentioned by @user3666197, the PUB/SUB communication archetype needs some time for initialization (see his/her answer). I had to move await asyncio.sleep(1) above the code of publishing (await pub.send_multipart([b'world', msg.encode('ascii')]))
  2. I encoded the message wrong. bytes(msg) --> msg.encode('ascii')

此答案与我的问题最相关,但在实现 PyZMQ 时,请查看 @user3666197 以了解某些设计选择.

This answer is most closely related to my question, but please look at @user3666197 for certain design choices when implementing PyZMQ.

asyncio.get_event_loop() 中的 PyZMQ 似乎没有给出错误回溯,因此,将您的代码包装在 try &except 块,例如:

It seems that PyZMQ in an asyncio.get_event_loop() doesn't give an error traceback, therefore, wrap your code in a try & except block, e.g.:

import traceback
import logging

try:
    while True:
        msg_received = await sub.recv_multipart()
        # do other stuff

except Exception as e:
    print("Error with sub world")
    logging.error(traceback.format_exc())

这篇关于Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆