Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World [英] Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World
问题描述
我刚刚开始使用 ZeroMQ,我正在尝试让 Hello World 与 Python 3.6 中的 PyZMQ 和 asyncio 一起工作.我正在尝试将模块的功能与发布/订阅代码分离,因此有以下类设置:
I've just started with ZeroMQ and I'm trying to get a Hello World to work with PyZMQ and asyncio in Python 3.6. I'm trying to de-couple the functionality of a module with the pub/sub code, hence the following class setup:
编辑 1:最小化示例
编辑 2:包含解决方案,请参阅下面的答案以了解操作方法.
Edit 2: Included solution, see answer down for how.
import asyncio
import zmq.asyncio
from zmq.asyncio import Context
# manages message flow between publishers and subscribers
class HelloWorldMessage:
def __init__(self, url='127.0.0.1', port='5555'):
self.url = "tcp://{}:{}".format(url, port)
self.ctx = Context.instance()
# activate publishers / subscribers
asyncio.get_event_loop().run_until_complete(asyncio.wait([
self.pub_hello_world(),
self.sub_hello_world(),
]))
# generates message "Hello World" and publish to topic 'world'
async def pub_hello_world(self):
pub = self.ctx.socket(zmq.PUB)
pub.connect(self.url)
# message contents
msg = "Hello World"
print(msg)
# keep sending messages
while True:
# --MOVED-- slow down message publication
await asyncio.sleep(1)
# publish message to topic 'world'
# async always needs `send_multipart()`
await pub.send_multipart([b'world', msg.encode('ascii')]) # WRONG: bytes(msg)
# processes message "Hello World" from topic 'world'
async def sub_hello_world(self):
sub = self.ctx.socket(zmq.SUB)
sub.bind(self.url)
sub.setsockopt(zmq.SUBSCRIBE, b'world')
# keep listening to all published message on topic 'world'
while True:
msg = await sub.recv_multipart()
# ERROR: WAITS FOREVER
print('received: ', msg)
if __name__ == '__main__':
HelloWorldMessage()
问题
上面的代码只打印了 1 个 Hello World
然后一直等待.如果我按 ctrl+c,则会出现以下错误:
Problem
With the above code only 1 Hello World
is printed and then waits forever. If I press ctrl+c, I get the following error:
python helloworld_pubsub.py
Hello World
^CTraceback (most recent call last):
File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
HelloWorldMessage()
File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
self.sub_hello_world(),
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
event_list = self._selector.select(timeout)
File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
版本:libzmq:4.2.3
、pyzmq:17.0.0
、Ubuntu 16.04
感谢任何见解.
推荐答案
我的代码有 2 个错误:
There were 2 errors with my code:
- 正如@user3666197 所提到的,
PUB/SUB
通信原型需要一些时间初始化(见他/她的回答).我不得不将await asyncio.sleep(1)
移到发布代码上方 (await pub.send_multipart([b'world', msg.encode('ascii')])代码>)
- 我对消息进行了错误编码.
bytes(msg)
-->msg.encode('ascii')
- As mentioned by @user3666197, the
PUB/SUB
communication archetype needs some time for initialization (see his/her answer). I had to moveawait asyncio.sleep(1)
above the code of publishing (await pub.send_multipart([b'world', msg.encode('ascii')])
) - I encoded the message wrong.
bytes(msg)
-->msg.encode('ascii')
此答案与我的问题最相关,但在实现 PyZMQ 时,请查看 @user3666197 以了解某些设计选择.
This answer is most closely related to my question, but please look at @user3666197 for certain design choices when implementing PyZMQ.
asyncio.get_event_loop()
中的 PyZMQ 似乎没有给出错误回溯,因此,将您的代码包装在 try
&except
块,例如:
It seems that PyZMQ in an asyncio.get_event_loop()
doesn't give an error traceback, therefore, wrap your code in a try
& except
block, e.g.:
import traceback
import logging
try:
while True:
msg_received = await sub.recv_multipart()
# do other stuff
except Exception as e:
print("Error with sub world")
logging.error(traceback.format_exc())
这篇关于Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!