pyzmq recv_json 无法解码 send_json 发送的消息 [英] pyzmq recv_json can't decode message sent by send_json

查看:46
本文介绍了pyzmq recv_json 无法解码 send_json 发送的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我的代码,去掉了无关的东西:

coordinator.py

context = zmq.Context()socket = context.socket(zmq.ROUTER)端口 = socket.bind_to_random_port(ZMQ_ADDRESS)轮询器 = zmq.Poller()poller.register(套接字,zmq.POLLIN)为真:事件 = poller.poll(1)如果不是事件:继续process_id, val = socket.recv_json()

worker.py

context = zmq.Context()socket = context.socket(zmq.DEALER)socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))socket.send_json((os.getpid(), 真))

当我运行它时会发生什么:

 process_id, val = socket.recv_json()文件/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py",第380行,在recv_json返回 jsonapi.loads(msg)文件/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py",第71行,加载返回 jsonmod.loads(s, **kwargs)文件/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py",第451行,加载返回 _default_decoder.decode(s)文件/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py",第406行,解码obj, end = self.raw_decode(s)文件/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py",第426行,raw_decoderaise JSONDecodeError("无法解码 JSON 对象", s, idx)JSONDecodeError:无法解码 JSON 对象:第 1 行第 0 列(字符 0)

如果我使用 ipdb:

>/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()第379话-->第380话381ipdb>留言'\x00\x9f\xd9\x06\xa2'

嗯,这看起来不像 JSON...这是 pyzmq 中的错误吗?我用错了吗?

解决方案

嗯,好的,找到答案了.

ØMQ 接口中存在令人讨厌的不对称性,因此您必须了解您正在使用的套接字类型.

在这种情况下,我使用 ROUTER/DEALER 架构意味着从 DEALER 套接字发送的 JSON 消息,当我执行 send_json 时,被包裹在 multipart 消息信封中.第一部分是客户端 ID(我猜这是我上面得到的 '\x00\x9f\xd9\x06\xa2'),第二部分是我们感兴趣的 JSON 字符串.

所以在我的 coordinator.py 的最后一行,我需要这样做:

id_, msg = socket.recv_multipart()process_id, val = json.loads(msg)

恕我直言,这是 ØMQ/pyzmq 的糟糕设计,库应该将其抽象出来,并且只有 sendrecv 方法,它们就可以工作.

我从这个问题中得到了线索怎么可能我将 send_json 与 pyzmq PUB SUB 一起使用 所以看起来 PUB/SUB 架构有同样的问题,毫无疑问其他人也是如此.

这在文档中有描述,但不是很清楚
http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern

更新

事实上,我发现在我的情况下,我可以通过直接使用消息信封的客户端 ID"部分来进一步简化代码.所以工人只是这样做:

context = zmq.Context()socket = context.socket(zmq.DEALER)socket.identity = str(os.getpid()) # 或者我可以省略它并使用ØMQ客户端IDsocket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))socket.send_json(真)

另外值得注意的是,当你想从 ROUTER 向另一个方向发送消息时,你必须将它作为多部分发送,指定它的目的地是哪个客户端,例如:

coordinator.py

context = zmq.Context()socket = context.socket(zmq.ROUTER)端口 = socket.bind_to_random_port(ZMQ_ADDRESS)轮询器 = zmq.Poller()poller.register(套接字,zmq.POLLIN)pids = set()为真:事件 = poller.poll(1)如果不是事件:继续process_id, val = socket.recv_json()pids.add(process_id)# 这里需要一些代码来决定什么时候停止监听# 并打破循环对于pids中的pid:socket.send_multipart([pid, 'a string message'])# ^ 如果需要,做你自己的 json 编码

我想可能有一些 ØMQ 方式来做广播消息,而不是像我上面那样循环发送到每个客户端.我希望文档对每种可用的套接字类型以及如何使用它们有一个清晰的描述.

Here is my code with the extraneous stuff stripped out:

coordinator.py

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    event = poller.poll(1)
    if not event:
        continue
    process_id, val = socket.recv_json()

worker.py

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(
    (os.getpid(), True)
)

what happens when I run it:

    process_id, val = socket.recv_json()
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
    return jsonapi.loads(msg)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
    return jsonmod.loads(s, **kwargs)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
    return _default_decoder.decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
    obj, end = self.raw_decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
    raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)

and if I dig in with ipdb:

> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
    379             msg = self.recv(flags)
--> 380             return jsonapi.loads(msg)
    381

ipdb> p msg
'\x00\x9f\xd9\x06\xa2'

hmm, that doesn't look like JSON... is this a bug in pyzmq? am I using it wrong?

解决方案

Hmm, ok, found the answer.

There is an annoying asymmetry in the ØMQ interface, so you have to be aware of the type of socket you are using.

In this case my use of ROUTER/DEALER architecture means that the JSON message sent from the DEALER socket, when I do send_json, gets wrapped in multipart message envelope. The first part is a client id (I guess this is the '\x00\x9f\xd9\x06\xa2' that I got above) and the second part is the JSON string we are interested in.

So in the last line of my coordinator.py I need to do this instead:

id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)

IMHO this is bad design on the part of ØMQ/pyzmq, the library should abstract this away and have just send and recv methods, that just work.

I got the clue from this question How can I use send_json with pyzmq PUB SUB so it looks like PUB/SUB architecture has the same issue, and no doubt others too.

This is described in the docs but it's not very clear
http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern

Update

In fact, I found in my case I could simplify the code further, by making use of the 'client id' part of the message envelope directly. So the worker just does:

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = str(os.getpid())  # or I could omit this and use ØMQ client id
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(True)

It's also worth noting that when you want to send a message the other direction, from the ROUTER, you have to send it as multipart, specifying which client it is destined for, eg:

coordinator.py

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

pids = set()
while True:
    event = poller.poll(1)
    if not event:
        continue
    process_id, val = socket.recv_json()
    pids.add(process_id)

    # need some code in here to decide when to stop listening
    # and break the loop

for pid in pids:
    socket.send_multipart([pid, 'a string message'])
    # ^ do your own json encoding if required

I guess there is probably some ØMQ way of doing a broadcast message rather than sending to each client in a loop as I do above. I wish the docs just had a clear description of each available socket type and how to use them.

这篇关于pyzmq recv_json 无法解码 send_json 发送的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆