Queue or Lock in child multiprocess

Question
I've been on this site a while and I've found so many helpful solutions to the problems I've encountered as I build my first python program. I'm hopeful you guys can help me once again.
I am trying to launch a variable number of processes, with each one taking a small piece of a list to scan. I have been tinkering with queues, but when I implement them, they always add a sizable amount of time to my loop. I am looking to maximize my speed while protecting my Titles.txt from erroneous contents. Let me show you my code.
import multiprocessing
import urllib.request
import bs4 as bs

l = ['url1', 'url2']  # etc.

def output(t):
    f = open('Titles.txt', 'a')
    f.write(t)
    f.close()

def job(y, processload):
    calender = ['Jan', 'Feb', 'Mar', 'Dec']  # the things I want to find
    for i in range(processload):  # loop processload times
        source = urllib.request.urlopen(l[y]).read()  # read url #y
        soup = bs.BeautifulSoup(source, 'lxml')
        for t in soup.html.head.find_all('title'):
            if any(word in t for word in calender):
                output(t)  # this is what I need to queue
        y += 1  # advance url by 1

if __name__ == '__main__':
    processload = 5  # the number of urls to be scanned by each job
    y = 0  # the index of the current url in the list
    runcount = 0
    while runcount == 0:  # engage loop
        for i in range(380 // processload):  # the list size / 5
            p = multiprocessing.Process(target=job, args=(y, processload))
            p.start()
            y += processload  # jump y ahead
The code above gives me maximum speed in my loop. I would like to preserve that speed while also protecting my output. I have been searching through examples, but I haven't yet found code that features a lock or queue started in a child process. How would you recommend I proceed?
Thank you very much.

Answer
This example code does what I think you want your program to do:
import multiprocessing as mp
import time
import random

# Slicing a list into sublists, from SilentGhost:
# https://stackoverflow.com/a/2231685/4834
def get_chunks(input_list, chunk_size):
    return [input_list[i:i + chunk_size] for i in range(0, len(input_list), chunk_size)]

def find_all(item):
    ''' Dummy generator to simulate fetching a page and returning interesting stuff '''
    secs = random.randint(1, 5)
    time.sleep(secs)
    # Just one yield here, but it could yield each item found
    yield item

def output(q):
    ''' Dummy sink which prints instead of writing to a file '''
    while True:
        item = q.get()
        if item is None:
            return
        print(item)

def job(chunk, q):
    for item in chunk:
        for t in find_all(item):
            q.put(t)
    print('Job done:', chunk)

if __name__ == '__main__':
    all_urls = ['url1', 'url2', 'url3', 'url4', 'url5', 'url6']
    chunks = get_chunks(all_urls, 2)
    q = mp.Queue()
    # Create the worker processes, each taking a chunk and the queue
    processes = [mp.Process(target=job, args=(chunk, q)) for chunk in chunks]
    # Start them all
    for p in processes:
        p.start()
    # Create and start the sink
    sink = mp.Process(target=output, args=(q,))
    sink.start()
    # Wait for all the jobs to finish
    for p in processes:
        p.join()
    # Signal the end of the stream with None
    q.put(None)
    sink.join()
Example output:
url3
Job done: ['url3', 'url4']
url4
url5
url1
Job done: ['url5', 'url6']
url6
Job done: ['url1', 'url2']
url2
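If you would rather keep writing to Titles.txt directly from each child, instead of funnelling results through a queue to a single sink process, a `multiprocessing.Lock` created in the parent and passed to every worker also protects the file. A minimal sketch of that alternative (the `'title for ...'` strings are placeholders standing in for the real scraped titles):

```python
import multiprocessing as mp

def output(lock, path, text):
    # Hold the lock for the whole append so lines written by
    # different processes cannot interleave in the file
    with lock:
        with open(path, 'a') as f:
            f.write(text + '\n')

def job(lock, path, chunk):
    for url in chunk:
        # Stand-in for the real fetch-and-parse step
        output(lock, path, 'title for ' + url)

if __name__ == '__main__':
    lock = mp.Lock()  # created once in the parent, shared with all children
    chunks = [['url1', 'url2'], ['url3', 'url4']]
    procs = [mp.Process(target=job, args=(lock, 'Titles.txt', c)) for c in chunks]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

Note that the lock must be created in the parent and passed through `args`; a lock created independently inside each child would not be shared and would protect nothing.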