Python Asyncio队列获取未收到消息 [英] Python Asyncio queue get doesn't receive the message

查看:96
本文介绍了Python Asyncio队列获取未收到消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我发布了一个与之前问题相关的新问题，问题出在从队列中获取消息。这是代码（感谢 Martijn Pieters）：

I'm posting a new question related to my previous one, about a problem with getting messages from the queue. This is the code (thanks to Martijn Pieters):

import asyncio
import sys
import json
import os
import websockets


async def socket_consumer(socket, outgoing, *, log_dir=r"/home/host/Desktop"):
    """Forward every message received on the websocket into a queue.

    Each message is put on *outgoing* and then appended, in ``ascii()`` form,
    to ``FromSocket.txt`` inside *log_dir* as a debugging trace.  Returns when
    the websocket's async iteration ends.
    """
    log_path = os.path.join(log_dir, "FromSocket.txt")
    async for message in socket:
        await outgoing.put(message)
        # `with` guarantees the trace file is closed even if the write fails.
        with open(log_path, "a", encoding="utf-8") as log_file:
            log_file.write("From socket: " + ascii(message) + "\n")


async def socket_producer(socket, incoming, *, log_dir=r"/home/host/Desktop"):
    """Send every message taken from a queue out over the websocket.

    Before each send the message is appended, in ``ascii()`` form, to
    ``ToSocket.txt`` inside *log_dir* as a debugging trace.  Runs until
    cancelled or until an exception is raised.
    """
    log_path = os.path.join(log_dir, "ToSocket.txt")
    while True:
        message = await incoming.get()
        # `with` guarantees the trace file is closed even if the write fails.
        with open(log_path, "a", encoding="utf-8") as log_file:
            log_file.write("To socket: " + ascii(message) + "\n")
        # Forward the message exactly as queued (no extra JSON encoding).
        await socket.send(message)


async def connect_socket(incoming, outgoing, loop=None):
    """Bridge a websocket connection to the two message queues.

    Messages received on the socket are put on *outgoing*; messages taken
    from *incoming* are sent to the socket.  Returns when either direction
    finishes, after cancelling the other.  An exception that ended the
    finished direction is re-raised rather than silently discarded.
    """
    header = {"Authorization": r"Basic XXX="}
    uri = 'XXXXXX'
    # NOTE(review): newer websockets releases name this argument
    # `additional_headers`; confirm against the installed version.
    async with websockets.connect(uri, extra_headers=header) as web_socket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.ensure_future(
            socket_consumer(web_socket, outgoing), loop=loop)
        producer_task = asyncio.ensure_future(
            socket_producer(web_socket, incoming), loop=loop)

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        # force a result check; if the finished task raised, the exception is
        # re-raised here so it propagates to the caller instead of vanishing
        for task in done:
            task.result()


# pipe support
async def stdio(loop=None):
    """Wrap this process's stdin and stdout as asyncio streams.

    Returns a ``(reader, writer)`` pair: a StreamReader fed from
    ``sys.stdin`` and a StreamWriter that writes bytes to the stdout file
    descriptor.  When *loop* is None the current event loop is used.
    """
    loop = asyncio.get_event_loop() if loop is None else loop

    stdin_reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(stdin_reader), sys.stdin)

    # Re-open the stdout descriptor in binary mode so raw bytes can be written.
    stdout_file = os.fdopen(sys.stdout.fileno(), 'wb')
    transport, protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, stdout_file)
    stdout_writer = asyncio.streams.StreamWriter(transport, protocol, None, loop)

    return stdin_reader, stdout_writer


async def pipe_consumer(pipe_reader, outgoing, *, log_dir=r"/home/host/Desktop"):
    """Forward each line read from the stdin pipe into a queue.

    Every line is decoded as UTF-8 (its trailing newline is kept), appended
    in ``ascii()`` form to ``FromPipe.txt`` inside *log_dir*, and then put on
    *outgoing*.  Returns when ``readline()`` yields an empty result (EOF).

    Raises:
        UnicodeDecodeError: if a line is not valid UTF-8.
    """
    log_path = os.path.join(log_dir, "FromPipe.txt")
    while True:
        raw_line = await pipe_reader.readline()
        if not raw_line:
            break  # empty bytes from readline() means end of input
        # Decode once and reuse the text for both the trace and the queue.
        message = raw_line.decode('utf8')
        with open(log_path, "a", encoding="utf-8") as log_file:
            log_file.write("From pipe: " + ascii(message) + "\n")
        await outgoing.put(message)


async def pipe_producer(pipe_writer, incoming, *, log_dir=r"/home/host/Desktop"):
    """Write DENM/CAM JSON messages taken from a queue to the stdout pipe.

    Each item taken from *incoming* is expected to be a JSON ``str`` that
    decodes to a dict with ``header.messageID``.  Items whose messageID is 1
    or 2 are written to *pipe_writer* UTF-8 encoded and newline-terminated.
    Anything else, including text that fails to decode, is dropped.  Debug
    traces are appended to ``ToPipe.txt`` (and to ``Error.txt`` on decode
    failures) inside *log_dir*.  Runs until cancelled or an exception is
    raised.
    """
    to_pipe_log = os.path.join(log_dir, "ToPipe.txt")
    error_log = os.path.join(log_dir, "Error.txt")
    while True:
        json_message = await incoming.get()
        with open(to_pipe_log, "a", encoding="utf-8") as log_file:
            log_file.write("Send to pipe message: " + ascii(json_message) + "\n")
        try:
            message = json.loads(json_message)
            message_type = int(message.get('header', {}).get('messageID', -1))
        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not a
            # dictionary (a double-encoded JSON string decodes to a str), or
            # the messageID was not convertible to an integer
            message_type = None
            with open(error_log, "a", encoding="utf-8") as log_file:
                log_file.write(" Error \n")

        # 1 is DENM message, 2 is CAM message
        forward = message_type in {1, 2}
        # Log through a freshly opened file; str() because the ID may be None.
        with open(to_pipe_log, "a", encoding="utf-8") as log_file:
            log_file.write("Send to pipe type: " + str(message_type) + "\n")
            if forward:
                log_file.write("Send to pipe: " + json_message + "\n")
        if forward:
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()


async def connect_pipe(incoming, outgoing, loop=None):
    """Bridge this process's stdin/stdout to the two message queues.

    Lines read from stdin are pushed onto *outgoing*; messages taken from
    *incoming* are written to stdout.  Returns once either direction stops,
    cancelling the other and re-raising any exception from the side that
    finished.
    """
    pipe_reader, pipe_writer = await stdio()
    # Schedule both directions as independent tasks on the loop.
    tasks = [
        asyncio.ensure_future(pipe_consumer(pipe_reader, outgoing), loop=loop),
        asyncio.ensure_future(pipe_producer(pipe_writer, incoming), loop=loop),
    ]

    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    for leftover in unfinished:
        leftover.cancel()
    # result() re-raises whatever exception ended a finished task.
    for finished_task in finished:
        finished_task.result()


def main():
    """Create the two bridging queues and run both bridges on one event loop.

    ``pipe_to_socket`` carries lines read from stdin to the websocket side;
    ``socket_to_pipe`` carries websocket messages to the stdout side.  Blocks
    until both ``connect_socket`` and ``connect_pipe`` finish, or until one
    of them raises.
    """
    loop = asyncio.get_event_loop()
    # asyncio.Queue no longer accepts `loop=` (deprecated in 3.8, removed in
    # 3.10, where passing it raises TypeError); the queues are used only from
    # coroutines running on `loop`.
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)

    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))


main()

此代码是父进程通过

subprocess.Popen(["python3", test], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

问题是 socket_consumer 已经把（从套接字接收到的）对象放入了队列，但 pipe_producer 始终没有从 incoming.get() 返回。写文件仅用于测试目的。

The problem is that socket_consumer puts the object (received from the socket) into the queue, but pipe_producer never gets past incoming.get(). The file writing is only for testing purposes.

目前的父进程代码如下（仅用于测试）：

The parent process currently looks like this (for testing only):

import subprocess

test = r"/home/host/PycharmProjects/Tim/Tim.py"
process = subprocess.Popen(["python3", test],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}'
jsonValueBytes = message.encode("utf-8")
# Five newline-terminated copies; the child reads its stdin line by line.
payload = (jsonValueBytes + b"\n") * 5

# communicate() sends the input, closes stdin and drains stdout until the
# child exits.  Writing and then calling wait() while stdout=PIPE is never
# read can deadlock once the child fills the stdout pipe buffer.
# NOTE: this still blocks until the child process terminates.
process.communicate(payload)

为了向 Web 套接字发送消息，我改用以下代码：

To send to the web socket, I'm using this code instead:

#!/usr/bin/env python

import asyncio
import websockets

async def hello(uri):
    """Connect to the websocket at *uri* and send one sample CAM message."""
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5,"referencePosition":{"latitude":451114425,"longitude":76720957,"positionConfidenceEllipse":{"semiMajorConfidence":4095,"semiMinorConfidence":4095,"semiMajorOrientation":3601},...other fields}}'
    auth_header = {"Authorization": r"Basic XXXX="}
    async with websockets.connect(uri, extra_headers=auth_header) as ws:
        await ws.send(message)


asyncio.get_event_loop().run_until_complete(hello('XXX'))

它通过管道发送并且能正常工作，因为我在管道上收到了消息并发送到了套接字（文件 FromPipe.txt 和 ToSocket.txt 的内容都正确）。
然后，我用代码向一个打开了 Web 套接字的服务器发送消息，该服务器再把消息发给子进程。子进程从套接字收到消息时会创建文件 FromSocket.txt，但不会创建 ToPipe.txt，除非我把写文件的代码放到 await incoming.get() 之前。

It sends through the pipe and works, because I receive on the pipe and send to the socket (the files FromPipe.txt and ToSocket.txt are correct).
Then I have code that sends to a server with an open web socket, and this server sends the message to the child. When the child receives from the socket, the file FromSocket.txt is created, but ToPipe.txt is not created unless I move the file write before the await incoming.get().

FromSocket.txt 具有以下内容:

From socket: '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1, ... other field}}'

但如果是类型检索出了问题，文件仍然会被创建，因为写文件是 json_message = await incoming.get() 之后的第一条指令，所以我认为问题出在队列上。为了测试，我在 socket_consumer 中的 await outgoing.put(message) 之后调用 incoming.get()，这样就能正常工作。

But if there were a problem retrieving the type, the file would still be created, since writing it is the first instruction after json_message = await incoming.get(); so I think the problem is with the queue. As a test, I called incoming.get() in socket_consumer right after await outgoing.put(message), and it works.

更新：如果我只运行子进程（即不使用管道），ToPipe.txt 的内容是正确的，从套接字到管道的消息传输也正常。在我的测试中，我运行父进程，它通过管道发送一条消息，子进程把这条消息发送到套接字；然后我向套接字发送一条消息，子进程收到了这条消息，但没有把它发送到管道，ToPipe.txt 也没有被创建。可能是 main 方法有问题。

UPDATE: If I run only the child (so without the pipe), ToPipe.txt is correct and the message transfer from socket to pipe works fine. In my test I run the parent, which sends one message over the pipe that the child forwards to the socket; then I send a message to the socket and the child catches it, but it doesn't send it to the pipe and ToPipe.txt is not created. Maybe there is a problem in the main method.

推荐答案

您正在将双重编码的JSON写入子进程:

You are writing double-encoded JSON to the child process:

message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
# `message` is already a JSON string, so dumping it again double-encodes it.
jsonValue = json.dumps(message)

消息已经是一个JSON字符串,所以 jsonValue 是一个经过双重编码的JSON字符串.

message is already a JSON string, so jsonValue is a double-encoded JSON string.

管道消费者将这个双重编码的字符串推入发往套接字的队列。接下来，socket_producer() 中的 websocket 生产者又对消息进行了一次编码：

The pipe consumer pushes this double-encoded string into the queue for the socket. Next, the websocket producer in socket_producer() encodes the message again:

while True:
    message = await incoming.get()
    # ...
    # `message` is already JSON text; dumping it again adds one more layer of
    # JSON encoding before it is sent.
    json_message = json.dumps(message)
    await socket.send(json_message)

因此,现在 json_message 是一个三重编码的JSON值,即包含JSON文档的JSON文档,其中JSON文档包含JSON文档:

So now json_message is a triple-encoded JSON value, a JSON document containing a JSON document containing a JSON document:

>>> import json
>>> message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'  # valid JSON
>>> json_message = json.dumps(message)
>>> print(json_message)  # double-encoded
"{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"
>>> json_message = json.dumps(json_message)  # encode *again*
>>> print(json_message)  # triple-encoded
"\"{\\\"header\\\":{\\\"protocolVersion\\\":1,\\\"messageID\\\":2,\\\"stationID\\\":400}}}\""

我不知道您的Web套接字到底是做什么的,但是假设它一次使用 json.loads(),然后回显解码后的消息.这意味着 socket_consumer()会收到一个仅编码两次的JSON文档.您的 FromSocket.txt 日志肯定暗示发生了这种情况,因为它包含 double 编码的JSON消息:

I don't know exactly what your web socket does with this, but let's assume that it uses json.loads() once, then echoes the decoded message back. This means that socket_consumer() receives a JSON document that's encoded just twice. Your FromSocket.txt log certainly implies that this is what happens, because it contains a double-encoded JSON message:

您可以在 FromSocket.txt 日志中看到此内容:

You can see this in your FromSocket.txt log:

From socket: "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400},\"cam\":{\"generationDeltaTime\":1,...other fields}}"

请注意那些 \"条目,整个文档都用引号引起来,但是值中没有 \\\ 三重反斜杠.

Note those \" entries, and the whole document is wrapped in quotes, but there are no \\\ triple-backslashes in the value.

不过，这一层额外的 JSON 编码会破坏 pipe_producer() 协程，该协程期望消息解码为字典，而不是另一个字符串（即使该字符串包含另一个 JSON 文档）：

Still, this extra layering of JSON encoding breaks the pipe_producer() coroutine, which expects the message to decode to a dictionary, not another string (even if that string contains another JSON document):

# If json_message was double-encoded, `message` is a str here and
# message.get raises AttributeError. (Note: `type` shadows the builtin.)
message = json.loads(json_message)
type = int(message.get('header', {}).get('messageID', -1))

message 将改为解码为字符串,因此 message.get 将失败,并出现 AttributeError ,导致协程退出:

message will decode to a string instead, so message.get will fail with an AttributeError, causing the coroutine to exit:

>>> json_message = "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"  # double encoded
>>> message = json.loads(json_message)
>>> message  # Back one stop, single-encoded JSON
'{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'
>>> type(message)  # it's a string with JSON, not a dictionary
<class 'str'>
>>> message.get('header')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'str' object has no attribute 'get'

您需要确保不会对数据进行过多次编码！如果您的管道接收的是 JSON 数据，将其发送到套接字时就不要再次编码。从父进程向管道发送数据时，也不要对数据进行双重编码——如果您已经有一个 JSON 字符串，再把它传给 json.dumps() 没有任何意义。

You need to make sure you do not encode your data too many times! If your pipe receives JSON data, don't encode the data again when sending it to the socket. When sending data to the pipe from the parent process, don't double encode the data, if you already have a JSON string there is no value in passing it through json.dumps() once more.

在协程中添加更多的故障保护也是谨慎的.我没有使JSON解码足够健壮,因此请补救这一部分:

It would also be prudent to add more fail-safes in the coroutines. I didn't make the JSON decoding robust enough, so let's remedy that part:

async def pipe_producer(pipe_writer, incoming):
    """Write DENM/CAM JSON messages taken from a queue to the stdout pipe.

    Each queued item should be a JSON text decoding to a dict with
    ``header.messageID``.  Items with messageID 1 (DENM) or 2 (CAM) are
    written to *pipe_writer* UTF-8 encoded and followed by a newline.
    Anything that fails to decode, is not a dict, or has another ID is
    silently dropped.  Runs until cancelled.
    """
    while True:
        json_message = await incoming.get()
        try:
            message = json.loads(json_message)
            # Named message_type (not `type`) so the builtin is not shadowed.
            message_type = int(message.get('header', {}).get('messageID', -1))
        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not a
            # dictionary, or the messageID was not convertible to an integer
            message_type = None
        # 1 is DENM message, 2 is CAM message
        if message_type in {1, 2}:
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()

您可能想记录解码在某处失败(将消息推送到日志队列,然后由一个单独的任务将其写入日志).

You probably want to record that the decoding failed somewhere (push messages to a log queue that a separate task picks up to write to a log).

接下来，我们可以更新 connect_* 函数，使其不再忽略已完成任务中的异常：

Next, we can update the connect_* functions to not ignore exceptions in the tasks that complete:

# Wait until whichever of the two tasks finishes first, then cancel the other.
done, pending = await asyncio.wait(
    [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
    task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
    task.result()

task.result() 检查可以重新抛出消费者或生产者中引发的异常。由于 connect_* 协程通过 asyncio.gather() 运行，而 asyncio.gather() 又由 loop.run_until_complete() 运行，该异常会一直传播到 main() 函数，使 Python 退出，您会看到打印出的回溯。我已经更新了另一个答案，加入了 for task in done: task.result() 循环，因为无论如何这都是好的做法。

The task.result() check can re-raise the exception thrown in a consumer or producer. Since the connect_* coroutines are run via asyncio.gather(), which in turn is run by loop.run_until_complete(), that exception is then propagated all the way to the main() function, so it'll exit Python and you get to see the traceback printed. I've updated my other answer to include the for task in done: task.result() loop, as that's good practice anyway.

在我原来答案的代码中只加入 task.result() 循环，使用一个只把消息原样回显的 websocket，并输入一个有效的（非双重编码的）JSON 文档，我立刻就能看到错误。这里的父进程就是我的终端，所以我只是把 JSON 消息复制到终端窗口中，以便将数据发送到管道：

With just the task.result() loop in my original answer code, and a websocket that just echoes the message back, and entering a valid JSON document (not double-encoded), I can see the error immediately; the parent process here is my terminal, so I'm just copying in the JSON message into my terminal window to send data into the pipe:

$ python3.7 so52291218.py
{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
Traceback (most recent call last):
  File "so52291218.py", line 140, in <module>
    main()
  File "so52291218.py", line 137, in main
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
  File "/.../lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "so52291218.py", line 126, in connect_pipe
    task.result()
  File "so52291218.py", line 104, in pipe_producer
    type = int(message.get("header", {}).get("messageID", -1))
AttributeError: 'str' object has no attribute 'get'

当我从 socket_producer() 中删除 json.dumps() 调用，或者将 websocket 服务器改为对传入消息调用 json.loads() 并把结果发送出去时，代码就能正常工作，我也会在终端上看到同样的消息被回显回来。

When I remove the json.dumps() call from the socket_producer() or I change my websocket server to use json.loads() on the incoming message and sending that out as the result, then the code works and I get the same message echoed back to my terminal.

请注意，当 stdin 和 stdout 都是管道时，不能只用一个循环向 subprocess.Popen() 的管道写入数据。只在循环中写入，很容易让子进程在 I/O 上挂起。您还必须确保同时读取 stdout 管道；但由于子进程会以几乎随机的顺序读写这些句柄，父进程也必须异步处理 Popen() 管道的 I/O。

Note that you can't just use a loop to write to a subprocess.Popen() pipe when both stdin and stdout are pipes. You can trivially cause the child process to hang on I/O by only writing in a loop. You'd have to make sure to read from the stdout pipe too, but because the child process is going to read and write from those handles in effectively random order, your parent process would also have to handle the I/O for the Popen() pipes asynchronously.

我不打算详细说明如何做（Stack Overflow 上的其他地方已有介绍），而是建议您使用 pexpect 项目，因为它已经为您完成了所有这些工作（通过生成一个单独的线程持续读取 stdout 管道）。使用 pexpect.popen_spawn.PopenSpawn() 可以与原来的设置保持接近，代码如下：

Rather than write up how to do that (which is already covered elsewhere on Stack Overflow), I'm instead telling you to use the pexpect project, as it already has done all that work for you (by spawning a separate thread that continually reads from the stdout pipe); using pexpect.popen_spawn.PopenSpawn() to keep this close to your original setup would look like this:

import sys
import pexpect
import pexpect.popen_spawn  # the submodule is not loaded by `import pexpect` alone

test = '...'
process = pexpect.popen_spawn.PopenSpawn([sys.executable, test])

message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}'
jsonValueBytes = message.encode("utf-8")  # invariant, so encode once

for _ in range(5):
    process.send(jsonValueBytes + b"\n")

    # echo anything coming back: keep matching until a short expect() call
    # captures nothing before the match (newline, EOF or timeout)
    while True:
        process.expect([process.crlf, pexpect.EOF, pexpect.TIMEOUT], timeout=0.1)
        if not process.before:
            break
        print('>>>', process.before.decode('utf8', errors='replace'), flush=True)

# send EOF to close the pipe, then terminate the process
process.sendeof()
process.kill(1)  # signal number 1 (SIGHUP on POSIX)
process.wait()

因此,每当我们向管道发送整行时,我们还会寻找超时时间短的相反方向的行,并回显任何这样的行.

So every time we send a full line to the pipe, we also look for lines coming the other way, with a short timeout, and echo any such lines.

在完成所有修复（确保避免对 JSON 消息进行多次编码），并使用一个非常简单的回显 websocket 服务器之后，上面的 pexpect 代码会输出：

With all fixes in place (making sure to avoid multiple-encoding JSON messages), and a very simple echoing websocket server, the pexpect code above prints:

>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}

表明从父进程到子进程再到websocket并返回都有完整的往返路径.

Showing that there is a full round-trip path from parent process to child process to websocket and back.

这篇关于Python Asyncio队列获取未收到消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆