如何将 send_json 与 pyzmq PUB SUB 一起使用 [英] How can I use send_json with pyzmq PUB SUB

查看:33
本文介绍了如何将 send_json 与 pyzmq PUB SUB 一起使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要将字典作为消息从发布者发送给订阅者.使用 REQ/REP 模式 send_json 和 recv_json 工作得很好,但我似乎找不到适用于 PUB/SUB 的咒语.希望不是 PUB/SUB 只能使用 send() 和 recv().

I need to send a dictionary as the message from a publisher to subscribers. With the REQ/REP pattern send_json and recv_json work nicely, but I can't seem to find an incantation that works for PUB/SUB. Hope it's not the case that PUB/SUB can only work with send() and recv().

这是我整理的实验清单:

Here's the listing for the experiment I put together:

"""
Experiments with 0MQ PUB/SUB pattern
"""
import os
import sys
import time
import zmq
from multiprocessing import Process
from random import sample, choice

import signal
def handler(signum, frame):
    """ Handler for SIGTERM """
    # kill the processes we've launched
    try:
        for name, proc in _procd.iteritems():
            if proc and proc.is_alive():
                proc.terminate()
    finally:            
        os._exit(0)

signal.signal(signal.SIGTERM, handler)  
PORT = 5566
TOPICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

def publisher():
    """ Randomly update and publish topics """
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.bind("tcp://*:{}".format(PORT))
    ## Init a dict of topic:value pairs
    alltopics = dict()
    for char in TOPICS:
        alltopics[char] = time.time()

    while True:
        topic = choice(TOPICS)
        alltopics[topic] = time.time()
        ## THIS IS SENDING
        sock.send_json((topic, alltopics))
        print "Sent topic {}".format(topic)
        time.sleep(1)


def client(number, topics):
    """ 
    Subscribe to list of topics and wait for messages. 
    """
    context = zmq.Context()
    sock = context.socket(zmq.SUB)
    sock.connect("tcp://localhost:{}".format(PORT))
    for topic in topics:
        sock.setsockopt(zmq.SUBSCRIBE, topic)

    print "subscribed to topics {}".format(topics)    

    while True:
        ## THIS NEVER RETURNS
        print sock.recv_json() 

        ## ALREADY TRIED THIS. DOES NOT WORK  
        #topic, msg = sock.recv_json()
        #print  "Client{}  {}:{}".format(number, topic, msg[topic])

        sys.stdout.flush()

if __name__ == '__main__':
    _procd = dict()
    ## Launch publisher
    name = 'publisher'
    _procd[name] = Process(target=publisher, name=name)
    _procd[name].start()

    ## Launch the subscribers
    for n in range(10):
        name = 'client{}'.format(n)
        _procd[name] = Process(target=client,
                               name=name,
                               args=(n, sample(TOPICS,3)))
        _procd[name].start()



    ## Sleep until killed
    while True:
        time.sleep(1)    

这是我杀死父进程之前的输出

And here is the output up to the point where I kill the parent process

$ python pubsub.py
Sent topic Y
subscribed to topics ['B', 'Q', 'F']
subscribed to topics ['N', 'E', 'O']
subscribed to topics ['Y', 'G', 'M']
subscribed to topics ['G', 'D', 'I']
subscribed to topics ['D', 'Y', 'W']
subscribed to topics ['A', 'N', 'W']
subscribed to topics ['F', 'K', 'V']
subscribed to topics ['A', 'Q', 'X']
subscribed to topics ['S', 'Y', 'V']
subscribed to topics ['E', 'S', 'D']
Sent topic I
Sent topic N
Sent topic J
Sent topic I
Sent topic A
Sent topic T
Sent topic A
Sent topic K
Sent topic V
Sent topic E

订阅和发送似乎没问题,但客户端从不打印任何内容.客户端进程的回溯显示它们挂在 sock.recv_json() 调用上.我的第一次尝试被注释掉了.它也挂了.

The subscriptions and sending seems ok but the clients never print anything. The tracebacks for the client processes show that them hanging on the sock.recv_json() call. My first attempt is commented out. It also hangs.

推荐答案

我仍然希望看到它与 send_json() 和 recv_json() 一起工作,但是,根据 Jason 的建议,以下内容有效:

I'd still like to see it work with send_json() and recv_json() but, per Jason's suggestion, the following is working:

def mogrify(topic, msg):
    """ json encode the message and prepend the topic """
    return topic + ' ' + json.dumps(msg)

def demogrify(topicmsg):
    """ Inverse of mogrify() """
    json0 = topicmsg.find('{')
    topic = topicmsg[0:json0].strip()
    msg = json.loads(topicmsg[json0:])
    return topic, msg 

在publisher()中使用

with this in publisher()

    sock.send(mogrify(topic, alltopics))

这在 client()

and this in client()

   topic, msg = demogrify(sock.recv())

以下是完整的清单,后面是一些示例输出:

Here's the complete listing followed by some sample output:

#!/usr/bin/env python
# coding: utf8 
"""
Experiments with 0MQ PUB/SUB pattern.
Creates a publisher with 26 topics (A, B, ... Z) and
spawns clients that randomly subscribe to a subset
of the available topics. Console output shows 
who subscribed to what, when topic updates are sent
and when clients receive the messages.
Runs until killed.
Author: Michael Ellis
License: WTFPL
"""
import os
import sys
import time
import zmq
from multiprocessing import Process
from random import sample, choice
import json

PORT = 5566
TOPICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" # split into ['A', 'B', ... ]
PUBSLEEP = 0.01 # Sleep time at bottom of publisher() loop.
NCLIENTS = 10  # Number of clients spawned.
NSUBS = 3  # Number of topics each client subscribes to.

assert NSUBS <= len(TOPICS)

def mogrify(topic, msg):
    """ json encode the message and prepend the topic """
    return topic + ' ' + json.dumps(msg)

def demogrify(topicmsg):
    """ Inverse of mogrify() """
    json0 = topicmsg.find('{')
    topic = topicmsg[0:json0].strip()
    msg = json.loads(topicmsg[json0:])
    return topic, msg

def publisher():
    """ Randomly update and publish topics """
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.bind("tcp://*:{}".format(PORT))
    ## Init a dict of topic:value pairs
    alltopics = dict()
    for char in TOPICS:
        alltopics[char] = time.time()

    while True:
        try:
            topic = choice(TOPICS)
            alltopics[topic] = time.time()
            sock.send(mogrify(topic, alltopics))
            print "Sent topic {}".format(topic)
            time.sleep(PUBSLEEP)
        except KeyboardInterrupt:
            sys.exit()

def client(number, topics):
    """ 
    Subscribe to list of topics and wait for messages. 
    """
    context = zmq.Context()
    sock = context.socket(zmq.SUB)
    sock.connect("tcp://localhost:{}".format(PORT))
    for topic in topics:
        sock.setsockopt(zmq.SUBSCRIBE, topic)

    print "subscribed to topics {}".format(topics)    

    while True:
        try:
            topic, msg = demogrify(sock.recv())
            print  "Client{}  {}:{}".format(number, topic, msg[topic])
            sys.stdout.flush()
        except KeyboardInterrupt:
            sys.exit()

_procd = dict()
def run():
    """ Spawn publisher and clients. Loop until terminated. """
    ## Launch publisher
    name = 'publisher'
    _procd[name] = Process(target=publisher, name=name)
    _procd[name].start()

    ## Launch the subscribers
    for n in range(NCLIENTS):
        name = 'client{}'.format(n)
        _procd[name] = Process(target=client,
                               name=name,
                               args=(n, sample(TOPICS, NSUBS)))
        _procd[name].start()


    ## Sleep until killed
    while True:
        time.sleep(1)          

if __name__ == '__main__':
    import signal
    def handler(signum, frame):
        """ Handler for SIGTERM """
        # kill the processes we've launched
        try:
            for _, proc in _procd.iteritems():
                if proc and proc.is_alive():
                    proc.terminate()
        finally:            
            sys.exit()

    signal.signal(signal.SIGTERM, handler)

    run()

样本输出

$ pubsub.py 
Sent topic Q
subscribed to topics ['R', 'G', 'S']
subscribed to topics ['J', 'K', 'C']
subscribed to topics ['L', 'B', 'P']
subscribed to topics ['X', 'Z', 'A']
subscribed to topics ['K', 'O', 'R']
subscribed to topics ['J', 'Z', 'T']
subscribed to topics ['R', 'G', 'P']
subscribed to topics ['Y', 'A', 'O']
subscribed to topics ['U', 'S', 'C']
subscribed to topics ['B', 'P', 'L']
Sent topic U
Client8  U:1407506576.27
Sent topic E
Sent topic A
Client3  A:1407506576.29
Client7  A:1407506576.29
Sent topic A
Client3  A:1407506576.31
Client7  A:1407506576.31
Sent topic G
Client0  G:1407506576.32
Client6  G:1407506576.32
Sent topic E
Sent topic B
Client2  B:1407506576.34
Client9  B:1407506576.34
Sent topic R
Client0  R:1407506576.35
Client6  R:1407506576.35
Client4  R:1407506576.35
Sent topic U
Client8  U:1407506576.36
...

这篇关于如何将 send_json 与 pyzmq PUB SUB 一起使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆