Asyncio decode utf-8 with StreamReader


Question

I am getting used to asyncio and find the task handling quite nice, but it can be difficult to mix async libraries with traditional io libraries. The problem I am currently facing is how to properly decode an async StreamReader.

The simplest solution is to read() chunks of byte strings and then decode each chunk, as in the code below. (In my program I wouldn't print each chunk, but decode it into a string and pass it to another method for processing.)

import asyncio
import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = r.content
    while not stream.at_eof():
        data = await stream.read(4)
        print(data.decode('utf-8'))

This works fine until a utf-8 character is split between two chunks. For example, if the response is b'M\xc3\xa4dchen mit Bi\xc3\x9f\n', then reading chunks of 3 will work, but chunks of 4 will not: \xc3 and \x9f end up in different chunks, and decoding the chunk that ends with \xc3 raises the following error:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
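The failure can be reproduced without a server or aiohttp. The sketch below is only an illustration: `decode_chunks` is a hypothetical helper that slices the example payload into fixed-size chunks and decodes each one independently, which is what the read loop above effectively does.

```python
payload = b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'


def decode_chunks(data, size):
    """Decode every fixed-size chunk on its own (hypothetical helper)."""
    return [data[i:i + size].decode('utf-8') for i in range(0, len(data), size)]


# With 3-byte chunks, every chunk boundary happens to fall between characters.
print(decode_chunks(payload, 3))

# With 4-byte chunks, the fourth chunk is b' Bi\xc3', which ends mid-character.
try:
    decode_chunks(payload, 4)
except UnicodeDecodeError as exc:
    print('chunk size 4 failed:', exc)
```

Whether a given chunk size works is therefore an accident of where the multi-byte characters sit in the payload, not a property of the size itself.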

I looked at proper solutions to this problem, and at least in the blocking world the answer seems to be either io.TextIOWrapper or codecs.StreamReaderWriter (the differences between them are discussed in PEP 400). However, both of these rely on typical blocking streams.
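For comparison, this is roughly what the blocking approach looks like. It is a minimal sketch that assumes an in-memory io.BytesIO as a stand-in for a real file or socket; the variable names are illustrative.

```python
import io

payload = b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'

# A blocking, in-memory byte stream standing in for a file or socket.
text_stream = io.TextIOWrapper(io.BytesIO(payload), encoding='utf-8')

pieces = []
while True:
    piece = text_stream.read(4)  # here the size counts characters, not bytes
    if not piece:
        break
    pieces.append(piece)

print(pieces)
```

The wrapper buffers incomplete byte sequences internally, so the caller never sees half a character. It does so by calling the underlying stream's blocking methods, which is why it cannot be pointed at an async reader.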

I spent 30 minutes searching for asyncio examples and kept finding my own decode() solution. Does anyone know of a better solution, or is this a missing feature in Python's asyncio?

For reference, here are the results of using the two "standard" decoders with async streams.

Using the codec stream reader:

r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)

Exception:

File "echo_client.py", line 13, in get_data
  data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
  data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator

(It calls read() directly, rather than using yield from or await on it.)

I also tried wrapping the stream with io.TextIOWrapper:

stream = TextIOWrapper(r.content)

But this results in the following:

File "echo_client.py", line 10, in get_data
  stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'

P.S. If you want a sample test case for this, please look at this gist. You can run it with python3.5 to reproduce the error. If you change the chunk size from 4 to 3 (or 30), it will work correctly.

Edit

The accepted answer fixed this like a charm. Thanks! If someone else has this issue, here is a simple wrapper class I made to handle the decoding on a StreamReader:

import codecs

class DecodingStreamReader:
    def __init__(self, stream, encoding='utf-8', errors='strict'):
        self.stream = stream
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)

    async def read(self, n=-1):
        data = await self.stream.read(n)
        if isinstance(data, (bytes, bytearray)):
            data = self.decoder.decode(data)
        return data

    def at_eof(self):
        return self.stream.at_eof() 
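One thing a wrapper like this may still want is a final flush: if the stream ends in the middle of a character, the leftover bytes stay in the decoder's buffer and are never reported. The sketch below is one possible variant, not part of the original post. `iter_decoded` is a hypothetical helper, and an asyncio.StreamReader fed from memory stands in for the HTTP response body, on the assumption that the real stream also returns b'' from read() at end of stream.

```python
import asyncio
import codecs


async def iter_decoded(stream, chunk_size=4, encoding='utf-8', errors='strict'):
    """Yield decoded text from any object with `await read(n)` (hypothetical helper)."""
    decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
    while True:
        data = await stream.read(chunk_size)
        if not data:  # b'' signals end of stream
            break
        text = decoder.decode(data)
        if text:
            yield text
    # final=True makes a strict decoder raise if an unfinished character remains.
    tail = decoder.decode(b'', final=True)
    if tail:
        yield tail


async def main():
    # In-memory stand-in for the response body; created inside the running loop.
    reader = asyncio.StreamReader()
    reader.feed_data(b'M\xc3\xa4dchen mit Bi\xc3\x9f\n')
    reader.feed_eof()
    return ''.join([text async for text in iter_decoded(reader)])


print(asyncio.run(main()), end='')
```

With errors='strict', a truncated stream then fails loudly at the end instead of silently dropping the incomplete bytes.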

Recommended answer

You can use an incremental decoder:

Utf8Decoder = codecs.getincrementaldecoder('utf-8')

With your example:

decoder = Utf8Decoder(errors='strict')  # the keyword is `errors`, not `error`
while not stream.at_eof():
    data = await stream.read(4)
    print(decoder.decode(data), end='')

Output:

Mädchen mit Biß
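To see why this works, it helps to feed the decoder the two chunks that broke the naive version. This is a small synchronous sketch; the chunk values are the fourth and fifth 4-byte chunks of the example payload.

```python
import codecs

decoder = codecs.getincrementaldecoder('utf-8')(errors='strict')

first = decoder.decode(b' Bi\xc3')   # the lead byte of 'ß' is held back
second = decoder.decode(b'\x9f\n')   # the continuation byte completes it

print(repr(first), repr(second))
```

The decoder returns only the characters it can complete and keeps the trailing \xc3 in an internal buffer until the next call supplies the rest.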
