How would I go about using concurrent.futures and queues for a real-time scenario?
Problem description
It is fairly easy to do parallel work with Python 3's concurrent.futures module, as shown below:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()
It is also very handy to insert items into and retrieve them from a Queue:
q = queue.Queue()

for task in tasks:
    q.put(task)

while not q.empty():
    q.get()
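(A side note, not in the original question: testing q.empty() and then calling q.get() can race when several consumers share the queue; a blocking get with a timeout is a common alternative. A minimal sketch:)

import queue

q = queue.Queue()

try:
    task = q.get(timeout=1)  # block for up to 1 second waiting for an item
except queue.Empty:
    pass  # no work arrived in time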
I have a script running in the background, listening for updates. Now, in theory, assume that as those updates arrive, I would queue them and work on them concurrently using the ThreadPoolExecutor.
Now, individually, all of these components work in isolation and make sense, but how do I go about using them together? I am not aware whether it is possible to feed the ThreadPoolExecutor work from the queue in real time, unless the data to work from is predetermined. In a nutshell, all I want to do is receive updates of, say, 4 messages a second, shove them into a queue, and have concurrent.futures work on them; otherwise I am stuck with a sequential approach, which is slow.
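(To make the intended data flow concrete, here is a minimal sketch of the setup being described, where on_update is a hypothetical callback fired by the background listener and do_work stands in for the real work, as in the snippet above:)

import concurrent.futures
import queue

q = queue.Queue()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def do_work(message, timeout):
    # placeholder for the real work, as in the question's snippet
    return message

def on_update(message):
    # the background listener would call this for each incoming update
    q.put(message)

def drain_queue():
    # a loop elsewhere hands queued items to the pool as they arrive
    while True:
        message = q.get()  # blocks until an update is queued
        executor.submit(do_work, message, 60)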
Let's take the canonical example from the Python documentation below:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
The list of URLS is fixed. Is it possible to feed this list in real time and have the workers process items as they come by, perhaps from a queue for management purposes? I am a bit confused about whether my approach is actually possible.
Recommended answer

The example from the Python docs, expanded to take its work from a queue. A change to note is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, to allow new work to be started while waiting for other work to complete.
import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]
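(In a real deployment, feed_the_workers would be replaced by whatever actually receives the updates; a minimal sketch, assuming a hypothetical listen_for_updates() source:)

def updates_listener():
    """ Drop-in replacement for feed_the_workers: forward real updates """
    for message in listen_for_updates():  # hypothetical external source
        q.put(message)
    return "DONE FEEDING"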
Output from fetching each url twice:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes