Can't create new threads in Python


Question

import threading

threads = []
for n in range(0, 60000):
    t = threading.Thread(target=function, args=(x, n))  # `function` and `x` are defined elsewhere
    t.start()
    threads.append(t)
for t in threads:
    t.join()


It is working well for range up to 800 on my laptop, but if I increase range to more than 800 I get the error can't create new thread.

How can I control the number of threads that get created, or otherwise make this work (with a timeout or similar)? I tried using threading.BoundedSemaphore but that doesn't seem to work properly.
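
For reference, a semaphore really can cap the number of live threads. Here is a minimal sketch of that approach (the `active`/`peak` counters are instrumentation added here just to show the cap holding, and the sleep stands in for real work):

```python
import threading
import time

MAX_THREADS = 8
sem = threading.Semaphore(MAX_THREADS)
lock = threading.Lock()
active = 0   # threads currently running
peak = 0     # highest concurrency observed

def worker(n):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    try:
        time.sleep(0.01)  # stand-in for the real work
    finally:
        with lock:
            active -= 1
        sem.release()  # free a slot only after this thread is done

threads = []
for n in range(40):
    sem.acquire()  # blocks until fewer than MAX_THREADS are running
    t = threading.Thread(target=worker, args=(n,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()
```

The key detail is that the spawning loop acquires the semaphore *before* starting each thread, and each worker releases it only when finished, so at most `MAX_THREADS` threads ever exist at once.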

Answer


The problem is that no major platform (as of mid-2013) will let you create anywhere near this number of threads. There are a wide variety of different limitations you could run into, and without knowing your platform, its configuration, and the exact error you got, it's impossible to know which one you ran into. But here are two examples:

  • On 32-bit Windows, the default thread stack is 1MB, and all of your thread stacks have to fit into the same 2GB of virtual memory space as everything else in your program, so you will run out long before 60000.
  • On 64-bit linux, you will likely exhaust one of your session's soft ulimit values before you get anywhere near running out of page space. (Linux has a variety of different limits beyond the ones required by POSIX.)
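
The per-thread stack reservation mentioned above is tunable from Python, which is one way to squeeze more threads into a 32-bit address space. A minimal sketch (256 KiB is an arbitrary choice; the minimum and granularity allowed are platform-dependent):

```python
import threading

# threading.stack_size(n) sets the stack size for threads created
# afterwards and returns the previous setting (0 means platform default)
old = threading.stack_size(256 * 1024)

t = threading.Thread(target=lambda: None)
t.start()   # this thread reserves only 256 KiB of stack
t.join()
```

Shrinking the stack raises the thread count you can reach before exhausting virtual memory, but too small a stack will crash deeply recursive code, so it treats the symptom rather than the design problem.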

So, how can I control the number of threads that get created, or otherwise make it work with a timeout or whatever?


Using as many threads as possible is very unlikely to be what you actually want to do. Running 800 threads on an 8-core machine means that you're spending a whole lot of time context-switching between the threads, and the cache keeps getting flushed before it ever gets primed, and so on.


Most likely, what you really want is one of the following:

  • One thread per CPU, serving a pool of 60000 tasks.
    • Maybe processes instead of threads (if the primary work is in Python, or in C code that doesn't explicitly release the GIL).
    • Maybe a fixed number of threads (e.g., a web browser may do, say, 12 concurrent requests at a time, whether you have 1 core or 64).
    • Maybe a pool of, say, 600 batches of 100 tasks apiece, instead of 60000 single tasks.
  • 60000 cooperatively scheduled fibers sharing one thread.
    • Maybe explicit coroutines instead of a scheduler.
    • Or "magic" cooperative greenlets via, e.g., gevent.
    • Maybe one thread per CPU, each running 1/Nth of the fibers.
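
The "explicit coroutines" option maps naturally onto what later became asyncio (which postdates this mid-2013 answer). A sketch, with `task` as a stand-in for real async work and a semaphore capping concurrency at 12:

```python
import asyncio

async def task(n):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return n * 2

async def main():
    # all coroutines share one thread; the semaphore caps how many
    # are past the "async with" gate at any moment
    sem = asyncio.Semaphore(12)

    async def guarded(n):
        async with sem:
            return await task(n)

    # gather preserves submission order in its result list
    return await asyncio.gather(*(guarded(n) for n in range(100)))

results = asyncio.run(main())
```

Because coroutines are just objects until awaited, creating 60000 of them is cheap in a way that creating 60000 OS threads never is.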

But it's certainly possible.

Once you've hit whichever limit you're hitting, it's very likely that trying again will fail until a thread has finished its job and been joined, and it's pretty likely that trying again will succeed after that happens. So, given that you're apparently getting an exception, you could handle this the same way as anything else in Python: with a try/except block. For example, something like this:

import threading

threads = []
for n in range(0, 60000):
    while True:
        t = threading.Thread(target=function, args=(x, n))
        try:
            t.start()
            threads.append(t)
        except WhateverTheExceptionIs as e:
            # in CPython this is typically RuntimeError("can't start new thread")
            if threads:
                threads[0].join()
                del threads[0]
            else:
                raise
        else:
            break
for t in threads:
    t.join()

Of course this assumes that the first task launched is likely to be one of the first tasks finished. If this is not true, you'll need some way to explicitly signal doneness (condition, semaphore, queue, etc.), or you'll need to use some lower-level (platform-specific) library that gives you a way to wait on a whole list until at least one thread is finished.
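
One way to signal doneness explicitly, sketched here with a Queue (my construction, not from the original answer): each worker announces itself on the queue when it finishes, so the spawner joins threads in completion order rather than creation order:

```python
import queue
import threading

done = queue.Queue()

def worker(n, results):
    results[n] = n * n                    # stand-in for the real work
    done.put(threading.current_thread())  # announce completion

results = {}
threads = []
for n in range(10):
    t = threading.Thread(target=worker, args=(n, results))
    t.start()
    threads.append(t)

# join threads in the order they actually finish, not creation order
for _ in range(len(threads)):
    finished = done.get()  # blocks until some thread announces itself
    finished.join()
```

In a real version of the retry loop above, you would `done.get()` (and join that thread) when `t.start()` raises, instead of always joining `threads[0]`.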

Also, note that on some platforms (e.g., Windows XP), you can get bizarre behavior just getting near the limits.

On top of being a lot better, doing the right thing will probably be a lot simpler as well. For example, here's a process-per-CPU pool:

import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)

… and a fixed-thread-count pool:

with concurrent.futures.ThreadPoolExecutor(12) as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)

… and a balancing-CPU-parallelism-with-numpy-vectorization batching pool:

import os
import numpy as np  # np.vector_function is a placeholder for a vectorized version of the task

with concurrent.futures.ThreadPoolExecutor() as executor:
    batchsize = 60000 // os.cpu_count()
    fs = [executor.submit(np.vector_function, x,
                          np.arange(n, min(n+batchsize, 60000)))
          for n in range(0, 60000, batchsize)]
    concurrent.futures.wait(fs)


In the examples above, I used a list comprehension to submit all of the jobs and gather their futures, because we're not doing anything else inside the loop. But from your comments, it sounds like you do have other stuff you want to do inside the loop. So, let's convert it back into an explicit for statement:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = []
    for n in range(60000):
        fs.append(executor.submit(function, x, n))
    concurrent.futures.wait(fs)

And now, whatever you want to add inside that loop, you can.

However, I don't think you actually want to add anything inside that loop. The loop just submits all the jobs as fast as possible; it's the wait function that sits around waiting for them all to finish, and it's probably there that you want to exit early.

To do this, you can use wait with the FIRST_COMPLETED flag, but it's much simpler to use as_completed.
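
For completeness, the FIRST_COMPLETED variant looks roughly like this (`square` is a stand-in task; here we stop collecting once five results are in and cancel whatever hasn't started):

```python
import concurrent.futures

def square(n):
    return n * n

results = []
with concurrent.futures.ThreadPoolExecutor(4) as executor:
    fs = {executor.submit(square, n) for n in range(20)}
    while fs and len(results) < 5:
        # wait returns as soon as at least one future finishes;
        # fs is rebound to the not-yet-done remainder
        done, fs = concurrent.futures.wait(
            fs, return_when=concurrent.futures.FIRST_COMPLETED)
        results.extend(f.result() for f in done)
    for f in fs:
        f.cancel()  # best-effort: only cancels tasks that haven't started
```

Note that each wake-up may deliver several completed futures at once, which is part of why as_completed (which yields them one at a time) is the simpler tool here.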

Also, I'm assuming error is some kind of value that gets set by the tasks. In that case, you will need to put a Lock around it, as with any other mutable value shared between threads. (This is one place where there's slightly more than a one-line difference between a ProcessPoolExecutor and a ThreadPoolExecutor—if you use processes, you need multiprocessing.Lock instead of threading.Lock.)

So:

import threading
import concurrent.futures

error_lock = threading.Lock()  # Lock must be instantiated, not just referenced
error = []

def function(x, n):
    # ... setup work ...
    try:
        ...  # the part that can fail
    except Exception as e:
        with error_lock:
            error.append(e)
    # ... more work ...

# note: with ProcessPoolExecutor, each worker process gets its own copy of
# `error`, so this sharing pattern really only works with ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    for f in concurrent.futures.as_completed(fs):
        do_something_with(f.result())
        with error_lock:
            if len(error) > 1: exit()


      但是,您可能需要考虑其他设计.通常,如果您可以避免线程之间的共享,那么您的生活会变得更加轻松.期货旨在通过让您返回值或引发异常来简化这一过程,就像常规函数调用一样. f.result()将给您返回的值或引发引发的异常.因此,您可以将该代码重写为:


      However, you might want to consider a different design. In general, if you can avoid sharing between threads, your life gets a lot easier. And futures are designed to make that easy, by letting you return a value or raise an exception, just like a regular function call. That f.result() will give you the returned value or raise the raised exception. So, you can rewrite that code as:

def function(x, n):
    # don't bother to catch exceptions here, let them propagate out
    ...

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    error = []
    for f in concurrent.futures.as_completed(fs):
        try:
            result = f.result()  # returns the value, or re-raises the task's exception
        except Exception as e:
            error.append(e)
            if len(error) > 1: exit()
        else:
            do_something_with(result)

Notice how similar this looks to the ThreadPoolExecutor example in the docs. This simple pattern is enough to handle almost anything without locks, as long as the tasks don't need to interact with each other.
