How would I go about using concurrent.futures and queues for a real-time scenario?


Question

It is fairly easy to do parallel work with Python 3's concurrent.futures module, as shown below.

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # do_work and dictionary are assumed to be defined elsewhere
    future_to = {executor.submit(do_work, item, 60): item for item in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()
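
For context, a self-contained version of that pattern might look like the following sketch, where do_work and items are placeholders invented for illustration (the unused timeout argument mirrors the 60 above):

import concurrent.futures
import time

def do_work(item, timeout):
    """Placeholder worker: pretend to do something slow with item."""
    time.sleep(0.1)
    return item * 2

items = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # map each future back to the input that produced it
    future_to_item = {executor.submit(do_work, item, 60): item for item in items}
    for future in concurrent.futures.as_completed(future_to_item):
        print(future_to_item[future], '->', future.result())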

It is also very handy to insert items into, and retrieve them from, a Queue.

import queue

q = queue.Queue()
for task in tasks:       # tasks is any iterable of work items
    q.put(task)
while not q.empty():
    q.get()
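
One caveat with this pattern: in a multi-threaded program, checking q.empty() and then calling q.get() can race, since another consumer may drain the queue between the check and the get, and new items may arrive after the loop exits. A blocking q.get() with a sentinel value is the more robust idiom. A minimal sketch, with SENTINEL and consumer as illustrative names:

import queue
import threading

q = queue.Queue()
SENTINEL = object()          # unique marker telling the consumer to stop

def consumer():
    while True:
        task = q.get()       # blocks until an item is available
        if task is SENTINEL:
            break
        print('processing', task)

t = threading.Thread(target=consumer)
t.start()
for task in ['a', 'b', 'c']:
    q.put(task)
q.put(SENTINEL)              # signal shutdown
t.join()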

I have a script running in the background listening for updates. Now, in theory, assume that as those updates arrive, I would queue them and do work on them concurrently using the ThreadPoolExecutor.

Individually, all of these components work in isolation and make sense, but how do I go about using them together? I am not sure it is possible to feed the ThreadPoolExecutor work from the queue in real time, rather than from data that is predetermined.

In a nutshell, all I want to do is receive updates of, say, 4 messages a second, shove them into a queue, and have concurrent.futures work on them. Otherwise I am stuck with a slow, sequential approach.

Let's take the canonical example from the Python documentation:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

The list of URLS is fixed. Is it possible to feed this list in real time and have the workers process the URLs as they come in, perhaps from a queue for management purposes? I am a bit confused about whether my approach is actually possible.

Answer

Below is the example from the Python docs, expanded to take its work from a queue. A change to note is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, so that new work can be started while waiting for other work to complete.

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]
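
A note on the design: the 0.25 second timeout passed to concurrent.futures.wait hands control back to the loop regularly, so newly queued URLs get submitted even while earlier downloads are still in flight, and the outer loop ends once future_to_url is empty, that is, once the feeder future and every download future have completed. Draining the queue with "while not q.empty()" is safe here only because this loop is the queue's sole consumer; with several consumer threads, a drain based on get_nowait() avoids the check-then-get race. A minimal sketch, assuming the same q, executor, future_to_url and load_url as above:

# Drop-in replacement for the "while not q.empty()" drain above;
# queue.Empty is raised once nothing is left to fetch.
while True:
    try:
        url = q.get_nowait()
    except queue.Empty:
        break
    future_to_url[executor.submit(load_url, url, 60)] = url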

Output from fetching each URL twice:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes

