Efficiently reading lines from a compressed, chunked HTTP stream as they arrive


Question

I've written an HTTP server that produces endless HTTP streams consisting of JSON-structured events, similar to Twitter's streaming API. These events are separated by \n (in line with server-sent events with Content-Type: text/event-stream) and can vary in length.

The response is

  • chunked (HTTP/1.1 Transfer-Encoding: chunked), since the stream is endless
  • compressed (Content-Encoding: gzip) to save bandwidth.
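
For illustration, such a response might look roughly like this (hypothetical event payloads; body shown after de-chunking and decompression):

HTTP/1.1 200 OK
Content-Type: text/event-stream
Transfer-Encoding: chunked
Content-Encoding: gzip

{"type": "update", "id": 1, "data": "..."}
{"type": "update", "id": 2, "data": "..."}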

I want to consume these lines in Python as soon as they arrive and as resource-efficiently as possible, without reinventing the wheel.

As I'm currently using python-requests, do you know how to make it work? If you think python-requests can't help here, I'm totally open to alternative frameworks/libraries.

My current implementation is based on requests and uses iter_lines(...) to receive the lines. But the chunk_size parameter is tricky. If set to 1, it is very CPU-intensive, since some events can be several kilobytes. If set to any value above 1, some events get stuck until the next one arrives and the whole buffer "gets filled", and the time between events can be several seconds. I expected chunk_size to be some sort of "maximum number of bytes to receive", as in UNIX's recv(...). The corresponding man page says:

The receive calls normally return any data available, up to the requested amount, rather than waiting for receipt of the full amount requested.
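
As a rough illustration of that behavior with plain sockets (hypothetical host and path):

import socket

# recv(4096) returns as soon as *any* data is available, up to 4096 bytes;
# it does not wait until the full 4096 bytes have arrived.
sock = socket.create_connection(("example.com", 80))
sock.sendall(b"GET /stream HTTP/1.1\r\nHost: example.com\r\n\r\n")
chunk = sock.recv(4096)  # may return far fewer than 4096 bytes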

But this is obviously not how it works in the requests library, which uses it more or less as an "exact number of bytes to receive". While looking at its source code, I couldn't identify which part is responsible for that; maybe httplib's Response or ssl's SSLSocket.

As a workaround I tried padding my lines on the server to a multiple of the chunk size. But the chunk size in the requests library is used to fetch bytes from the compressed response stream, so this only works if I can pad my lines so that their compressed byte sequence is a multiple of the chunk size. That seems far too hacky.

I've read that Twisted could be used for non-blocking, non-buffered processing of HTTP streams on the client, but I have only found code for creating streamed responses on the server.

Answer

Thanks to Martijn Pieters' answer, I stopped trying to work around python-requests' behavior and looked for a completely different approach.

I ended up using pyCurl. You can use it similarly to a select+recv loop, without inverting the control flow and handing control to a dedicated IO loop as in Tornado etc. This way it is easy to use a generator that yields new lines as soon as they arrive, without further buffering in intermediate layers that could introduce delay, and without additional threads to run the IO loop.

At the same time, it is high-level enough that you don't need to bother with chunked transfer encoding, SSL encryption or gzip compression.

This was my old code, where chunk_size=1 resulted in 45% CPU load and chunk_size>1 introduced additional lag.

import requests
class RequestsHTTPStream(object):
    def __init__(self, url):
        self.url = url

    def iter_lines(self):
        headers = {'Cache-Control':'no-cache',
                   'Accept': 'text/event-stream',
                   'Accept-Encoding': 'gzip'}
        response = requests.get(self.url, stream=True, headers=headers)
        # chunk_size=1 keeps latency low, but makes the read loop very CPU-intensive
        return response.iter_lines(chunk_size=1)

Here is my new code based on pyCurl. (Unfortunately the curl_easy_*-style perform() blocks completely, which makes it difficult to yield lines in between without using threads. That's why I'm using the curl_multi_* methods.)

import pycurl
import urllib2
import httplib
import StringIO

class CurlHTTPStream(object):
    def __init__(self, url):
        self.url = url
        self.received_buffer = StringIO.StringIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        # libcurl appends every piece of decoded (de-chunked, decompressed) body data to this buffer
        self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        self.status_code = 0

    SELECT_TIMEOUT = 10

    def _any_data_received(self):
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != httplib.OK:
            raise urllib2.HTTPError(self.url, self.status_code, None, None, None)

    def _perform_on_curl(self):
        # Drive the transfer; E_CALL_MULTI_PERFORM means "call perform() again immediately".
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                # the transfer has finished (or failed); errors are reported below
                break
            # wait (at most SELECT_TIMEOUT seconds) until the socket is readable again
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        #same behaviour as requests' Response.iter_lines(...)

        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            # If the chunk does not end with a line separator, the last element is an
            # incomplete line: keep it back and prepend it to the next chunk.
            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        if pending is not None:
            yield pending

This code tries to fetch as many bytes as possible from the incoming stream, without blocking unnecessarily if only a few are available. In comparison, the CPU load is around 0.2%.
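
For completeness, a minimal usage sketch (hypothetical URL; it assumes every line is a JSON document, as described above):

import json

stream = CurlHTTPStream('http://localhost:8080/events')
for line in stream.iter_lines():
    if not line:
        continue  # ignore empty keep-alive lines
    event = json.loads(line)
    print(event)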
