在现实生活中,如果一个团队正在开展一项共同任务,那么他们之间应该进行沟通,以便正确完成任务.同样的类比也适用于线程.在编程中,为了减少处理器的理想时间,我们创建多个线程并为每个线程分配不同的子任务.因此,必须有一个通信设施,他们应该相互交互,以同步的方式完成工作.
考虑以下与线程互通有关的要点和减号;
无绩效收益 : 如果我们无法在线程和进程之间实现正确的通信,那么并发性和并行性带来的性能提升是没有用的.
正确完成任务 : 如果线程之间没有适当的相互通信机制,则无法正确完成分配的任务.
比进程间通信更有效 : 进程间通信比进程间通信更高效,更易于使用,因为进程中的所有线程共享相同的地址空间,并且不需要使用共享内存.
多线程代码会出现将信息从一个线程传递到另一个线程的问题.标准通信原语无法解决此问题.因此,我们需要实现自己的复合对象,以便在线程之间共享对象,以使通信成为线程安全的.以下是一些数据结构,它们在对它们进行一些更改后提供线程安全通信;<
用于使用set数据结构一个线程安全的方式,我们需要扩展set类来实现我们自己的锁定机制.
这是一个扩展的Python示例class :
class extend_class(set): def __init__(self, *args, **kwargs): self._lock = Lock() super(extend_class, self).__init__(*args, **kwargs) def add(self, elem): self._lock.acquire() try: super(extend_class, self).add(elem) finally: self._lock.release() def delete(self, elem): self._lock.acquire() try: super(extend_class, self).delete(elem) finally: self._lock.release()
在上面的例子中,一个clas已定义名为 extend_class 的对象,该对象继承自Python set class .在此类的构造函数中创建一个锁对象.现在,有两个函数 - add()和 delete().这些函数是定义的并且是线程安全的.它们都依赖于超级类功能,只有一个关键的例外.
这是线程的另一个关键方法-safe通信是装饰器的使用.
考虑一个Python示例,演示如何使用装饰器&mminus;
def lock_decorator(method): def new_deco_method(self, *args, **kwargs): with self._lock: return method(self, *args, **kwargs) return new_deco_method class Decorator_class(set): def __init__(self, *args, **kwargs): self._lock = Lock() super(Decorator_class, self).__init__(*args, **kwargs) @lock_decorator def add(self, *args, **kwargs): return super(Decorator_class, self).add(elem) @lock_decorator def delete(self, *args, **kwargs): return super(Decorator_class, self).delete(elem)
在上面的示例中,定义了一个名为lock_decorator的装饰器方法,该方法继承自Python方法类.然后在此类的构造函数中创建一个锁对象.现在,有两个函数 - add()和delete().这些函数是定义的并且是线程安全的.它们都依赖于超类功能,只有一个关键的例外.
列表数据结构是线程安全的,快速的以及简单的结构用于临时的内存存储.在Cpython中,GIL可以防止对它们进行并发访问.我们开始知道列表是线程安全的,但是它们中的数据呢.实际上,列表的数据不受保护.例如, L.append(x)不保证在另一个线程尝试执行相同操作时返回预期结果.这是因为,虽然 append()是一个原子操作并且是线程安全的,但另一个线程试图以并发方式修改列表的数据,因此我们可以看到竞争条件对输出的副作用.
要解决此类问题并安全地修改数据,我们必须实施适当的锁定机制,这进一步确保多个线程不会潜在地遇到竞争条件.为了实现正确的锁定机制,我们可以像前面的例子中那样扩展类.
列表上的一些其他原子操作如下 :
L.append(x) L1.extend(L2) x = L[i] x = L.pop() L1[i:j] = L2 L.sort() x = y x.field = y D[x] = y D1.update(D2) D.keys()
这里 :
L,L1,L2都是列表
D,D1,D2是dicts
x,y是对象
i,j是整数
如果列表的数据是没有受到保护,我们可能不得不面对后果.我们可能会获得或删除有关竞争条件的错误数据项.这就是为什么建议使用队列数据结构.一个真实的队列示例可以是单车道单向道路,车辆首先进入,首先退出.在票务窗口和公共汽车站的队列中可以看到更多真实的例子.
队列默认情况下是线程安全的数据结构,我们不必担心实现复杂的锁定机制. Python为我们提供了
在本节中,我们将获得关于不同类型的队列. Python提供了三个队列选项,可以从< queue> 模块中使用 :
正常队列(先进优先出先日会)
LIFO,后进先出
优先级
我们将在后续章节中了解不同的队列.
它是Python提供的最常用的队列实现.在这种排队机制中,无论谁先来,都会先获得服务. FIFO也称为普通队列. FIFO队列可以表示如下 :
在python中,FIFO队列可以用单线程和多线程实现.
为了实现单线程的FIFO队列,队列类将实现一个基本的先进先出容器.元素将使用 put()添加到序列的一个"结尾",并使用 get()从另一端删除.
以下是用于实现具有单线程和减号的FIFO队列的Python程序;
import queue q = queue.Queue() for i in range(8): q.put("item-" + str(i)) while not q.empty(): print (q.get(), end = " ")
item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7
输出显示上面的程序使用单个线程来说明元素是按照它们插入的顺序从队列中删除的.
为了实现具有多个线程的FIFO,我们需要定义myqueue()函数,该函数从队列模块扩展.在使用单线程实现FIFO队列时,get()和put()方法的工作与上面讨论的相同.然后为了使它成为多线程,我们需要声明并实例化线程.这些线程将以FIFO方式使用队列.
以下是用于实现具有多个线程的FIFO队列的Python程序
import threading import queue import random import time def myqueue(queue): while not queue.empty(): item = queue.get() if item is None: break print("{} removed {} from the queue".format(threading.current_thread(), item)) queue.task_done() time.sleep(2) q = queue.Queue() for i in range(5): q.put(i) threads = [] for i in range(4): thread = threading.Thread(target=myqueue, args=(q,)) thread.start() threads.append(thread) for thread in threads: thread.join()
<Thread(Thread-3654, started 5044)> removed 0 from the queue <Thread(Thread-3655, started 3144)> removed 1 from the queue <Thread(Thread-3656, started 6996)> removed 2 from the queue <Thread(Thread-3657, started 2672)> removed 3 from the queue <Thread(Thread-3654, started 5044)> removed 4 from the queue
此队列使用与FIFO完全相反的类比(排在第一位的队列.在这个排队机制中,最后一个,将首先获得服务.这类似于实现堆栈数据结构. LIFO队列在实现像人工智能算法这样的深度优先搜索时非常有用.
在python中,LIFO队列可以是用单线程和多线程实现.
为了用单线程实现LIFO队列,队列 class将使用结构 Queue .LifoQueue实现一个基本的后进先出容器.现在,在调用 put()时,元素将添加到容器的头部,并在使用 get()时从头部移除.
以下是用于实现LIFO队列的Python程序,单线程 :
import queue q = queue.LifoQueue() for i in range(8): q.put("item-" + str(i)) while not q.empty(): print (q.get(), end=" ") Output: item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0
输出显示上述程序使用单个线程来说明元素是按照它们插入的相反顺序从队列中删除的.
实现与我们类似已经完成了多线程FIFO队列的实现.唯一的区别是我们需要使用 Queue 类,它将使用结构 Queue.LifoQueue 实现基本的后进先出容器.
以下是用于实现具有多个线程的LIFO队列的Python程序 :
import threading import queue import random import time def myqueue(queue): while not queue.empty(): item = queue.get() if item is None: break print("{} removed {} from the queue".format(threading.current_thread(), item)) queue.task_done() time.sleep(2) q = queue.LifoQueue() for i in range(5): q.put(i) threads = [] for i in range(4): thread = threading.Thread(target=myqueue, args=(q,)) thread.start() threads.append(thread) for thread in threads: thread.join()
<Thread(Thread-3882, started 4928)> removed 4 from the queue <Thread(Thread-3883, started 4364)> removed 3 from the queue <Thread(Thread-3884, started 6908)> removed 2 from the queue <Thread(Thread-3885, started 3584)> removed 1 from the queue <Thread(Thread-3882, started 4928)> removed 0 from the queue
在FIFO和LIFO队列中,项目的顺序与插入顺序.但是,在许多情况下,优先级比插入顺序更重要.让我们考虑一个现实世界的例子.假设机场的安检正在检查不同类别的人. VVIP的人员,航空公司的工作人员,海关人员,类别可以优先进行检查,而不是像对待普通人那样在到达的基础上进行检查.
需要的另一个重要方面被认为是优先级队列是如何开发任务调度程序的.一种常见的设计是在队列中优先处理大多数代理任务.此数据结构可用于根据队列的优先级值从队列中获取项目.
在python中,优先级队列可以用单线程和多线程实现.
为了实现单线程的优先级队列, Queue 类将使用结构 Queue .PriorityQueue在优先级容器上实现任务.现在,在调用 put()时,元素将添加一个值,其中最低值将具有最高优先级,因此首先使用 get()检索.
考虑使用以下Python程序实现具有单线程和减号的优先级队列;
import queue as Q p_queue = Q.PriorityQueue() p_queue.put((2, 'Urgent')) p_queue.put((1, 'Most Urgent')) p_queue.put((10, 'Nothing important')) prio_queue.put((5, 'Important')) while not p_queue.empty(): item = p_queue.get() print('%s - %s' % item)
1 – Most Urgent 2 - Urgent 5 - Important 10 – Nothing important
在上面的输出中,我们可以看到队列已根据优先级存储了项目 - 较少的值具有高优先级.
该实现类似于具有多个线程的FIFO和LIFO队列的实现.唯一的区别是我们需要使用 Queue 类来使用结构 Queue.PriorityQueue 来初始化优先级.另一个区别在于生成队列的方式.在下面给出的示例中,它将使用两个相同的数据集生成.
以下Python程序有助于优先级队列的实现多线程 :
import threading import queue import random import time def myqueue(queue): while not queue.empty(): item = queue.get() if item is None: break print("{} removed {} from the queue".format(threading.current_thread(), item)) queue.task_done() time.sleep(1) q = queue.PriorityQueue() for i in range(5): q.put(i,1) for i in range(5): q.put(i,1) threads = [] for i in range(2): thread = threading.Thread(target=myqueue, args=(q,)) thread.start() threads.append(thread) for thread in threads: thread.join()
<Thread(Thread-4939, started 2420)> removed 0 from the queue <Thread(Thread-4940, started 3284)> removed 0 from the queue <Thread(Thread-4939, started 2420)> removed 1 from the queue <Thread(Thread-4940, started 3284)> removed 1 from the queue <Thread(Thread-4939, started 2420)> removed 2 from the queue <Thread(Thread-4940, started 3284)> removed 2 from the queue <Thread(Thread-4939, started 2420)> removed 3 from the queue <Thread(Thread-4940, started 3284)> removed 3 from the queue <Thread(Thread-4939, started 2420)> removed 4 from the queue <Thread(Thread-4940, started 3284)> removed 4 from the queue