Pandas Making multiple HTTP requests


Problem Description

I have the code below, which reads a number of ticker symbols from a csv file into a dataframe.
For each ticker it calls the Web API, which returns a dataframe df that is then appended to the previous one until complete.
The code works, but when a large number of tickers is used the code slows down tremendously.
I understand I can use multiprocessing and threads to speed up my code, but I don't know where to start and what would be most suited to my particular case.

What code should I use to get my data into a combined dataframe in the fastest possible manner?

import pandas as pd
import numpy as np
import json

tickers=pd.read_csv("D:/verhuizen/pensioen/MULTI.csv",names=['symbol','company'])
read_str='https://financialmodelingprep.com/api/v3/income-statement/AAPL?limit=120&apikey=demo'
df = pd.read_json (read_str)
df = pd.DataFrame(columns=df.columns)

for ind in range(len(tickers)):
    read_str='https://financialmodelingprep.com/api/v3/income-statement/'+ tickers['symbol'][ind] +'?limit=120&apikey=demo'
    df1 = pd.read_json (read_str)       
    df=pd.concat([df,df1], ignore_index=True)
  
df.set_index(['date','symbol'], inplace=True)
df.sort_index(inplace=True)

df.to_csv('D:/verhuizen/pensioen/MULTI_out.csv')


The code provided works fine for smaller data sets, but when I use a large number of tickers (>4,000), at some point I get the error below. Is this because the Web API gets overloaded, or is there another problem?

Traceback (most recent call last):
  File "D:/Verhuizen/Pensioen/Equity_Extractor_2021.py", line 43, in <module>
    data = pool.starmap(download_data, enumerate(TICKERS, start=1))
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 276, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00C33E30>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'

Process finished with exit code 1


It keeps giving the same error (for a larger number of tickers); the code is exactly as provided:

def download_data(pool_id, symbols):
    df = []
    for symbol in symbols:
        print("[{:02}]: {}".format(pool_id, symbol))
        #do stuff here
        read_str = BASEURL.format(symbol)
        df.append(pd.read_json(read_str))
        #df.append(pd.read_json(fake_data(symbol)))
    return pd.concat(df, ignore_index=True)


It failed again with pool.map, but I noticed one strange thing: each time it fails, it does so at around 12,500 tickers (the total is around 23,000 tickers). Similar error:

Traceback (most recent call last):
  File "C:/Users/MLUY/AppData/Roaming/JetBrains/PyCharmCE2020.1/scratches/Equity_naive.py", line 21, in <module>
    data = pool.map(download_data, TICKERS)
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x078D1BF0>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'

Process finished with exit code 1


I also get the tickers from an API call, https://financialmodelingprep.com/api/v3/financial-statement-symbol-lists?apikey=demo (I noticed it does not work without a subscription). I wanted to attach the data as a csv file, but I don't have sufficient rights. I don't think it's a good idea to paste the returned data here...
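For completeness, that call would look something like the sketch below (just an illustration using requests; I am assuming here that the endpoint returns a plain JSON array of ticker symbols and that a valid, non-demo API key is used):

import requests

LIST_URL = "https://financialmodelingprep.com/api/v3/financial-statement-symbol-lists?apikey=demo"

resp = requests.get(LIST_URL, timeout=30)
resp.raise_for_status()  # fails early if the key has no subscription
TICKERS = resp.json()    # e.g. ["AAPL", "MSFT", ...]
print(len(TICKERS), "tickers")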


I tried adding time.sleep(0.2) before the return as suggested, but again I get the same error, at ticker 12,510. Strange that every time it is around the same location. As there are multiple processes going on, I cannot see at what point it breaks.

Traceback (most recent call last):
  File "C:/Users/MLUY/AppData/Roaming/JetBrains/PyCharmCE2020.1/scratches/Equity_naive.py", line 24, in <module>
    data = pool.map(download_data, TICKERS)
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00F32C90>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'

Process finished with exit code 1


Something very, very strange is going on. I have split the data into chunks of 10,000 / 5,000 / 4,000 and 2,000, and each time the code breaks approximately 100 tickers from the end. Clearly something is going on that is not right.

import time
import pandas as pd
import multiprocessing

# get tickers from your csv
df=pd.read_csv('D:/Verhuizen/Pensioen/All_Symbols.csv',header=None)

# setting the Dataframe to a list (in total 23,000 tickers)
df=df[0]
TICKERS=df.tolist()

#Select how many tickers I want
TICKERS=TICKERS[0:2000]

BASEURL = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120&apikey=demo"

def download_data(symbol):
    print(symbol)
    # do stuff here
    read_str = BASEURL.format(symbol)
    df = pd.read_json(read_str)
    #time.sleep(0.2)
    return df

if __name__ == "__main__":
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.map(download_data, TICKERS)
        df = pd.concat(data).set_index(["date", "symbol"]).sort_index()
    df.to_csv('D:/verhuizen/pensioen/Income_2000.csv')

In this particular example the code breaks at position 1,903:


RPAI
Traceback (most recent call last):
  File "C:/Users/MLUY/AppData/Roaming/JetBrains/PyCharmCE2020.1/scratches/Equity_testing.py", line 27, in <module>
    data = pool.map(download_data, TICKERS)
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\MLUY\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x0793EAF0>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'

Solution

The first optimization is to avoid concatenating your dataframe at each iteration.
You can try something like this:

url = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120&apikey=demo"
df = []

for symbol in tickers["symbol"]:
    read_str = url.format(symbol)
    df.append(pd.read_json(read_str))

df = pd.concat(df, ignore_index=True)

If that's not sufficient, we will look at using async, threading or multiprocessing.
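For reference only (this is not part of the steps below): since the work is I/O bound, a thread pool is another option. A minimal sketch, assuming the same URL template and the tickers dataframe from the question:

from concurrent.futures import ThreadPoolExecutor

import pandas as pd

url = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120&apikey=demo"

def fetch(symbol):
    # each call blocks on the network, so threads can overlap the waiting
    return pd.read_json(url.format(symbol))

with ThreadPoolExecutor(max_workers=8) as executor:
    frames = list(executor.map(fetch, tickers["symbol"]))

df = pd.concat(frames, ignore_index=True)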

Edit:
The code below can do the job:

import pandas as pd
import numpy as np
import multiprocessing
import time
import random

PROCESSES = 4  # number of parallel processes
CHUNKS = 6  # each process handles n symbols per task

# get tickers from your csv
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
           "XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
           "TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
           "AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
           "MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
           "CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]

# split TICKERS into a list of sublists of CHUNKS symbols each
TICKERS = [TICKERS[i:i + CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]

BASEURL = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120&apikey=demo"


def fake_data(symbol):
    dti = pd.date_range("1985", "2020", freq="Y")
    df =  pd.DataFrame({"date": dti, "symbol": symbol,
                        "A": np.random.randint(0, 100, size=len(dti)),
                        "B": np.random.randint(0, 100, size=len(dti))})
    time.sleep(random.random())  # to simulate network delay
    return df.to_json()


def download_data(pool_id, symbols):
    df = []
    for symbol in symbols:
        print("[{:02}]: {}".format(pool_id, symbol))
        # do stuff here
        # read_str = BASEURL.format(symbol)
        # df.append(pd.read_json(read_str))
        df.append(pd.read_json(fake_data(symbol)))
    return pd.concat(df, ignore_index=True)


if __name__ == "__main__":
    with multiprocessing.Pool(PROCESSES) as pool:
        data = pool.starmap(download_data, enumerate(TICKERS, start=1))
        df = pd.concat(data).set_index(["date", "symbol"]).sort_index()

In this example, I split the list of tickers into sublists so that each process retrieves data for multiple symbols, which limits the overhead of creating and destroying processes.

The delay is there to simulate the response time of the network connection and to highlight the multiprocessing behaviour.

Edit 2: a simpler but naive version for your needs

import pandas as pd
import multiprocessing

# get tickers from your csv
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
           "XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
           "TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
           "AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
           "MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
           "CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]

BASEURL = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120&apikey=demo"


def download_data(symbol):
    print(symbol)
    # do stuff here
    read_str = BASEURL.format(symbol)
    df = pd.read_json(read_str)
    return df


if __name__ == "__main__":
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.map(download_data, TICKERS)
        df = pd.concat(data).set_index(["date", "symbol"]).sort_index()

Note about pool.map: each symbol in TICKERS is dispatched to one of the pool's worker processes, which calls download_data on it.
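If the "cannot serialize '_io.BufferedReader' object" error from the question keeps appearing, it likely means an exception raised inside a worker (for example an HTTP error returned by the API) carries an open response object and cannot be pickled back to the parent process. A hypothetical workaround, not part of the solution above, is to trap exceptions inside download_data and return something picklable instead:

def download_data(symbol):
    print(symbol)
    try:
        return pd.read_json(BASEURL.format(symbol))
    except Exception as exc:
        # return a plain, picklable placeholder instead of letting the
        # exception (with its open HTTP response) cross the process boundary
        print("failed:", symbol, exc)
        return pd.DataFrame(columns=["date", "symbol"])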
