读取Http流 [英] Read Http Stream

查看:155
本文介绍了读取Http流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从流式API中读取数据,该数据使用分块传输编码发送。每个块可以有多个记录,每个记录都由CRLF分隔。并且数据始终使用gzip压缩发送。我正在尝试获取提要,然后一次进行一些处理。我已经看过很多stackOverflow资源,但是找不到用Python做到这一点的方法。

I am trying to read from a streaming API where the data is sent using Chunked Transfer Encoding. There can be more than one record per chunk, each record is separated by a CRLF. And the data is always sent using gzip compression. I am trying to get the feed and then do some processing at a time. I have gone through a bunch of stackOverflow resources but couldn't find a way to do it in Python. the iter_content(chunk) size in my case is throwing an exception on the line.

for chunk in api_response.iter_content(chunk_size=1024): 

在Fiddler(我用作代理)中,我看到数据正在常量下载并在Fiddler中执行 COMETPeek,我实际上可以看到一些示例json。

In Fiddler (which I am using as a Proxy) I can see that data is being constant downloaded and doing a "COMETPeek" in Fiddler, I can actually see some sample json.

即使iter_lines不起作用。我看过这里提到的asyncio和aiohttp案例:为什么request.get()不返回?

Even iter_lines does not work. I have looked at asyncio and aiohttp case mentioned here: Why doesn't requests.get() return? What is the default timeout that requests.get() uses?

但不知道如何进行处理。如您所见,我已经尝试使用一堆python库。抱歉,某些代码可能包含一些我后来无法使用的库,因为它们无法正常工作。

but not sure how to do the processing. As you can see I have tried using bunch of python libraries. Sorry some of the code might have some libraries that I later removed from usage as it didn't work out.

我也查看了请求库的文档,但无法

I have also looked at the documentation for requests library but couldn't find anything substantial.

如上所述,下面是我正在尝试做的示例代码。

As mentioned above, below is a sample code of what I am trying to do. Any pointers to how I should proceed would be highly appreciated.

这是我第一次尝试阅读信息流

This is the first time I am trying to read a stream

from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
import requests
import zlib
import json

READ_BLOCK_SIZE = 1024*8

clientID="ClientID"
clientSecret="ClientSecret"

proxies = {
"https": "http://127.0.0.1:8888",
}

client = BackendApplicationClient(client_id=clientID)
oauth = OAuth2Session(client=client)

token = oauth.fetch_token(token_url='https://baseTokenURL/token', client_id=clientID,client_secret=clientSecret,proxies=proxies,verify=False) 

auth_t=token['access_token']
#auth_t = accesstoken.encode("ascii", "ignore")

headers = {
'authorization': "Bearer " + auth_t,
'content-type': "application/json",
'Accept-Encoding': "gzip",
}
dec=zlib.decompressobj(32 + zlib.MAX_WBITS)

try:
    init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
    if init_res.status_code == 302:
        print(init_res.headers['Location'])
        api_response = requests.get(init_res.headers['Location'], headers=headers, allow_redirects=False,proxies=proxies,verify=False, timeout=20, stream=True,params={"smoothing":"1", "smoothingBucketSize" : "180"})
        if  api_response.status_code == 200:
            #api_response.raw.decode_content = True

            #print(api_response.raw.read(20))
            for chunk in api_response.iter_content(chunk_size=api_response.chunk_size): 
                #Parse the response
    elif init_res.status_code == 200:
        print(init_res.content)
except Exception as ce:
    print(ce)

UPDATE
我现在正在查看: https:// aiohttp .readthedocs.io / zh-CN / v0.20.0 / client.html

这将是一种方法吗?

推荐答案

以防万一有人觉得有用。我找到了一种使用aiohttp从api通过python流式传输的方法。下面是骨骼。请记住,这只是一个骨架,它通过不断向我展示结果而起作用。如果有人有更好的方法-我全神贯注,因为这是我第一次尝试顺其自然。

Just in case someone finds this useful. I have found a way to stream from the api through python using aiohttp. Below is the skeleton. Remember it is just a skeleton and it works by continuously showing me results. If someone has a better way of doing it- I am all ears and eyes, since this is the first time I am trying to catch a stream.

async def fetch(session, url, headers):
    with async_timeout.timeout(None):
        async with session.get(init_res.headers['Location'], headers=headers, proxy="http://127.0.0.1:8888", allow_redirects=False,timeout=None) as r:
            while True:
                chunk=await r.content.read(1024*3)
                if not chunk:
                    break                    
                print(chunk)

async def main(url, headers):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url,headers)

在呼叫者中

try:
    init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
    if init_res.status_code == 302:
        loc=init_res.headers['Location']
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loc, headers=headers))
    elif init_res.status_code == 200:
        print(init_res.content)
except Exception as ce:
    print(ce)

这篇关于读取Http流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆