How to fix multithreading/multiprocessing with dictionaries?
Question
I'm making over 100K calls to an API using two functions. The first reaches out to the API and grabs the sysinfo (a dict) for each host; the second goes through sysinfo and grabs the IP addresses. I'm looking for a way to speed this up, but I've never used multiprocessing/threading before (it currently takes about 3 hours).
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
#pool = ThreadPool(4)
p = Pool(5)
#obviously I removed a lot of the code that generates some of these
#variables, but this is the part that slooooows everything down.
def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000//hx/api/v3/hosts/{}/sysinfo"
    return sysinfo
def get_ips_from_sysinfo(self, sysinfo):
    sysinfo = sysinfo["data"]
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    ips = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for i in ip_info:
            ips.append(i)
    return ips
if __name__ == "__main__":
    for i in ids:
        sysinfo = rr.get_sys_info(i, appliance)
        hostname = sysinfo.get("data", {}).get("hostname")
        try:
            ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
        except Exception as e:
            rr.logger.error("Exception on {} -- {}".format(hostname, e))
            continue
    #Tried calling it here
    ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
I have to go through over 100,000 of these API calls, and this is really the part that slows everything down.
I think I've tried everything, and I've hit every possible "not iterable" and "missing argument" error along the way.
I'd really appreciate any kind of help. Thank you!
Recommended answer
You can use threads and a queue to communicate between them. First, start get_ips_from_sysinfo as a single thread that monitors the queue and processes each finished sysinfo, storing its output in output_list. Then fire off all the get_sys_info threads. Be careful not to run out of memory with 100k threads.
from threading import Thread
from queue import Queue

jobs = Queue()  # buffer for sysinfo
output_list = []  # store ips

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000//hx/api/v3/hosts/{}/sysinfo"
    jobs.put(sysinfo)  # add sysinfo to the jobs queue
    return sysinfo  # comment out if you don't need it

def get_ips_from_sysinfo(self):
    """Runs continuously until all jobs are finished."""
    while True:
        # get sysinfo from the jobs queue
        sysinfo = jobs.get()  # blocks here until a new entry arrives
        if sysinfo == 'exit':
            print('we are done here')
            break
        sysinfo = sysinfo["data"]
        network_array = sysinfo.get("networkArray", {})
        network_info = network_array.get("networkInfo", [])
        ips = []
        for ni in network_info:
            ip_array = ni.get("ipArray", {})
            ip_info = ip_array.get("ipInfo", [])
            for i in ip_info:
                ips.append(i)
        output_list.append(ips)

if __name__ == "__main__":
    # start our listener thread (a Thread does nothing until start() is called)
    listener = Thread(target=rr.get_ips_from_sysinfo)
    listener.start()
    threads = []
    for i in ids:
        t = Thread(target=rr.get_sys_info, args=(i, appliance))
        threads.append(t)
        t.start()
    # wait for the threads to finish, then terminate get_ips_from_sysinfo() by sending the 'exit' flag
    for t in threads:
        t.join()
    jobs.put('exit')
    listener.join()  # wait until the listener has drained the queue
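One way to act on the warning about 100k threads is to cap how many run at once. The sketch below is not part of the answer above: it swaps the thread-per-host approach for the standard library's concurrent.futures.ThreadPoolExecutor. The names fetch_sysinfo, fetch_ips and collect_ips, the canned response with its ipAddress field, and the max_workers value are all assumptions made for illustration; fetch_sysinfo stands in for the real rr.get_sys_info call so the example runs without a network.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_sysinfo(host_id):
    """Hypothetical stand-in for rr.get_sys_info(); returns a canned response."""
    return {
        "data": {
            "hostname": "host-{}".format(host_id),
            "networkArray": {
                "networkInfo": [
                    # "ipAddress" is an assumed field name, used only as sample data.
                    {"ipArray": {"ipInfo": [{"ipAddress": "10.0.0.{}".format(host_id)}]}}
                ]
            },
        }
    }


def get_ips_from_sysinfo(sysinfo):
    """Collect every ipInfo entry from one sysinfo response."""
    data = sysinfo.get("data", {})
    network_info = data.get("networkArray", {}).get("networkInfo", [])
    ips = []
    for ni in network_info:
        ips.extend(ni.get("ipArray", {}).get("ipInfo", []))
    return ips


def fetch_ips(host_id):
    """Worker: one (slow) API call followed by the cheap parsing step."""
    return host_id, get_ips_from_sysinfo(fetch_sysinfo(host_id))


def collect_ips(ids, max_workers=20):
    """Run fetch_ips over ids with at most max_workers threads alive at once."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() takes the function itself plus an iterable of arguments,
        # not the result of calling the function.
        return dict(executor.map(fetch_ips, ids))


if __name__ == "__main__":
    results = collect_ips(range(1, 6), max_workers=3)
    print(results[1])
```

Because the slow part is waiting on the API, threads are usually sufficient here, and the pool size is what bounds memory use. If a worker raises, executor.map re-raises that exception when its result is reached, so a real version would likely catch and log errors inside fetch_ips.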