零 mq:socket.recv() 调用被阻塞 [英] zero-mq: socket.recv() call is blocking

查看:29
本文介绍了零 mq:socket.recv() 调用被阻塞的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用零-mq.我的要求很简单.我希望能够在网络中的两个对等点之间进行通信.我在书中的示例中遇到了这个程序.

I am trying to use zero-mq.My requirement is very simple.I want to be able to communicate between two peers in a network.I came across this program in the examples in the book.

$ pub_server.py

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    topic = random.randrange(9999,10005)
    messagedata = random.randrange(1,215) - 80
    print "%d %d" % (topic, messagedata)
    socket.send("%d %d" % (topic, messagedata))
    time.sleep(1)

$sub_client.py

import sys
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
for update_nbr in range (5):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print ('{} {}'.format(topic, messagedata))

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/update_nbr)))

我对这个模型的问题是

string = socket.recv()

阻塞直到我收到消息.我不希望这种情况发生.我希望消息在接收端排队,以便我可以将它从队列中取出(或类似的东西)

blocks till I recieve a message.I don't want this to happen.I want the messages to be queued up on the recieve side so that I can get it out of the queue (or something similar to this)

零 mq 中是否有一些模型允许这样做?

Is there some model in zero-mq that allows this?

推荐答案

zmq.Socket.recv 不会阻塞.

zmq.Socket.recv will not block if you pass the zmq.NOBLOCK flag parameter.

文档说:

If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready.

zmq 将接收到的消息排队,每次 recv() 调用都会返回一条消息,直到该队列耗尽,然后引发 ZMQError.

zmq will queue messages that it receives and one message will be returned for each recv() call until this queue is exhausted after which ZMQError is raised.

zmq.Again 下面的例子中使用的是zmq.EAGAIN 的包装器.

zmq.Again used in the exmaples below is a wrapper for zmq.EAGAIN.

例如:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"

    # perform other important stuff
    time.sleep(10)

sub_client.py 示例也许可以编写为使用像这样的非阻塞行为:

The sub_client.py example could perhaps be written to use non-blocking behaviour like this:

import sys, time
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
received_value_count = 0
do_receive_loop = True
while do_receive_loop:
    try:
        #process all messages waiting on subscribe socket
        while True:
            #check for a message, this will not block
            string = socket.recv(flags=zmq.NOBLOCK)

            #message received, process it
            topic, messagedata = string.split()
            total_value += int(messagedata)
            print ('{} {}'.format(topic, messagedata))

            #check if we have all the messages we want
            received_value_count += 1
            if received_value_count > 4:
                do_receive_loop = False
                break

    except zmq.Again as e:
        #No messages waiting to be processed
        pass

    #Here we can do other stuff while waiting for messages
    #contemplate answer to 'The Last Question'
    time.sleep(15)
    print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER"

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))

这篇关于零 mq:socket.recv() 调用被阻塞的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆