Using a multithreaded queue in Python the correct way?


Problem Description


I am trying to use a Queue in Python that will be shared across multiple threads. I just want to know whether the approach I am using is correct, whether I am doing something redundant, or whether there is a better approach I should use.

I am trying to fetch new requests from a database table and schedule them, using some logic, to perform an operation such as running a query.

So, from the main thread, I spawn a separate thread to process the queue.

import time
from threading import Thread

# Assumed to be defined elsewhere in the project: Database, logger, the
# *_testschema connection settings, and SetQueue (shown below).

if __name__ == '__main__':

  request_queue = SetQueue(maxsize=-1)
  worker = Thread(target=request_queue.process_queue)
  worker.setDaemon(True)
  worker.start()


  while True:
    try:
      # Connect to the database and get all the new requests to be verified
      db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
      # Get new requests for verification
      verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS='%s' ORDER BY JOB_ID" %
                             (username_testschema, 'INITIATED'))

      # If there are some requests to be verified, put them in the queue.
      if len(verify_these) > 0:
        for row in verify_these:
          print "verifying : %s" % row[0]
          verify_id = row[0]
          request_queue.put(verify_id)
    except Exception as e:
      logger.exception(e)
    finally:
      time.sleep(10)

Now, in the SetQueue class, I have a process_queue function that processes, on every run, the top two requests that were added to the queue.

import time
import Queue   # the module is named "queue" in Python 3
from threading import Thread

# Assumed to be defined elsewhere in the project: Scheduler and logger.

'''
Overriding the Queue class to use a set as all_items instead of a list, to ensure
that only unique items are ever added and processed.
'''

class SetQueue(Queue.Queue):
  def _init(self, maxsize):
    Queue.Queue._init(self, maxsize)
    self.all_items = set()

  def _put(self, item):
    if item not in self.all_items:
      Queue.Queue._put(self, item)
      self.all_items.add(item)

  '''
  The multithreaded queue for the verification process. Takes the top two items,
  verifies each in a separate thread, and sleeps for 10 sec.
  This way at most two requests per run will be processed.
  '''
  def process_queue(self):
    while True:
      scheduler_obj = Scheduler()

      try:
        if self.qsize() > 0:
          threads = []
          # Take at most two items, without blocking if only one is queued
          for i in range(min(2, self.qsize())):
            job_id = self.get()
            t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
            t.start()
            threads.append(t)

          # Wait briefly for each spawned thread, then mark its item done
          for t in threads:
            t.join(timeout=1)
            self.task_done()

      except Exception as e:
        logger.exception(
          "QUEUE EXCEPTION : Exception occurred while processing requests in the VERIFICATION QUEUE")
      finally:
        time.sleep(10)

I want to see if my understanding is correct and if there can be any issues with it.

So the main thread, running in the while True loop in the main function, connects to the database, gets new requests, and puts them in the queue. The worker thread (a daemon) for the queue keeps getting new requests from the queue and forks non-daemon threads that do the processing. Since the join timeout is 1, the worker thread will keep taking new requests without getting blocked, and its child threads will keep processing in the background. Correct?

So if the main process exits, these child threads won't be killed until they finish their work, but the worker daemon thread would exit. Doubt: if the parent thread is a daemon and the child is non-daemon, does the child exit when the parent exits?
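(On that doubt: in CPython's threading module, a new Thread inherits the daemon flag of the thread that creates it unless the flag is set explicitly, so threads spawned inside a daemon worker are daemonic by default and are killed when the main thread exits. A small illustrative sketch to check this:

  import threading
  import time

  def child():
      # Created inside the daemon worker, so it inherits daemon=True by default
      print("child daemon flag: %s" % threading.current_thread().daemon)
      time.sleep(5)
      print("child finished")   # not reached if the process exits first

  def worker():
      t = threading.Thread(target=child)   # daemon flag not set explicitly
      t.start()

  w = threading.Thread(target=worker)
  w.setDaemon(True)   # matches the worker setup in the question
  w.start()
  time.sleep(1)
  # The main thread ends here; the daemonic child is killed, so "child finished"
  # never prints. Call setDaemon(False) on the child explicitly to keep it alive.
)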


I also read here: David Beazley on multiprocessing.

In the "Using a Pool as a Thread Coprocessor" section, David Beazley tries to solve a similar problem. So should I follow his steps?

1. Create a pool of processes.
2. Open a thread, as I am doing for request_queue.
3. In that thread:

  def process_verification_queue(self):
    while True:
      try:
        if self.qsize() > 0:
          job_id = self.get()
          # pool is the multiprocessing pool created in step 1
          pool.apply_async(Scheduler.verify_func, args=(job_id,))
      except Exception as e:
        logger.exception("QUEUE EXCEPTION : Exception occurred while processing requests in the VERIFICATION QUEUE")

This would use a process from the pool to run verify_func in parallel. Will this give me better performance?
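For reference, a rough sketch of how those three steps might be wired together (the pool size of 4 is an arbitrary assumption; SetQueue and Scheduler are the names from the code above):

  import multiprocessing
  from threading import Thread

  if __name__ == '__main__':
      # Step 1: create a pool of worker processes (size 4 is arbitrary)
      pool = multiprocessing.Pool(processes=4)

      # Step 2: open a dispatcher thread for the queue, as with process_queue above
      request_queue = SetQueue(maxsize=-1)
      dispatcher = Thread(target=request_queue.process_verification_queue)
      dispatcher.setDaemon(True)
      dispatcher.start()

      # Step 3: process_verification_queue hands each job_id to the pool via
      # apply_async, so verify_func runs in a separate process. Note that the
      # function and its arguments must be picklable for this to work.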

Solution

While it's possible to create a new independent thread for the queue and process that data separately, the way you are doing it, I believe it is more common for each independent worker thread to post messages to a queue it already "knows" about, and for some other thread to process that queue by pulling messages out of it.

Design Idea

The way I envision it, your application would have three threads: the main thread and two worker threads. One worker thread would get requests from the database and put them in the queue; the other worker thread would process the data from the queue.

The main thread would just wait for the other threads to finish by calling .join() on them.

You would protect the queue that the threads have access to, making it thread-safe, by using a mutex. I have seen this pattern in many other designs, in other languages as well.
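A minimal sketch of that three-thread layout (fetch_new_requests and verify are placeholders standing in for the question's db.query(...) and scheduler_obj.verify_func calls):

  import Queue   # the module is named "queue" in Python 3
  import time
  from threading import Thread

  work_queue = Queue.Queue()   # Queue.Queue already guards its internals with a lock

  def db_producer():
      # Worker thread 1: poll the database and feed job ids into the queue
      while True:
          for job_id in fetch_new_requests():   # placeholder for db.query(...)
              work_queue.put(job_id)
          time.sleep(10)

  def queue_consumer():
      # Worker thread 2: pull job ids off the queue and verify them
      while True:
          job_id = work_queue.get()   # blocks until an item is available
          verify(job_id)              # placeholder for scheduler_obj.verify_func
          work_queue.task_done()

  producer = Thread(target=db_producer)
  consumer = Thread(target=queue_consumer)
  producer.start()
  consumer.start()

  # The main thread just waits on the workers
  producer.join()
  consumer.join()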

Suggested Reading

"Effective Python" by Brett Slatkin has a great example of this very question.

Instead of inheriting from Queue, he just creates a wrapper class called MyQueue, which adds get() and put(message) functions.

He even provides the source code in his GitHub repo:

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py
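Roughly, the wrapper pattern looks like this (a paraphrase of the idea, not the book's exact code):

  import Queue   # the module is named "queue" in Python 3

  class MyQueue(object):
      # Thin wrapper: expose only the operations the rest of the code needs,
      # instead of inheriting Queue's full interface.
      def __init__(self):
          self._queue = Queue.Queue()

      def put(self, message):
          self._queue.put(message)

      def get(self):
          return self._queue.get()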

I'm not affiliated with the book or its author, but I highly recommend it as I learned quite a few things from it :)
