Execute failed threads again

Problem description

So I have a script that uses about 50k threads, but only runs 10 at a time. I use the threading library for this, with a BoundedSemaphore to limit it to 10 threads at a time. In some cases there is not enough memory for all threads, but it is important that every item gets processed, so I would like to rerun the threads that were killed because of insufficient memory.

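import threading

import some_other_script


class myThread(threading.Thread):
    def __init__(self, item):
        super().__init__()
        self.item = item

    def run(self):
        # The semaphore lets at most 10 threads into this block at once;
        # "with" releases it even if method() raises.
        with threadLimiter:
            some_other_script.method(self.item)
            # Only reached if method() succeeded, so failed items stay in the list.
            somelist.remove(self.item)


threadLimiter = threading.BoundedSemaphore(10)

somelist = ['50,000 Items', '.....']
# Iterate over a copy: the threads remove items from somelist while this loop runs.
for item in list(somelist):
    myThread(item).start()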

As you can see, the only idea I have come up with so far is to delete each processed item from the list inside its thread with somelist.remove(self.item). (Each item is unique and appears only once in the list.) My idea was to wrap the for loop in a while loop that checks whether the list still contains items. That did not work: when the for loop finishes, the threads are still running, so the list is not empty yet. What I want is to catch the threads that fail because the system runs out of memory and execute them again (and again if need be).

Thank you very much!

Recommended answer

This solves both the too many active threads problem and the problem in your question:

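import threading


def get_items():
    """Return the items that live myThread instances are currently processing."""
    items = set()
    for thr in threading.enumerate():
        if isinstance(thr, myThread):
            items.add(thr.item)
    return items


def manageThreads(howmany):
    while bigset:
        items = get_items()
        # Unfinished items that no live thread is handling: new ones and failed ones.
        items_to_add = bigset.difference(items)
        running = len(items)
        while running < howmany and items_to_add:
            processor = myThread(items_to_add.pop())
            processor.start()
            running += 1
        with thread_done:
            # The timeout avoids waiting forever if a thread notified
            # before this wait() started.
            thread_done.wait(timeout=1)


thread_done = threading.Condition()
bigset = set(["50,000 items", "..."])
manageThreads(10)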

The run method of the myThread class:

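def run(self):
    try:
        some_other_script.method(self.item)
        # Only reached on success, so a failed item stays in bigset.
        bigset.remove(self.item)
    finally:
        # Always wake the manager, even if method() raised.
        with thread_done:
            thread_done.notify()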

threading.enumerate() returns a list of all Thread objects that are currently alive. So the manageThreads function first starts 10 threads, then waits for one to finish, checks the thread count again, and so on. If a thread runs out of memory (Python raises MemoryError inside that thread) or another error occurs during processing, the thread never removes its item from bigset, so the manager requeues that item on a different thread.
