Retrieve API data into dataframe using multi threading module


Question

I'm using a third-party API to retrieve 10-minute data for different tags over a large number of days. The current data pull can take up to several minutes, depending on the number of days and the number of tags. I'm therefore trying my hand at multithreading, which I understand can be useful for heavy IO operations.

The API call goes as follows (I've replaced the actual API name):

import numpy as N 
import requests as r 
import json 
import pandas as pd
from datetime import datetime 
import concurrent.futures

  
class pyGeneric: 
  
    def __init__(self, serverName, apiKey, rootApiUrl='/Generic.Services/api'): 
        """ 
        Initialize a connection to server, and return a pyGeneric server object 
        """ 
        self.baseUrl = serverName + rootApiUrl 
        self.apiKey = apiKey 
        self.bearer = 'Bearer ' + apiKey 
        self.header = {'mediaType':'application/json','Authorization':self.bearer} 
  
    def getRawMeasurementsJson(self, tag, start, end):
        apiQuery = '/measurements/' + tag + '/from/' + start + '/to/' + end + '?format=json' 
        dataresponse = r.get(self.baseUrl+apiQuery, headers=self.header) 
        data = json.loads(dataresponse.text) 
        return data 
                                                               
                                
    def getAggregatesPandas(self, tags, start, end):
        """
        Return tag(s) in a pandas DataFrame
        """
        df = pd.DataFrame()
        # Accept a single tag name as well as a list of tags
        if isinstance(tags, str):
            tags = [tags]
        for tag in tags:
            tempJson = self.getRawMeasurementsJson(tag, start, end)
            series = tempJson['timeSeriesList'][0]
            tempDf = pd.DataFrame(series['timeSeries'])
            name = series['measurementName']
            # 't' is treated as epoch milliseconds; pd.to_datetime keeps it in UTC,
            # whereas datetime.fromtimestamp would convert it to local time
            df['TimeUtc'] = pd.to_datetime(tempDf['t'], unit='ms').dt.round('min')
            df[name] = tempDf['v']
        return df
    

gener = pyGeneric('https://api.generic.com', 'auth_keymlkj9789878686')

An example call to the API would be: gener_df = gener.getAggregatesPandas('tag1.10m.SQL', '*-10d', '*')

This works fine for individual tags, but for a list of tags it takes longer, which is why I've been trying the following:

tags = ['tag1.10m.SQL',
'tag2.10m.SQL',
'tag3.10m.SQL',
'tag4.10m.SQL',
'tag5.10m.SQL',
'tag6.10m.SQL',
'tag7.10m.SQL',
'tag8.10m.SQL',
'tag9.10m.SQL',
'tag10.10m.SQL']

startdate = "*-150d"
enddate = '*'

final_df = pd.DataFrame()

with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i,startdate, enddate) for i in tags)
    executor.map(lambda p: gener.getAggregatesPandas(*p), args)

However, I'm unable to check whether gener.getAggregatesPandas is being executed properly. Ultimately I would like to get the results in a dataframe called final_df, but I'm unsure how to proceed. I've read in this post that appending inside the context manager would lead to quadratic copies of the dataframe, which would ultimately slow things down.

Recommended answer

As I understand it, you need to know whether getAggregatesPandas executed properly.

You can do it as shown below.

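with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i, startdate, enddate) for i in tags)
    # map returns results in input order; consuming them re-raises any
    # exception from a worker, which shows whether each call succeeded
    results = list(executor.map(lambda p: gener.getAggregatesPandas(*p), args))
    # another approach (inside the with block): submit each call, then iterate
    # futures = [executor.submit(gener.getAggregatesPandas, i, startdate, enddate) for i in tags]
    # results = [f.result() for f in concurrent.futures.as_completed(futures)]

# DataFrame.append returns a new frame instead of modifying final_df (and was
# removed in pandas 2.0), so build the result with a single concat
final_df = pd.concat(results, ignore_index=False)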
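
To see which individual calls succeeded, one option is to submit each tag separately and inspect its future. The following is a minimal sketch rather than the answerer's method: fetch_tag and fetch_all are hypothetical names, fetch_tag stands in for gener.getAggregatesPandas and returns a placeholder list instead of a DataFrame so that it runs without network access, and "bad.tag" is an invented tag that simulates a failing request.

```python
import concurrent.futures
import time


def fetch_tag(tag, start, end):
    """Hypothetical stand-in for gener.getAggregatesPandas (no network call)."""
    time.sleep(0.05)  # simulate waiting on the API
    if tag == "bad.tag":
        raise ValueError(f"unknown tag: {tag}")
    return [(tag, start, end)]  # placeholder for the real DataFrame


def fetch_all(tags, start, end, max_workers=5):
    """Run fetch_tag for every tag in a thread pool.

    Returns (results, errors): results maps tag -> return value for calls
    that succeeded, errors maps tag -> exception for calls that raised.
    """
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which tag each future belongs to
        future_to_tag = {
            executor.submit(fetch_tag, tag, start, end): tag for tag in tags
        }
        # as_completed yields futures in completion order, not submission order
        for future in concurrent.futures.as_completed(future_to_tag):
            tag = future_to_tag[future]
            try:
                results[tag] = future.result()  # re-raises the worker's exception
            except Exception as exc:
                errors[tag] = exc
    return results, errors


results, errors = fetch_all(["tag1.10m.SQL", "bad.tag", "tag2.10m.SQL"], "*-150d", "*")
print("succeeded:", sorted(results))
print("failed:", errors)
```

Keeping the errors in a dictionary means one failing tag does not discard the data already retrieved for the others, and the failed tags can be retried afterwards.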

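
The question's concern about quadratic copies can be avoided by collecting the per-tag frames in a list and concatenating them once after the threads finish. Here is a minimal sketch, assuming each frame holds a TimeUtc column plus one value column (as getAggregatesPandas returns for a single tag) and that timestamps are unique within each frame. make_frame is a hypothetical helper that fabricates such frames for illustration only.

```python
import pandas as pd


def make_frame(tag, values):
    """Hypothetical per-tag result shaped like the output of getAggregatesPandas."""
    times = pd.date_range("2024-01-01", periods=len(values), freq="10min")
    return pd.DataFrame({"TimeUtc": times, tag: values})


# Stand-ins for the frames returned by the worker threads
results = [
    make_frame("tag1.10m.SQL", [1.0, 2.0, 3.0]),
    make_frame("tag2.10m.SQL", [10.0, 20.0, 30.0]),
]

# Index every frame by timestamp, then concatenate once along the columns.
# A single concat copies the data once; appending inside a loop would copy
# the growing frame on every iteration.
final_df = pd.concat([df.set_index("TimeUtc") for df in results], axis=1)
print(final_df)
```

With axis=1 each tag becomes its own column aligned on the timestamp. Dropping the set_index call and the axis argument would stack the frames row-wise instead, which is closer to what append did.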
