带队列的Python线程:如何避免使用Join? [英] Python threading with queue: how to avoid to use join?

查看:123
本文介绍了带队列的Python线程:如何避免使用Join?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个具有2个线程的方案:

I have a scenario with 2 threads:

  1. 一个线程等待套接字中的消息(嵌入在C库中-阻止调用为"Barra.ricevi"),然后将元素放入队列中

  1. a thread waiting for messages from a socket (embedded in a C library - blocking call is "Barra.ricevi") then putting an element on a queue

一个等待从队列中获取元素并执行操作的线程

a thread waiting to get element from the queue and do something

示例代码

import Barra
import Queue    
import threading

posQu = Queue.Queue(maxsize=0)

def threadCAN():
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print (canMsg)
        else:
            print ("Enqueued message"), canMsg
            posQu.put(canMsg)

thCan = threading.Thread(target = threadCAN)
thCan.daemon = True
thCan.start()

while True:
    posMsg = posQu.get()
    print ("Messagge from the queue"), posMsg

结果是每次从套接字收到新消息时,都会向队列中添加新元素,但是从队列中获取项目的主线程却永远不会醒来.

The result is that every time a new message is coming from the socket a new element is added to the queue, BUT the main thread that should get items from the queue is never woke up.

输出如下:

排队的消息

Enqueued message

排队的消息

排队的消息

排队的消息

我希望有:

排队的消息

Enqueued message

队列中的消息

排队的消息

队列中的消息

解决此问题的唯一方法是添加行:

The only way to solve this issue seams to add the line:

posQu.join()

在线程末尾,等待来自套接字的消息,并显示以下行:

at the end of the thread waiting for messages from the socket, and the line:

posQu.task_done()

在主线程的末尾.

在这种情况下,从套接字接收到新消息后,线程将阻塞等待主线程处理排队的项目.

In this case, after that a new message has been received from the socket, the thread is blocking waiting for the main thread to process the enqueued item.

不幸的是,这不是理想的行为,因为我希望线程始终准备从套接字获取消息,而不是等待作业从另一个线程完成.

Unfortunately this isn't the desired behavior since I would like a thread always ready to get messages from a socket and not waiting for a job to be compleated from another thread.

我做错了什么? 谢谢

安德鲁 (意大利)

Andrew (Italy)

推荐答案

这可能是因为Barra.ricevi时,您的Barra不会释放全局解释器锁(GIL).您可能还是要检查一下.

This is likely because your Barra does not release the global interpreter lock (GIL) when Barra.ricevi. You may want to check this though.

GIL确保在任何一次只能运行一个线程(限制了多处理器系统中线程的有用性). GIL每100个滴答"就会切换线程-滴答会松散地映射到字节码指令.有关更多详细信息,请参见此处.

The GIL ensures that only one thread can run at any one time (limiting the usefulness of threads in a multi-processor system). The GIL switches threads every 100 "ticks" -- a tick loosely mapping to bytecode instructions. See here for more details.

在您的生产者线程中,C库调用之外没有发生太多事情.这意味着生产者线程将在GIL切换到另一个线程之前多次调用Barra.ricevi.

In your producer thread, not much happens outside of the C-library call. This means the producer thread will get to call Barra.ricevi a great many times before the GIL switches to another thread.

针对此问题的解决方案是从日益复杂的角度出发:

Solutions to this are to, in terms of increasing complexity:

  • 将项目添加到队列后,调用time.sleep(0).这样就产生了一个线程,以便另一个线程可以运行.
  • 使用sys.setcheckinterval()减少切换线程之前执行的滴答声"的数量.这将使程序的计算量大大增加.
  • 使用multiprocessing而不是threading.这包括使用multiprocessing.Queue代替Queue.Queue.
  • 修改Barra,以便在调用其功能时释放GIL.
  • Call time.sleep(0) after adding an item to the queue. This yields the thread so that another thread can run.
  • Use sys.setcheckinterval() to lower the amount of "ticks" executed before switching threads. This is will come at the cost of making the program much more computationally expensive.
  • Use multiprocessing rather than threading. This includes using multiprocessing.Queue instead of Queue.Queue.
  • Modify Barra so that it does release the GIL when its functions are called.

使用multiprocessing的示例.请注意,使用多处理时,您的进程不再具有隐式共享状态.您将需要查看多处理,以了解如何在进程之间传递信息.

Example using multiprocessing. Be aware that when using multiprocessing, your processes no longer have an implied shared state. You will need to have a look at multiprocessing to see how to pass information between processes.

import Barra  
import multiprocessing

def threadCAN(posQu):
    while True:
        canMsg = Barra.ricevi("can0")
        if canMsg[0] == 'ERR':
            print(canMsg)
        else:
            print("Enqueued message", canMsg)
            posQu.put(canMsg)

if __name__ == "__main__":
    posQu = multiprocessing.Queue(maxsize=0)
    procCan = multiprocessing.Process(target=threadCAN, args=(posQu,))
    procCan.daemon = True
    procCan.start()

    while True:
        posMsg = posQu.get()
        print("Messagge from the queue", posMsg)

这篇关于带队列的Python线程:如何避免使用Join?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆