限制活动线程数 Python [英] Limit number of active threads Python

查看:53
本文介绍了限制活动线程数 Python的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有顺序生产者消费者模型,它需要大量时间来执行.所以我试图让消费者代码同时运行.

I have sequential producer consumer model which is taking a lot of time to execute. So I am trying to make the consumer code run concurrently.

注意:对象是一个生成器.

Note: objects is a generator.

func report_object(self, object_type, objects):
    for obj in objects:
        try:
            change_handler(obj, self.config)
        except Exception as e:
            LOG.error("Error occurred in handling object: %s" % e)
            LOG.exception(e)
    else:
        LOG.info(" Consumer: no objects reported")

以上函数的线程实现:

import threading

func report_object(self, object_type, objects):
    threads = []
    for obj in objects:
        try:
            t = threading.Thread(target=change_handler,args=(obj, self.config))
            LOG.info(" ***** Number of active threads: %d *****", threading.activeCount())
            t.start()
            threads.append(t)
        except Exception as e:
            LOG.error("Error occurred in handling object: %s" % e)
            LOG.exception(e)
   for t in threads: 
      t.join()
   else:
       LOG.info(" Consumer: no objects reported")

如果遵循上述机制,我将运行与 len(objects) 一样多的线程.在这种情况下,如果对象变得非常大,例如 1000/10000,那么会产生什么影响?会有竞争条件吗?如果是,那么我该如何防止这种情况?我尝试了另一种解决方案,例如:

If the above mechanism is followed I am running as many threads as len(objects). I this case if the objects become very huge like 1000/10000 then what will be the impact? Will there be a race condition? If yes then how can I prevent this? I tried another solution like:

threads = [ threading.Thread(target=change_handler,args=(obj, self.config)) for _ in range(8)]
for thread in threads:
    thread.start()
    LOG.info(thread.name)


for thread in threads:
    thread.join()

活动线程的数量仍在增加.限制活动线程数的最佳方法以及使上述函数并发运行的最佳方法是什么.

The number of active thread is still increasing. What would be the best way to restrict the number of active threads and best way to make the above function run concurrently.

推荐答案

控制线程数量的最好方法是使用 concurrent.futures 中的 ThreadPoolExecutor包,并且有几种方法可以做到这一点.一种方法是使用 submit 方法,该方法返回一个 Future 对象,表示线程的未来完成情况.如果线程返回一个结果,你可以在这个对象上调用 result 方法,这个方法会阻塞直到调用完成,然后返回调用返回的值(当然还有很多其他方法您可以调用 Future 对象).如果线程没有返回值,或者您不需要测试是否成功完成,您就没有义务保存 Future 对象.

The best way of controlling the number of threads is to use the ThreadPoolExecutor from the concurrent.futures package, and there are several ways of doing this. One way is to use the submit method, which returns a Future object representing the future completion of the thread. If the thread returns a result, you can call the result method on this object which will block until the call is complete and then returns the value returned from the call (there are, of course, many other methods you can call on the Future object). You are not obliged to save the Future object if the thread does not return a value or you do not otherwise need to test for successful completion.

以下是如何使用 ThreadPoolExecutor 的示例:

Here is an example of how to use the ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
import time, random

def my_thread(n):
    time.sleep(random.random())
    return n, time.time()

MAX_THREADS = 10

with ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
    futures = [e.submit(my_thread, n) for n in range(15)]
    for f in futures:
        print(f.result())

打印:

(0, 1586782110.1816075)
(1, 1586782109.4404495)
(2, 1586782109.6663365)
(3, 1586782109.8307955)
(4, 1586782109.6733325)
(5, 1586782109.6103601)
(6, 1586782109.3914738)
(7, 1586782109.6803281)
(8, 1586782109.8587916)
(9, 1586782109.7173235)
(10, 1586782110.3664994)
(11, 1586782110.1816075)
(12, 1586782110.518443)
(13, 1586782110.4524374)
(14, 1586782110.0256832)

这篇关于限制活动线程数 Python的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆