Receiving streaming data after implementing asyncio websockets as a class?


Problem description

My question is closely related to the following question on Stack Overflow and the documentation here. I define a websockets connection as a class. Next, I create a second class that holds an instance of that websocket class as self.ws and specifies the data to send to the websocket in self.request. My problem is that the current script runs only once, whereas my desired output is continuous data.

The second link shows that I can retrieve continuous / streaming data using

asyncio.get_event_loop().run_until_complete(call_api(json.dumps(msg)))

I include all of the above in my code (call_api is defined differently because I want to write it as a class). Below is my code:

import sys, json
import asyncio
from websockets import connect

class EchoWebsocket:
   def __init__(self, URL, CLIENT_ID=None, CLIENT_SECRET=None):
      self.url = URL
      self.client_id = CLIENT_ID
      self.client_secret = CLIENT_SECRET

   async def __aenter__(self):
      self._conn = connect(self.url)
      self.websocket = await self._conn.__aenter__()
      return self

   async def __aexit__(self, *args, **kwargs):
      await self._conn.__aexit__(*args, **kwargs)

   async def send(self, message):
      await self.websocket.send(message)

   async def receive(self):
      return await self.websocket.recv()

class DERIBIT:
   def __init__(self):
      self.ws = EchoWebsocket(URL='wss://test.deribit.com/ws/api/v2')
      self.loop = asyncio.get_event_loop()
      self.request = \
                   {"jsonrpc": "2.0",
                    "method": "public/subscribe",
                    "id": 42,
                    "params": {
                        "channels": ["deribit_price_index.btc_usd"]}
                   }

   def get_ticks(self):
      return self.loop.run_until_complete(self.__async__get_ticks())

   async def __async__get_ticks(self):
      async with self.ws as echo:
         await echo.send(json.dumps(self.request))
         response = await echo.receive()
         print(response)


if __name__ == "__main__":
   deribit = DERIBIT()
   deribit.get_ticks()

This script gives the following output:

{"jsonrpc": "2.0", "method": "public/subscribe", "id": 42, "params": {"channels": ["deribit_price_index.btc_usd"]}}

whereas I would like to see a continuous stream of messages.

Please advise.

Solution

The problem is in the __async__get_ticks function.

First, loop.run_until_complete runs only until the future it is given completes (see the run_until_complete documentation).
That means your coroutine handles just one response and then finishes. run_until_complete is not a callback that fires for every message!
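
To see this behaviour in isolation, here is a minimal sketch that needs no network. FakeFeed is a hypothetical in-memory stand-in for the websocket (it is not part of the websockets package or of the code in the question), preloaded with three messages. The coroutine awaits receive() once, so run_until_complete returns after the first message and the other two are never read.

```python
import asyncio
from collections import deque


class FakeFeed:
    """Hypothetical in-memory stand-in for a websocket with queued messages."""

    def __init__(self, messages):
        self.pending = deque(messages)

    async def receive(self):
        await asyncio.sleep(0)  # yield to the event loop, as a real recv() would
        return self.pending.popleft()


async def get_one(feed):
    # One await, one message: after this line the coroutine is complete.
    return await feed.receive()


feed = FakeFeed(["tick-1", "tick-2", "tick-3"])
loop = asyncio.new_event_loop()
try:
    # Returns as soon as get_one() finishes, i.e. after a single message.
    first = loop.run_until_complete(get_one(feed))
finally:
    loop.close()

print(first)              # tick-1
print(len(feed.pending))  # 2 (never read)
```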

So in your main block:
deribit.get_ticks() -> runs the coroutine __async__get_ticks to completion
__async__get_ticks is therefore the task. Let's see what the task does:
1. open the ws connection
2. send the request
3. wait for one response from the ws
4. print(response)
At this point the task is done, which is why you see only one line.

   async def __async__get_ticks(self):
      async with self.ws as echo:
         await echo.send(json.dumps(self.request))
         response = await echo.receive()
         print(response)

With that in mind, the fix is simple: wrap the line that receives the response in a while loop.

   async def __async__get_ticks(self):
      async with self.ws as echo:
         await echo.send(json.dumps(self.request))
         while True:
            response = await echo.receive()
            print(response)
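
Note that while True never ends by itself: it runs until the connection closes or the task is cancelled. Below is a rough sketch of a loop that stops cleanly, using a hypothetical in-memory FakeFeed instead of a real socket. FeedClosed is a made-up exception standing in for whatever your websocket library raises on close (with the websockets package that would be websockets.exceptions.ConnectionClosed), and limit is an optional cap added here only for illustration.

```python
import asyncio
from collections import deque


class FeedClosed(Exception):
    """Hypothetical stand-in for a 'connection closed' error."""


class FakeFeed:
    """Hypothetical in-memory feed that 'closes' once it runs out of messages."""

    def __init__(self, messages):
        self.pending = deque(messages)

    async def receive(self):
        await asyncio.sleep(0)  # yield to the event loop
        if not self.pending:
            raise FeedClosed
        return self.pending.popleft()


async def collect_ticks(feed, limit=None):
    received = []
    try:
        # Keep receiving until the optional limit is reached.
        while limit is None or len(received) < limit:
            received.append(await feed.receive())
    except FeedClosed:
        pass  # the peer closed the stream; return what arrived so far
    return received


all_ticks = asyncio.run(collect_ticks(FakeFeed(["a", "b", "c"])))
two_ticks = asyncio.run(collect_ticks(FakeFeed(["a", "b", "c"]), limit=2))
print(all_ticks)  # ['a', 'b', 'c']
print(two_ticks)  # ['a', 'b']
```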

Output of the while True version:

{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654476817,"price":7540.54,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654477824,"price":7540.52,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654478831,"price":7540.15,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654479838,"price":7539.83,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654480845,"price":7539.2,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654481852,"price":7538.96,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654482859,"price":7538.9,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654483866,"price":7538.89,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654484873,"price":7538.47,"index_name":"btc_usd"}}}
{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654485880,"price":7537.15,"index_name":"btc_usd"}}}
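
If you need the values rather than the raw text, each line is plain JSON and can be decoded with json.loads. Here is a small sketch using two of the messages above verbatim. extract_tick is a name made up for this example, and skipping messages whose method is not subscription is an assumption about what else the server may send (for instance a reply to the subscribe request itself).

```python
import json

# Two of the messages from the output above, copied verbatim.
raw_messages = [
    '{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654476817,"price":7540.54,"index_name":"btc_usd"}}}',
    '{"jsonrpc":"2.0","method":"subscription","params":{"channel":"deribit_price_index.btc_usd","data":{"timestamp":1587654477824,"price":7540.52,"index_name":"btc_usd"}}}',
]


def extract_tick(raw):
    """Return (timestamp, price) for a subscription message, otherwise None."""
    message = json.loads(raw)
    if message.get("method") != "subscription":
        return None  # assumption: anything else is not a price update
    data = message["params"]["data"]
    return data["timestamp"], data["price"]


ticks = [extract_tick(raw) for raw in raw_messages]
print(ticks)
# [(1587654476817, 7540.54), (1587654477824, 7540.52)]
```

Inside the loop you would call extract_tick(response) instead of print(response) and ignore the None results.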
