How to spawn future only if free worker is available


Question

I am trying to send information extracted from lines of a big file to a process running on some server.

To speed this up, I would like to do this with some threads in parallel.

Using the Python 2.7 backport of concurrent.futures I tried this:

from concurrent.futures import ThreadPoolExecutor  # Python 2.7 backport: the "futures" package

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
    for line in f:
        e.submit(send_line_function, line)
f.close()

However, this is problematic, because all futures get submitted instantly, so that my machine runs out of memory, because the complete file gets loaded into memory.

My question is whether there is an easy way to only submit a new future when a free worker is available.

Answer

You can use

for chunk in zip(*[f]*chunksize):

(This is an application of the grouper recipe, which collects items from the iterator f into groups of size chunksize. Note: This does not consume the entire file at once since zip returns an iterator in Python 3.)
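
To see the grouper trick in isolation (a standalone sketch, not taken from the answer): every slot passed to zip shares the same iterator, so each output tuple pulls the next chunksize consecutive items from it.

it = iter(range(10))
print(list(zip(*[it] * 3)))
# [(0, 1, 2), (3, 4, 5), (6, 7, 8)]  -- the leftover 9 is silently dropped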

import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    # On Python 2.7, zip builds a list of all chunks up front; IT.izip(*[f]*chunksize)
    # keeps the grouping lazy so the file is read one chunk at a time.
    for chunk in zip(*[f]*chunksize):
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)
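
One caveat with this recipe: zip (and izip) stops at the shortest input, so a final partial chunk is dropped whenever the file's line count is not a multiple of chunksize. A minimal variation of the loop, assuming the same worker and executor as above, that pads the last chunk with IT.izip_longest and skips the padding:

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    # izip_longest (zip_longest on Python 3) pads the final chunk with None
    for chunk in IT.izip_longest(*[f]*chunksize, fillvalue=None):
        futures = [executor.submit(worker, line) for line in chunk if line is not None]
        CF.wait(futures)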


Now, in the comments you rightly point out that this is not optimal: there could be some worker which takes a long time and holds up a whole chunk of jobs.

Usually, if each call to worker takes roughly the same amount of time, then this is not a big deal. However, here is a way to advance the filehandle on demand. It uses a threading.Condition to notify the sprinkler to advance the filehandle.

import logging
import threading
import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()

def worker(cond, queue):
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle') 
            with cond:
                queue.put(line)
                logger.info('waiting')
                cond.wait()
        for _ in range(num_workers):
            queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
# the sprinkler thread feeds one line into the queue per notification
sprinkler_thread = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
sprinkler_thread.start()

# one worker thread per slot
threads = [threading.Thread(target=worker, args=[cond, queue])
           for _ in range(num_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sprinkler_thread.join()
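
As an aside, and not part of the original answer: if the per-line Condition handshake is not needed, a bounded Queue.Queue gives similar backpressure with less machinery, because put() blocks once maxsize lines are outstanding, so the file is only read as fast as the workers drain it. A rough sketch reusing the asker's send_line_function:

import threading
import Queue

SENTINEL = object()
num_workers = 4
# put() blocks once num_workers lines are waiting, so the filehandle
# only advances when a worker is about to be free
queue = Queue.Queue(maxsize=num_workers)

def worker():
    for line in iter(queue.get, SENTINEL):
        send_line_function(line.strip())

threads = [threading.Thread(target=worker) for _ in range(num_workers)]
for t in threads:
    t.start()

with open("big_file") as f:
    for line in f:
        queue.put(line)          # blocks while the queue is full
for _ in range(num_workers):
    queue.put(SENTINEL)
for t in threads:
    t.join()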
