前言
Python中的Queue实现了一个同步队列,并且该类实现了所有需要的锁原语。Queue实现了三种队列:普通的FIFO队列(Queue)、LIFO队列(LifoQueue)、优先级队列(PriorityQueue)。其使用方法类似,下面以普通的先进先出队列Queue为例谈一下Queue中的主要方法
Queue中的方法及使用
使用Queue1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30#引入Queue类
from Queue import Queue
#得到队列的大小
Queue.qsize()
#判断队列是否为空
Queue.empty()
#判断队列是否已满
Queue.full()
#从队列头获取元素,默认为阻塞
Queue.get([block[,timeout]])
#从队列头获取元素,非阻塞方式
Queue.get_nowait()
#或者
Queue.get(block=False)
#阻塞写入队列
Queue.put(item)
#非阻塞写入队列
Queue.put_nowait(item)
#或者
Queue.put(item,block=False)
#向队列中已完成的元素发送join信号
Queue.task_done()
上面从队列中获取元素和向队列中添加元素都有阻塞和非阻塞的方式,采用阻塞方式,如果从队列中取元素而元素为空,则线程会停下来等待知道队列中有元素可以取出;如果向队列中添加元素而此时队列已满,则同样线程会停下来直到停止。如果采用非阻塞方式,取元素时一旦队列为空,则会引发Empty异常,放元素时一旦队列已满,就会引发Full异常。
下面是采用Queue实现的经典生产者消费者问题的代码,来源:Python模块之Queue1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42from Queue import Queue
import random
import threading
import time
#Producer thread
class Producer(threading.Thread):
def __init__(self, t_name, queue):
threading.Thread.__init__(self, name=t_name)
self.data=queue
def run(self):
for i in range(5):
print "%s: %s is producing %d to the queue!" %(time.ctime(), self.getName(), i)
self.data.put(i)
time.sleep(random.randrange(10)/5)
print "%s: %s finished!" %(time.ctime(), self.getName())
#Consumer thread
class Consumer(threading.Thread):
def __init__(self, t_name, queue):
threading.Thread.__init__(self, name=t_name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print "%s: %s is consuming. %d in the queue is consumed!" %(time.ctime(), self.getName(), val)
time.sleep(random.randrange(10))
print "%s: %s finished!" %(time.ctime(), self.getName())
#Main thread
def main():
queue = Queue()
producer = Producer('Pro.', queue)
consumer = Consumer('Con.', queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print 'All threads terminate!'
if __name__ == '__main__':
main()
最后附上Queue模块的源码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240"""A multi-producer, multi-consumer queue."""
from time import time as _time
try:
import threading as _threading # 导入threading模块
except ImportError:
import dummy_threading as _threading # 该模块的接口和thread相同,在没有实现thread模块的平台上提供thread模块的功能。
from collections import deque # https://github.com/BeginMan/pythonStdlib/blob/master/collections.md
import heapq # 堆排序 https://github.com/qiwsir/algorithm/blob/master/heapq.md
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] # 模块级别暴露接口
class Empty(Exception):
"""当调用Queue.get(block=0)/get_nowait()时触发Empty异常
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。
如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。
如果队列为空且block为False,队列将引发Empty异常
"""
pass
class Full(Exception):
"""当调用Queue.put(block=0)/put_nowait()时触发Full异常
如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。
如果block为0,put方法将引发Full异常。
"""
pass
class Queue:
"""创建一个给定的最大大小的队列对象.
FIFO(先进先出)队列, 第一加入队列的任务, 被第一个取出
If maxsize is <= 0, the queue size is 无限大小.
"""
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize) # 初始化queue为空
# 所有获取锁的方法必须在返回之前先释放,互斥锁在下面三个Condition条件共享
# 从而获取和释放的条件下也获得和释放互斥锁。
self.mutex = _threading.Lock() # Lock锁
# 当添加queue元素时通知`not_empty`,之后线程等待get
self.not_empty = _threading.Condition(self.mutex) # not_empty Condition实例
# 当移除queue元素时通知`not_full`,之后线程等待put.
self.not_full = _threading.Condition(self.mutex) # not_full Condition实例
# 当未完成的任务数为0时,通知`all_tasks_done`,线程等待join()
self.all_tasks_done = _threading.Condition(self.mutex) # all_tasks_done Condition实例
self.unfinished_tasks = 0
def task_done(self):
"""表明,以前排队的任务完成了
被消费者线程使用. 对于每个get(),随后调用task_done()告知queue这个task已经完成
"""
self.all_tasks_done.acquire()
try:
# unfinished_tasks 累减
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
# 调用多次task_done则触发异常
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all() # 释放所有等待该条件的线程
self.unfinished_tasks = unfinished
finally:
self.all_tasks_done.release()
def join(self):
"""阻塞直到所有任务都处理完成
未完成的task会在put()累加,在task_done()累减, 为0时,join()非阻塞.
"""
self.all_tasks_done.acquire()
try:
# 一直循环检查未完成数
while self.unfinished_tasks:
self.all_tasks_done.wait()
finally:
self.all_tasks_done.release()
def qsize(self):
"""返回队列的近似大小(不可靠!)"""
self.mutex.acquire()
n = self._qsize() # len(queue)
self.mutex.release()
return n
def empty(self):
"""队列是否为空(不可靠)."""
self.mutex.acquire()
n = not self._qsize()
self.mutex.release()
return n
def full(self):
"""队列是否已满(不可靠!)."""
self.mutex.acquire()
n = 0 < self.maxsize == self._qsize()
self.mutex.release()
return n
def put(self, item, block=True, timeout=None):
"""添加元素.
如果可选参数block为True并且timeout参数为None(默认), 为阻塞型put().
如果timeout是正数, 会阻塞timeout时间并引发Queue.Full异常.
如果block为False为非阻塞put
"""
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
def put_nowait(self, item):
"""
非阻塞put
其实就是将put第二个参数block设为False
"""
return self.put(item, False)
def get(self, block=True, timeout=None):
"""移除列队元素并将元素返回.
block = True为阻塞函数, block = False为非阻塞函数. 可能返回Queue.Empty异常
"""
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
def get_nowait(self):
"""
非阻塞get()
也即是get()第二个参数为False
"""
return self.get(False)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# 初始化队列表示
def _init(self, maxsize):
self.queue = deque() # 将queue初始化为一个空的deque对象
def _qsize(self, len=len): # 队列长度
return len(self.queue)
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
class PriorityQueue(Queue):
"""
继承Queue
构造一个优先级队列
maxsize设置队列大小的上界, 如果插入数据时, 达到上界会发生阻塞, 直到队列可以放入数据.
当maxsize小于或者等于0, 表示不限制队列的大小(默认).
优先级队列中, 最小值被最先取出
"""
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self.queue)
class LifoQueue(Queue):
"""
构造一LIFO(先进后出)队列
maxsize设置队列大小的上界, 如果插入数据时, 达到上界会发生阻塞, 直到队列可以放入数据.
当maxsize小于或者等于0, 表示不限制队列的大小(默认)
"""
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop() # 与Queue相比,仅仅是 将popleft()改成了pop()