How do I use threads on a generator (multiple threads per item) while keeping the order?

Question

I have some code that mimics a REST API call (see below).

For every key in each item produced by the generator, it needs to run a REST call. In my example, a record could be:

{"a": 2, "b": 36, "c": 77}

I need to run a separate REST call for each key (a, b, and c) and then output the record with the results added (the fake call just negates the number):

{"a": 2, "a_neg": -2, "b": 36, "b_neg": -36, "c": 77, "c_neg": -77}
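As a reference point (not part of the original question), here is a minimal single-threaded sketch of the transformation described above. The name negate_keys is hypothetical, and the negation stands in for the REST call:

```python
def negate_keys(record):
    """Sequential reference: add a "<key>_neg" entry right after every original key."""
    result = {}
    for key, value in record.items():
        result[key] = value
        result[key + "_neg"] = -value  # stands in for the REST call
    return result

print(negate_keys({"a": 2, "b": 36, "c": 77}))
# {'a': 2, 'a_neg': -2, 'b': 36, 'b_neg': -36, 'c': 77, 'c_neg': -77}
```

Building a new dict instead of mutating the input keeps each _neg key next to its source key, matching the desired output above.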

My current code works for one key, but with multiple keys it repeats the items (with 3 keys I get every record three times).

There is also some funky race condition. I guess I could keep only the last copy of each record, but I'm not good with threads and I'm worried about thread safety and other advanced issues.

Here is some sample output:

{'a': 89, 'a_neg': -89, 'b': 69, 'c': 38}
{'a': 89, 'a_neg': -89, 'b': 69, 'c': 38, 'c_neg': -38}
{'a': 89, 'a_neg': -89, 'b': 69, 'b_neg': -69, 'c': 38, 'c_neg': -38}
{'a': 90, 'a_neg': -90, 'b': 43, 'c': 16}
{'a': 90, 'a_neg': -90, 'b': 43, 'c': 16, 'c_neg': -16}
{'a': 90, 'a_neg': -90, 'b': 43, 'b_neg': -43, 'c': 16, 'c_neg': -16}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
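The three lines per record are consistent with every future for a record returning the very same dict object that the workers mutate: the record is printed once per key, and the earlier copies can be printed before the other lookups have written their _neg keys. A small sketch of the shared-object effect, using a simplified hypothetical lookup function without the sleep (it iterates over list(record) so the demo is deterministic):

```python
from concurrent.futures import ThreadPoolExecutor

def lookup(record, key):
    # Same idea as rest_api_lookup: write into the shared dict and return it.
    record[key + "_neg"] = -record[key]
    return record

record = {"a": 2, "b": 36, "c": 77}
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(lookup, record, key) for key in list(record)]
    results = [f.result() for f in futures]

# Three futures, but every result is the same dict object,
# so printing each result prints the same record three times.
print(len(results))                       # 3
print(all(r is record for r in results))  # True
```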

Finally, here is my source code (you can run it yourself):

#!/usr/bin/env python

from concurrent.futures import ThreadPoolExecutor
from time import sleep
from pprint import pprint
import random

def records():
    # simulates records generator
    for i in range(100):
        yield {"a": i, "b": random.randint(0,100), "c": random.randint(0,100)}

def stream(records):
    threads = 8
    pool = ThreadPoolExecutor(threads)

    def rest_api_lookup(record_dict):
        # simulates REST call :)
        sleep(0.1)
        key = record_dict["key"]
        record = record_dict["record"]

        record[key + "_neg"] = -record[key]

        return record

    def thread(records):
        chunk = []
        for record in records:
            for key in record:
                chunk.append(pool.submit(rest_api_lookup, {"record": record, "key": key}))

            if len(chunk) == threads:
                yield chunk
                chunk = []

        if chunk:
            yield chunk

    def unchunk(chunk_gen):
        """Flattens a generator of Future chunks into a generator of Future results."""
        for chunk in chunk_gen:
            for f in chunk:
                yield f.result() # get result from Future

    # Now iterate over all results in same order as records
    for result in unchunk(thread(records)):
        #yield result
        pprint(result)

stream(records())

Answer

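The first issue is that you're looping over the keys of a record that grows while you iterate (the worker threads add the _neg keys to the same dict). Iterate over a copy of the keys instead: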

for key in list(record):  # make a copy of the keys!
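A single-threaded sketch of this first issue: adding keys to a dict while a for loop walks over it raises RuntimeError, whereas looping over list(record) does not. In the question the keys are added by worker threads, so whether the loop actually fails depends on timing, which makes it one plausible source of the race:

```python
live = {"a": 2, "b": 36, "c": 77}
error = None
try:
    # Iterating over the dict itself while adding keys to it
    for key in live:
        live[key + "_neg"] = -live[key]
except RuntimeError as exc:
    error = str(exc)
print(error)  # dictionary changed size during iteration

# Iterating over a snapshot of the keys is safe; the new keys are not visited.
snapshot = {"a": 2, "b": 36, "c": 77}
for key in list(snapshot):
    snapshot[key + "_neg"] = -snapshot[key]
print(snapshot)  # {'a': 2, 'b': 36, 'c': 77, 'a_neg': -2, 'b_neg': -36, 'c_neg': -77}
```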

The second issue is that you have 3 keys and 8 threads: len(chunk) goes 3, 6, 9, ... and never equals threads (8), so the following condition is never met:

        if len(chunk) == threads:  # try len(chunk) >= threads
            yield chunk
            chunk = []
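To see the effect of the == test, here is a small simulation of just the chunking arithmetic, with counts instead of futures and no threads (chunk_sizes is a hypothetical helper). With ==, all 300 lookups for 100 records end up in a single chunk, so the whole generator is submitted before the first result is read; that is only harmless here because the simulated generator is finite. With >=, each chunk closes at 9 futures, i.e. three whole records:

```python
import operator

def chunk_sizes(n_records, keys_per_record, threads, flush):
    """Replays the chunking loop of thread() with counts instead of futures.

    flush(size, threads) plays the role of the len(chunk) ... threads test.
    Returns the size of every chunk that would be yielded.
    """
    sizes, size = [], 0
    for _ in range(n_records):
        size += keys_per_record  # one future per key of the record
        if flush(size, threads):
            sizes.append(size)
            size = 0
    if size:
        sizes.append(size)  # leftover chunk yielded after the loop
    return sizes

print(chunk_sizes(100, 3, 8, operator.eq))  # [300]: one chunk holding every future
print(chunk_sizes(10, 3, 8, operator.ge))   # [9, 9, 9, 3]
```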

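The last issue is that every future for a record returns the same dict, so each record is yielded once per key, and the earlier copies are yielded before all of its lookups have finished. Here is a possible fix that yields each record only once, after its last future has completed: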

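def unchunk(chunk_gen):
    """Flattens a generator of Future chunks into a generator of completed records.

    All futures for one record are consecutive and return the same dict, so a
    record is yielded only once, after the result of its last future is in.
    """
    old_res = None  # keep across chunks, otherwise the last record of each chunk is lost
    for chunk in chunk_gen:
        for f in chunk:
            res = f.result()  # get result from Future (blocks until done)
            if old_res is not None and res is not old_res:
                yield old_res  # all lookups for the previous record are done
            old_res = res
    if old_res is not None:
        yield old_res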
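Putting the three fixes together, here is one possible corrected version of the question's stream() (a sketch, not the answerer's code). Assumptions beyond the original: stream becomes a generator, as the commented-out #yield result suggests; record and key are passed as separate arguments; the pool is shut down with a with block; and there are fewer records and a shorter sleep so it runs quickly. Like the original, several threads still write different keys into one dict, which is safe for single item assignments in CPython.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import random

def records(n=20):
    # simulates the records generator (fewer records than the question's 100)
    for i in range(n):
        yield {"a": i, "b": random.randint(0, 100), "c": random.randint(0, 100)}

def rest_api_lookup(record, key):
    # simulates the REST call: writes "<key>_neg" into the shared record
    sleep(0.01)
    record[key + "_neg"] = -record[key]
    return record

def stream(records, threads=8):
    """Yields each record once, fully processed, in input order."""
    with ThreadPoolExecutor(threads) as pool:

        def chunks():
            chunk = []
            for record in records:
                for key in list(record):    # fix 1: iterate over a copy of the keys
                    chunk.append(pool.submit(rest_api_lookup, record, key))
                if len(chunk) >= threads:   # fix 2: >= instead of ==
                    yield chunk
                    chunk = []
            if chunk:
                yield chunk

        previous = None  # fix 3: emit a record once, after its last future
        for chunk in chunks():
            for f in chunk:
                result = f.result()
                if previous is not None and result is not previous:
                    yield previous
                previous = result
        if previous is not None:
            yield previous

for result in stream(records()):
    print(result)
```

Records come out in input order, but the _neg keys inside each dict follow the original keys in whatever order the lookups finished (pprint, as used in the question, sorts keys when printing, which hides this). If the exact key order matters, rebuild the dict in the desired order before yielding it.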
