关于使用Queue()/deque()和类变量进行通信和“毒丸"的处理与线程的关系. [英] Process vs. Thread with regards to using Queue()/deque() and class variable for communication and "poison pill"

查看:88
本文介绍了关于使用Queue()/deque()和类变量进行通信和“毒丸"的处理与线程的关系.的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个在While True循环中永久运行的线程或进程.

I would like to create either a Thread or a Process which runs forever in a While True loop.

我需要以队列(multiprocessing.Queue()或collections.deque())的形式向工作人员发送和接收数据.我更喜欢使用collections.deque(),因为它明显更快.

I need to send and receive data to the worker in the form for queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.

我还必须最终能够杀死该工作程序(因为它在True循环中运行.这是一些测试代码,我将它们组合在一起以尝试理解Threads,Processes,Queues和deque之间的区别.

I also need to be able to kill the worker eventually (as it runs in a while True loop. Here is some test code I've put together to try and understand the differences between Threads, Processes, Queues, and deque ..

import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque

class ThreadingTest(Thread):

    def __init__(self, q):
        super(ThreadingTest, self).__init__()
        self.q = q
        self.toRun = False

    def run(self):
        print("Started Thread")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Thread deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Thread Queue: " + str(i))

    def stop(self):
        print("Trying to stop Thread")
        self.toRun = False
        while self.isAlive():
            time.sleep(0.1)
        print("Stopped Thread")

class ProcessTest(Process):

    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.toRun = False
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Process deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Process Queue: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.toRun = False
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    q = Queue()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        if type(q) == type(deque()):
            q.append(i)
        elif type(q) == type(Queue()):
            q.put_nowait(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    if type(q) == type(deque()):
        print(q)
    elif type(q) == type(Queue()):
        while q.qsize() > 0:
            print(str(q.get_nowait()))

如您所见,t1可以是ThreadingTest或ProcessTest.另外,传递给它的队列可以是multiprocessing.Queue或collections.deque.

As you can see, t1 can either be ThreadingTest, or ProcessTest. Also, the queue passed to it can either be a multiprocessing.Queue or a collections.deque.

ThreadingTest与Queue或deque()一起使用.当调用stop()方法时,它也可以正确杀死run().

ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.

Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])

ProcessTest仅在类型为multiprocessing.Queue的情况下才能够从队列中读取.它不适用于collections.deque.此外,我无法使用stop()终止进程.

ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().

Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process

我试图弄清楚为什么?另外,在进程中使用双端队列的最佳方法是什么?而且,我将如何使用某种stop()方法来终止进程.

I'm trying to figure out why? Also, what would be the best way to use deque with a process? And, how would I go about killing the process using some sort of stop() method.

推荐答案

您不能使用collections.deque在两个multiprocessing.Process实例之间传递数据,因为collections.deque不能识别进程. multiprocessing.Queue内部将其内容写入multiprocessing.Pipe,这意味着其中的数据可以在一次处理中排队,而在另一个处理中检索. collections.deque没有这种类型的管道,因此它将不起作用.当您在一个进程中写入deque时,在另一个进程中的deque实例将完全不会受到影响;它们是完全独立的实例.

You can't use a collections.deque to pass data between two multiprocessing.Process instances, because collections.deque is not process-aware. multiprocessing.Queue writes its contents to a multiprocessing.Pipe internally, which means that data in it can be enqueued in once process and retrieved in another. collections.deque doesn't have that kind of plumbing, so it won't work. When you write to the deque in one process, the deque instance in the other process won't be affected at all; they're completely separate instances.

您的stop()方法也发生了类似的问题.您要在主过程中更改toRun的值,但这完全不会影响孩子.它们是完全独立的实例.结束孩子的最好方法是向Queue发送一些标记.当您将哨兵放在孩子身上时,请跳出无限循环:

A similar issue is happening to your stop() method. You're changing the value of toRun in the main process, but this won't affect the child at all. They're completely separate instances. The best way to end the child would be to send some sentinel to the Queue. When you get the sentinel in the child, break out of the infinite loop:

def run(self):
    print("Started Process")
    self.toRun = True
    while self.toRun:
        if type(self.q) == type(deque()):
            if self.q:
                i = self.q.popleft()
                print("Process deque: " + str(i))
        elif type(self.q) == type(Queue()):
            if not self.q.empty():
                i = self.q.get_nowait()
                if i is None:  
                    break  # Got sentinel, so break
                print("Process Queue: " + str(i))

def stop(self):
    print("Trying to stop Process")
    self.q.put(None)  # Send sentinel
    while self.is_alive():
        time.sleep(0.1)
    print("Stopped Process")

如果您确实确实需要在两个过程之间使用deque语义,则可以使用

If you actually do need deque semantics between two process, you can use a custom multiprocessing.Manager() to create a shared deque in a Manager process, and each of your Process instances will get a Proxy to it:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque

SyncManager.register('deque', deque)

def Manager():
    m = SyncManager()
    m.start()
    return m

class ProcessTest(Process):
    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
            if self.q._getvalue():
                i = self.q.popleft()
                if i is None:
                    break
                print("Process deque: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.q.append(None)
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    m = Manager()
    q = m.deque()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        q.append(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    print(q)

请注意,这可能不会比multiprocessing.Queue快,因为每次访问deque时都会产生IPC费用.这也是一种以您自己的方式传递消息的自然数据结构.

Note that this probably isn't going to be faster than a multiprocessing.Queue, though, since there's an IPC cost for every time you access the deque. It's also a much less natural data structure for passing messages the way you are.

这篇关于关于使用Queue()/deque()和类变量进行通信和“毒丸"的处理与线程的关系.的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆