互斥线程锁定,在Python中互斥锁/锁定释放后会删除排队的函数? [英] Mutual exclusion thread locking, with dropping of queued functions upon mutex/lock release, in Python?

查看:84
本文介绍了互斥线程锁定,在Python中互斥锁/锁定释放后会删除排队的函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我遇到的问题:我正在使用Python 2.7,并且我有一个在线程中运行的代码,该代码具有一个关键区域,当时只能执行一个线程.该代码当前没有互斥机制,因此我想查询特定用途的情况,其中涉及删除"排队"功能.我尝试使用以下最少的工作示例来模拟这种行为:

This is the problem I have: I'm using Python 2.7, and I have a code which runs in a thread, which has a critical region that only one thread should execute at the time. That code currently has no mutex mechanisms, so I wanted to inquire what I could use for my specific use case, which involves "dropping" of "queued" functions. I've tried to simulate that behavior with the following minimal working example:

useThreading=False # True

if useThreading:  from threading import Thread, Lock
else:             from multiprocessing import Process, Lock

mymutex = Lock()

import time
tstart = None

def processData(data):
  #~ mymutex.acquire()
  try:
    print('thread {0} [{1:.5f}] Do some stuff'.format(data, time.time()-tstart))
    time.sleep(0.5)
    print('thread {0} [{1:.5f}] 1000'.format(data, time.time()-tstart))
    time.sleep(0.5)
    print('thread {0} [{1:.5f}] done'.format(data, time.time()-tstart))
  finally:
    #~ mymutex.release()
    pass

# main:
tstart = time.time()
for ix in xrange(0,3):
  if useThreading: t = Thread(target = processData, args = (ix,))
  else: t = Process(target = processData, args = (ix,))
  t.start()
  time.sleep(0.001)

现在,如果运行此代码,将得到如下打印输出:

Now, if you run this code, you get a printout like this:

thread 0 [0.00173] Do some stuff
thread 1 [0.00403] Do some stuff
thread 2 [0.00642] Do some stuff
thread 0 [0.50261] 1000
thread 1 [0.50487] 1000
thread 2 [0.50728] 1000
thread 0 [1.00330] done
thread 1 [1.00556] done
thread 2 [1.00793] done

这就是说,三个线程快速地一个接一个地排队"(彼此相隔2-3毫秒左右).实际上,它们不会排在队列中,它们只是在彼此相隔2-3 ms之后才开始并行执行.

That is to say, the three threads quickly get "queued" one after another (something like 2-3 ms after each other). Actually, they don't get queued, they simply start executing in parallel after 2-3 ms after each other.

现在,如果启用mymutex.acquire()/.release()命令,我将得到预期的结果:

Now, if I enable the mymutex.acquire()/.release() commands, I get what would be expected:

thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
thread 1 [1.00350] Do some stuff
thread 1 [1.50462] 1000
thread 1 [2.00531] done
thread 2 [2.00547] Do some stuff
thread 2 [2.50638] 1000
thread 2 [3.00706] done

基本上,现在有了锁定,线程不再并行运行,但是由于有锁,它们又一个又一个地运行-只要一个线程在工作,其他线程就会在.acquire()处阻塞.但这也不是我想要实现的.

Basically, now with locking, the threads don't run in parallel, but they run one after another thanks to the lock - as long as one thread is working, the others will block at the .acquire(). But this is not exactly what I want to achieve, either.

我要实现的是:假设.acquire()最初由线程函数触发时,它在队列中注册了函数的ID(例如指向它的指针).之后,其行为基本上与Lock相同-当一个线程工作时,其他线程在.acquire()处阻塞.当第一个线程完成后,它进入finally:块-在这里,我想检查队列中有多少线程在等待;然后我想删除/删除最后一个线程中的所有等待线程 -最后,我要.release()锁;这意味着在此之后,队列中的最后一个线程将执行下一步.我可以想象,我想写类似以下的伪代码:

What I want to achieve is this: let's assume that when .acquire() is first triggered by a thread function, it registers an id of a function (say a pointer to it) in a queue. After that, the behavior is basically the same as with the Lock - while the one thread works, the others block at .acquire(). When the first thread is done, it goes in the finally: block - and here, I'd like to check to see how many threads are waiting in the queue; then I'd like to delete/drop all waiting threads except for the very last one - and finally, I'd .release() the lock; meaning that after this, what was the last thread in the queue would execute next. I'd imagine, I would want to write something like the following pseudocode:

  ...
  finally:
    if (len(mymutex.queue) > 2): # more than this instance plus one other waiting:
      while (len(mymutex.queue) > 2):
        mymutex.queue.pop(1) # leave alone [0]=this instance, remove next element
    # at this point, there should be only queue[0]=this instance, and queue[1]= what was the last thread queued previously
    mymutex.release() # once we releace, queue[0] should be gone, and the next in the queue should acquire the mutex/lock..
    pass
  ...

有了这个,我希望得到这样的打印输出:

With that, I'd expect a printout like this:

thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
# here upon lock release, thread 1 would be deleted - and the last one in the queue, thread 2, would acquire the lock next:
thread 2 [1.00350] Do some stuff
thread 2 [1.50462] 1000
thread 2 [2.00531] done

在Python中最简单的方法是什么?

What would be the most straightforward way to achieve this in Python?

推荐答案

似乎您想要类似队列的行为,所以为什么不使用

Seems like you want a queue-like behaviour, so why not use Queue?

import threading
from  Queue import Queue
import time

# threads advertise to this queue when they're waiting
wait_queue = Queue()
# threads get their task from this queue
task_queue = Queue()

def do_stuff():
    print "%s doing stuff" % str(threading.current_thread())
    time.sleep(5)
def queue_thread(sleep_time):

    # advertise current thread waiting
    time.sleep(sleep_time)  
    wait_queue.put("waiting")  

    # wait for permission to pass
    message = task_queue.get()

    print "%s got task: %s" % (threading.current_thread(), message)
    # unregister current thread waiting
    wait_queue.get()

    if message == "proceed":
        do_stuff()
        # kill size-1 threads waiting
        for _ in range(wait_queue.qsize() - 1):
            task_queue.put("die")
        # release last
        task_queue.put("proceed")

    if message == "die":
        print "%s died without doing stuff" % threading.current_thread()
        pass

t1 = threading.Thread(target=queue_thread, args=(1, ))
t2 = threading.Thread(target=queue_thread, args=(2, ))
t3 = threading.Thread(target=queue_thread, args=(3, ))
t4 = threading.Thread(target=queue_thread, args=(4, ))

# allow first thread to pass
task_queue.put("proceed")

t1.start()
t2.start()
t3.start()
t4.start()

线程-1首先到达并获取"该部分,其他线程随后到达以在队列中等待(并通告它们正在等待).然后,当线程1离开时,它通过告诉所有其他线程死亡以及最后一个线程继续进行,来授予队列中的最后一个线程许可.

thread-1 arrives first and "acquires" the section, other threads come later to wait at the queue (and advertise they're waiting). Then, when thread-1 leaves it gives permission to the last thread at the queue by telling all other thread to die, and the last thread to proceed.

您可以使用不同的消息进行更好的控制,典型的消息是wait_queue中的线程ID(因此您知道正在等待,以及到达的顺序).

You can have finer control using different messages, a typical one would be a thread-id in the wait_queue (so you know who is waiting, and the order in which it arrived).

您可能可以利用非阻塞操作( queue.put(block=False) queue.get(block=False) )当您根据自己的需要着急时就喜欢.

You can probably utilize non-blocking operations (queue.put(block=False) and queue.get(block=False)) in your favour when you're set on what you need.

这篇关于互斥线程锁定,在Python中互斥锁/锁定释放后会删除排队的函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆