deadlock because of blocking Queue.get() method


Problem description


As the title implies I have a deadlock and no idea why. I have multiple producers and only one consumer. The schedule_task method will get called by multiple processes after the thread has called the get method of the queue

from logging import getLogger
from time import sleep
from threading import Event, Thread
from multiprocessing import Process
from Queue import Queue

logger = getLogger(__name__)


class TaskExecutor(object):
    def __init__(self):
        print("init taskExecutor")
        self.event = Event()
        self.taskInfos = Queue()
        task_thread = Thread(target=self._run_worker_thread)
        self._instantEnd = False
        self._running = True
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        print("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                print("try to get queued task from %s" % str(self.taskInfos))
                msg, task = self.taskInfos.get()
                print("got task %s for msg: %s" % (str(task), str(msg)))
                task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                logger.error("Error: %s" % e.message)
        print("shutting down TaskExecutor!")

    def is_running(self):
        return True

    def schedule_task(self, msg, task):
        try:
            print("appending task '%s' for msg: %s" % (str(task), str(msg)))
            self.taskInfos.put((msg, task))
            print("into queue: %s " % str(self.taskInfos))
        except Exception, e:
            print("queue is probably full: %s" % str(e))


class Task(object):

    def execute(self, msg):
        print(msg)


executor = TaskExecutor()

def produce():
    cnt = 0
    while True:
        executor.schedule_task("Message " + str(cnt), Task())
        cnt += 1
        sleep(1)

for i in range(4):
    p = Process(target=produce)
    p.start()


From my logs I get:

init taskExecutor
start running taskExcecutor worker Thread
try to get queued task from <Queue.Queue instance at 0x7fdd09830cb0>
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f35d0>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3490>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 


Can someone please explain, what am I missing?

Recommended answer

While it's not possible for other people to run this code (it's not self-contained), there's no obvious problem in what you showed. So the problem is in something you haven't shown - perhaps in the code creating and using instances of TaskExecutor.

And when I plugged in missing pieces I made up out of thin air, this code worked fine.

So you need to show more than just this. How about replacing:

logger.debug("try to get queued task")

with

logger.debug("try to get queued task from queue %s", self.taskInfos)

? Then at least we could see whether your producers are using the same queue as your consumer.

Next step

Thanks for adding that. Next up: here's a self-contained program for you to try. It's very much like your code. See whether it runs correctly for you (it does for me):

from threading import Thread, Lock
from Queue import Queue

class Logger:
    def __init__(self):
        self.iolock = Lock()

    def debug(self, fmt, *msg):
        with self.iolock:
            print fmt % msg

    error = debug

logger = Logger()

class TaskExecutor(object):
    def __init__(self):
        logger.debug("init taskExecutor")
        self.taskInfos = Queue()
        task_thread = Thread(target=self._run_worker_thread)
        task_thread.daemon = True
        task_thread.start()

    def is_running(self):
        return True

    def _run_worker_thread(self):
        logger.debug("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                logger.debug("try to get queued task from queue %s", self.taskInfos)
                msg, task = self.taskInfos.get()
                logger.debug("got task %s for msg: %s", str(task), str(msg))
                #task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                logger.error("Error: %s", e.message)
        logger.debug("shutting down TaskExecutor!")

    def schedule_task(self, msg, task):
        try:
            logger.debug("appending task '%s' for msg: %s", str(task), str(msg))
            self.taskInfos.put((msg, task))
            logger.debug("into queue: %s ", str(self.taskInfos))
        except Exception, e:
            logger.debug("queue is probably full: %s", str(e))

te = TaskExecutor()

def runit():
    for i in range(10):
        te.schedule_task("some task", i)

main = Thread(target=runit)
main.start()

Next step

OK, this code can't possibly work. On a Linux-y system, exactly one instance of TaskExecutor is created, here:

executor = TaskExecutor()

That happens in the main program. Each time you do:

p = Process(target=produce)

your main program is fork()'ed. While the forked processes also see executor, that's an address-space copy of the main program's executor, and has nothing whatsoever to do with the executor in the main program (the usual copy-on-write fork() semantics).

Each child process also has a copy of executor's data members, including its Queue. All the child processes put data on their own unique copy of executor, but the consumer thread is running only in the main program, and nothing worker processes do to their copy of executor can have any effect on what the main program's consumer thread sees.

So this is really confused. I have to stop now to try to figure out what you might really want to be doing here ;-) If you want to play with ideas, investigate using a multiprocessing.Queue. The only way to communicate between processes is to use objects that are built from the ground up to support inter-process communication. Queue.Queue will never work for that.

One more

Here's one that works fine across processes, and even on Windows ;-)

from time import sleep
from threading import Thread
from multiprocessing import Process, JoinableQueue

class TaskExecutor(Thread):
    def __init__(self):
        print("init taskExecutor")
        Thread.__init__(self)
        self.taskInfos = JoinableQueue()

    def getq(self):
        return self.taskInfos

    def run(self):
        print("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                print("try to get queued task from %s" % self.taskInfos)
                msg, task = self.taskInfos.get()
                print("got task %s for msg: %s" % (task, msg))
                task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                print("Error: %s" % e.message)
        print("shutting down TaskExecutor!")

    def is_running(self):
        return True

class Task(object):
    def execute(self, msg):
        print(msg)

def produce(q):
    cnt = 0
    while True:
        q.put(("Message " + str(cnt), Task()))
        cnt += 1
        sleep(1)

if __name__ == "__main__":
    executor = TaskExecutor()
    executor.start()
    for i in range(4):
        p = Process(target=produce, args=(executor.getq(),))
        p.start()

The if __name__ == "__main__" part doesn't just allow it to run on Windows, it has great "documentation" value too, making it obvious at a glance that executor indeed runs only in the main program.

A question for you is whether this is the division of labor you want, though. Do you really want the main process - and only the main process - to do all the

   task.execute(msg)

work? No way from here to guess whether that's what you want. That is what the code does.

Style point: note that this gets rid of the schedule_task() method. Parallel processing can be difficult, and over the decades I've found it extremely valuable to keep inter-thread/inter-process communication as simple and dead obvious as possible. That means, among other things, using message queues directly instead of, e.g., hiding them in methods. Layers of abstraction in this context often make correct code harder to create, extend, debug and maintain.
