How do I use threads on a generator (multiple threads per item) while keeping the order?
Problem description
I have some code that mimics a REST API call (see below).
For every key in each item yielded by the generator, the code needs to make a REST call. For example, a record could be:
{"a": 2, "b": 36, "c": 77}
I need to run a REST call for every key (a, b, and c) individually, then output the results (the simulated call just negates the number):
{"a": 2, "a_neg": -2, "b": 36, "b_neg": -36, "c": 77, "c_neg": -77}
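For reference, the requested transformation can be sketched sequentially, without any threads. negate_keys is a hypothetical helper name, and the simulated REST call is reduced to a plain negation, as in the question:

```python
def negate_keys(record):
    """Return a new dict with a "<key>_neg" entry placed right after each original key."""
    result = {}
    for key, value in record.items():
        result[key] = value
        result[key + "_neg"] = -value  # stands in for the simulated REST call
    return result

print(negate_keys({"a": 2, "b": 36, "c": 77}))
# {'a': 2, 'a_neg': -2, 'b': 36, 'b_neg': -36, 'c': 77, 'c_neg': -77}
```

This is the per-record result the threaded version is expected to reproduce, one record at a time and in input order.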
Right now my code works for one key, but with multiple keys it repeats the items (so with 3 keys I get every record three times).
There is also some strange race condition. I guess I could keep only the last record, but I'm not experienced with threads and I'm worried about thread safety and other subtleties.
Here is some sample output:
{'a': 89, 'a_neg': -89, 'b': 69, 'c': 38}
{'a': 89, 'a_neg': -89, 'b': 69, 'c': 38, 'c_neg': -38}
{'a': 89, 'a_neg': -89, 'b': 69, 'b_neg': -69, 'c': 38, 'c_neg': -38}
{'a': 90, 'a_neg': -90, 'b': 43, 'c': 16}
{'a': 90, 'a_neg': -90, 'b': 43, 'c': 16, 'c_neg': -16}
{'a': 90, 'a_neg': -90, 'b': 43, 'b_neg': -43, 'c': 16, 'c_neg': -16}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
{'a': 91, 'a_neg': -91, 'b': 49, 'b_neg': -49, 'c': 77, 'c_neg': -77}
Finally, here is my source code (you can run it yourself):
#!/usr/bin/env python
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from pprint import pprint
import random

def records():
    # simulates records generator
    for i in range(100):
        yield {"a": i, "b": random.randint(0,100), "c": random.randint(0,100)}

def stream(records):
    threads = 8
    pool = ThreadPoolExecutor(threads)

    def rest_api_lookup(record_dict):
        # simulates REST call :)
        sleep(0.1)
        key = record_dict["key"]
        record = record_dict["record"]
        record[key + "_neg"] = -record[key]
        return record

    def thread(records):
        chunk = []
        for record in records:
            for key in record:
                chunk.append(pool.submit(rest_api_lookup, {"record": record, "key": key}))
            if len(chunk) == threads:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

    def unchunk(chunk_gen):
        """Flattens a generator of Future chunks into a generator of Future results."""
        for chunk in chunk_gen:
            for f in chunk:
                yield f.result() # get result from Future

    # Now iterate over all results in same order as records
    for result in unchunk(thread(records)):
        #yield result
        pprint(result)

stream(records())
Recommended answer
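The first issue is that you are looping over the keys of a record that grows while you iterate (the worker threads add the *_neg keys to the same dict). Loop over a copy of the keys instead: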
for key in list(record): # make a copy of the keys!
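To see why the copy matters, here is a single-threaded sketch (both function names are hypothetical). In the threaded code the keys are added from worker threads, so whether that collides with the loop depends on timing; doing the mutation inline makes the failure deterministic:

```python
def add_negations_unsafely(record):
    for key in record:  # iterates the live dict, which grows inside the loop
        record[key + "_neg"] = -record[key]

def add_negations_safely(record):
    for key in list(record):  # snapshot of the keys taken before the loop starts
        record[key + "_neg"] = -record[key]

broken = {"a": 2, "b": 36, "c": 77}
try:
    add_negations_unsafely(broken)
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration

fixed = {"a": 2, "b": 36, "c": 77}
add_negations_safely(fixed)
print(fixed)  # {'a': 2, 'b': 36, 'c': 77, 'a_neg': -2, 'b_neg': -36, 'c_neg': -77}
```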
I think the second issue is that you have 3 keys and 8 threads: len(chunk) will be 3, 6, 9, ... while threads is 8, so the following condition is never reached:
if len(chunk) == threads: # try len(chunk) >= threads
    yield chunk
    chunk = []
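The arithmetic can be checked without any threads. The sketch below (chunk_sizes is a hypothetical helper) mimics the chunking loop, assuming the size check runs once per record, as the 3, 6, 9 sequence implies, and returns the size of every chunk that would be yielded:

```python
import operator

def chunk_sizes(n_records, keys_per_record, threads, reached):
    """Sizes of the chunks the question's thread() generator would yield.

    `reached` is the comparison used in the flush condition,
    e.g. operator.eq for `==` or operator.ge for `>=`.
    """
    sizes, size = [], 0
    for _ in range(n_records):
        size += keys_per_record      # one future per key
        if reached(size, threads):   # checked once per record
            sizes.append(size)
            size = 0
    if size:                         # leftover partial chunk at the end
        sizes.append(size)
    return sizes

print(chunk_sizes(100, 3, 8, operator.eq))  # [300]
print(len(chunk_sizes(100, 3, 8, operator.ge)))  # 34
```

With == every future for all 100 records is submitted before a single chunk is handed over, so nothing streams. With >= you get 33 chunks of 9 futures (3 whole records each) followed by one chunk of 3.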
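The last issue is that you yield each record once per key, including before all of its threads have finished. Here is a possible fix that yields each record only once, after its last future has completed: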
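def unchunk(chunk_gen):
    """Flattens a generator of Future chunks, yielding each record once all of its futures are done."""
    for chunk in chunk_gen:
        old_res = None
        for f in chunk:
            res = f.result() # get result from Future (blocks until it is done)
            if old_res and res is not old_res:
                # a new record has started, so every future of the previous one is done
                yield old_res
            old_res = res
        if old_res:
            # last record of the chunk
            yield old_res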
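Putting the three fixes together, the corrected program might look like the sketch below. Assumptions beyond the answer: the record count and the sleep are reduced so it runs quickly, stream yields results instead of printing them, and the pool is used as a context manager so its threads are shut down. It still lets several worker threads write distinct keys into the same dict, which relies on single dict assignments being atomic in CPython.

```python
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint
from time import sleep
import random

def records(n=10):
    # simulates a records generator (n reduced from 100 so the demo is quick)
    for i in range(n):
        yield {"a": i, "b": random.randint(0, 100), "c": random.randint(0, 100)}

def stream(records, threads=8):
    """Yield each record once, with all *_neg keys filled in, in input order."""
    with ThreadPoolExecutor(threads) as pool:

        def rest_api_lookup(record_dict):
            # simulates a REST call (shorter sleep than the question's 0.1 s)
            sleep(0.01)
            key = record_dict["key"]
            record = record_dict["record"]
            record[key + "_neg"] = -record[key]
            return record

        def thread(records):
            chunk = []
            for record in records:
                for key in list(record):  # fix 1: iterate over a copy of the keys
                    chunk.append(pool.submit(rest_api_lookup, {"record": record, "key": key}))
                if len(chunk) >= threads:  # fix 2: >= so the chunk is actually flushed
                    yield chunk
                    chunk = []
            if chunk:
                yield chunk

        def unchunk(chunk_gen):
            # fix 3: yield a record only after all of its futures have completed
            for chunk in chunk_gen:
                old_res = None
                for f in chunk:
                    res = f.result()
                    if old_res and res is not old_res:
                        yield old_res
                    old_res = res
                if old_res:
                    yield old_res

        yield from unchunk(thread(records))

if __name__ == "__main__":
    for result in stream(records()):
        pprint(result)
```

Because the flush check runs only between records, each chunk holds whole records, so waiting on the futures in submission order is enough to keep the output in input order.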