1. 什么需要队列? #
1.1 什么是多线程? #
程序默认是单线程的:代码一行一行执行,前一行没执行完,后一行就得等着。多线程则是让程序同时做多件事,比如一个线程负责下载文件,另一个线程负责显示进度条。
1.2 为什么需要队列? #
当多个线程要共享数据时,如果直接读写同一个变量,很容易出现数据错乱(比如一个线程在写,另一个线程在读,读到一半的数据)。队列就像一个安全的"传送带":一个线程往里面放数据,另一个线程从里面取数据,queue 模块会保证这个过程是线程安全的,不会出错。
通俗比喻:队列就像超市的收银台排队通道,顾客(数据)从一端进入,从另一端被处理,先来的先被服务(FIFO:先进先出)。
2. 什么是 queue.Queue? #
queue.Queue 是 Python 标准库提供的线程安全队列,特点是:
- FIFO:先进先出,先放入的元素会先被取出
- 线程安全:多个线程同时操作不会出错
- 可阻塞:队列空时取数据会等待,队列满时放数据会等待(可选)
下面我们从导入和创建开始,一步步学习。
3. 导入和创建队列 #
3.1 导入 #
使用前需要从 queue 模块导入 Queue 类。
# 从 queue 模块导入 Queue 类
from queue import Queue3.2 创建队列 #
创建队列时可以指定最大容量。maxsize=0 表示不限制大小(默认值)。
# 导入 Queue 类
from queue import Queue
# 创建一个无限大小的队列(maxsize 默认为 0,表示不限制)
q = Queue()
# 创建一个最多能放 10 个元素的队列,超过会阻塞
q_limited = Queue(maxsize=10)参数说明:
maxsize:队列最大容量。设为0或负数表示不限制,默认为0- 当队列已满时,
put()会阻塞等待;当队列为空时,get()会阻塞等待
4. 核心方法详解 #
| 方法 | 作用 |
|---|---|
put(item) |
往队列里放一个元素 |
get() |
从队列里取出一个元素(并移除) |
qsize() |
返回队列中大约有多少个元素(多线程下可能不精确) |
empty() |
队列是否为空 |
full() |
队列是否已满 |
task_done() |
标记一个任务已完成(配合 join 使用) |
join() |
阻塞直到队列中所有任务都被处理完 |
提示:task_done() 和 join() 用于"等所有任务做完再继续",初学时可先掌握 put 和 get。
5. 基础示例 #
5.1 单线程下的简单使用 #
先不用多线程,只看队列的"放入"和"取出"怎么用。
# 导入 Queue 类
from queue import Queue
# 创建一个最多放 3 个元素的队列
q = Queue(maxsize=3)
# 放入第一个元素
q.put("a")
# 放入第二个元素
q.put("b")
# 放入第三个元素
q.put("c")
# 查看当前队列中元素个数(约等于 3)
print(q.qsize())
# 查看队列是否已满(已放满 3 个)
print(q.full())
# 取出第一个元素(先进先出,得到 "a")
print(q.get())
# 取出第二个元素(得到 "b")
print(q.get())
# 取出第三个元素(得到 "c")
print(q.get())
# 查看队列是否为空(已全部取出)
print(q.empty())5.2 生产者-消费者模型(多线程) #
这是队列最典型的用法:一个线程负责"生产"数据放入队列,另一个线程负责"消费"数据并处理。
# 导入多线程模块
import threading
# 导入时间模块,用于模拟耗时操作
import time
# 导入队列
from queue import Queue
# 定义生产者函数,参数 q 是共享的队列
def producer(q):
# 循环生产 5 个产品
for i in range(5):
# 生成产品名称
item = f"产品-{i}"
# 将产品放入队列
q.put(item)
# 打印生产信息
print(f"生产者生产了 {item}")
# 模拟生产耗时 1 秒
time.sleep(1)
# 放入 None 作为结束信号,告诉消费者可以退出了
q.put(None)
# 定义消费者函数,参数 q 是共享的队列
def consumer(q):
# 无限循环,直到收到结束信号
while True:
# 从队列取出一个元素(队列空时会阻塞等待)
item = q.get()
# 如果取到 None,说明生产者已结束,退出循环
if item is None:
break
# 打印消费信息
print(f"消费者处理了 {item}")
# 模拟处理耗时 2 秒
time.sleep(2)
# 创建无限大小的队列
q = Queue()
# 创建生产者线程,target 是函数名,args 是传给函数的参数(元组)
t1 = threading.Thread(target=producer, args=(q,))
# 创建消费者线程
t2 = threading.Thread(target=consumer, args=(q,))
# 启动生产者线程
t1.start()
# 启动消费者线程
t2.start()
# 主线程等待生产者线程结束
t1.join()
# 主线程等待消费者线程结束
t2.join()
# 所有线程结束后打印
print("程序结束")说明:None 在这里充当"哨兵",表示没有更多数据了。生产者放完所有数据后放一个 None,消费者取到 None 就知道可以退出了。
5.3 使用 join() 和 task_done() 等待所有任务完成 #
当你需要"等所有任务都处理完再继续"时,可以用 task_done() 和 join()。
# 导入多线程模块
import threading
# 导入时间模块
import time
# 导入队列
from queue import Queue
# 定义工作线程函数,从队列取任务并处理
def worker(q):
# 无限循环取任务
while True:
# 从队列取一个元素
item = q.get()
# 如果取到 None(哨兵),说明没有更多任务了
if item is None:
# 哨兵也要调用 task_done,否则 q.join() 会一直阻塞
q.task_done()
# 退出循环,线程结束
break
# 打印正在处理的任务
print(f"处理 {item}")
# 模拟处理耗时 1 秒
time.sleep(1)
# 标记这个任务已处理完成(必须调用,否则 join 无法正确返回)
q.task_done()
# 创建队列
q = Queue()
# 用于保存线程对象的列表
threads = []
# 启动 2 个工作线程
for i in range(2):
# 创建线程,传入 worker 函数和队列 q
t = threading.Thread(target=worker, args=(q,))
# 启动线程
t.start()
# 把线程对象加入列表,方便后面 join
threads.append(t)
# 往队列放入 5 个任务
for i in range(5):
q.put(f"任务-{i}")
# 放入 2 个哨兵,每个工作线程取到一个后就会退出
for _ in range(2):
q.put(None)
# 阻塞等待:直到队列中每个 put 的元素都被 get 且调用了 task_done
q.join()
# 此时所有任务都已处理完毕
print("所有任务已完成")
# 等待 2 个工作线程都结束
for t in threads:
t.join()
# 程序结束
print("程序结束")注意:join() 会一直阻塞,直到每个 put 进去的元素都被 get 并且对应调用了 task_done()。所以哨兵 None 也要调用 task_done(),否则 join 永远不会返回。
6. 其他队列类型 #
queue 模块还提供了两种队列,用法和 Queue 类似。
6.1 LifoQueue(后进先出,类似栈) #
# 导入 LifoQueue(Last In First Out,后进先出)
from queue import LifoQueue
# 创建后进先出队列,用法与 Queue 类似
lq = LifoQueue()
# 放入第一个元素
lq.put("first")
# 放入第二个元素
lq.put("second")
# 放入第三个元素
lq.put("third")
# 取出时是最后放入的先被取出(third)
print(lq.get())
# 再取出 second
print(lq.get())
# 最后取出 first
print(lq.get())6.2 PriorityQueue(优先级队列) #
元素需要是 (优先级, 数据) 的元组,数字越小优先级越高。
# 导入 PriorityQueue(优先级队列)
from queue import PriorityQueue
# 创建优先级队列
pq = PriorityQueue()
# 放入元素:元组格式为 (优先级数字, 数据),数字越小越先被取出
# 优先级 2 表示中等
pq.put((2, "中等优先级"))
# 优先级 1 表示最高(最先被取出)
pq.put((1, "高优先级"))
# 优先级 3 表示最低
pq.put((3, "低优先级"))
# 按优先级取出,get() 返回 (优先级, 数据),[1] 取数据部分
print(pq.get()[1])
# 再取出中等优先级的
print(pq.get()[1])
# 最后取出低优先级的
print(pq.get()[1])7. 异常处理 #
当队列空时用非阻塞方式 get,或队列满时用非阻塞方式 put,会抛出异常。
# 导入 Queue 以及 Empty、Full 异常类
from queue import Queue, Empty, Full
# 创建容量为 1 的队列(只能放 1 个元素)
q = Queue(maxsize=1)
# 放入一个元素,block=False 表示不等待,满了就抛异常
q.put("item", block=False)
# 尝试再放一个,此时队列已满
try:
# block=False 会立即抛出 Full 异常
q.put("another", block=False)
except Full:
# 捕获 Full 异常并打印提示
print("队列已满,无法放入")
# 取出刚放入的元素,block=False 表示不等待
q.get(block=False)
# 此时队列已空,尝试再取
try:
# timeout=1 表示最多等 1 秒,超时则抛出 Empty
q.get(timeout=1)
except Empty:
# 捕获 Empty 异常并打印提示
print("队列为空,获取超时")说明:block=False 表示不等待,立刻返回;timeout=1 表示最多等 1 秒。新手阶段用默认的阻塞方式即可,很少需要处理这些异常。
8. 注意事项 #
- 线程安全:
queue内部已经处理好锁,多线程下可以放心使用。 - qsize()、empty()、full() 不精确:多线程环境下,这些值可能在返回后马上被其他线程改变,只适合做大致判断,不要用来做精确逻辑。
- 经典用法:生产者
put,消费者get后处理并调用task_done(),主线程用q.join()等待全部完成。
9. 总结 #
| 内容 | 要点 |
|---|---|
| Queue | 线程安全的 FIFO 队列,多线程间传递数据 |
| 基本用法 | put() 放入,get() 取出 |
| 典型场景 | 生产者-消费者模型 |
| 进阶用法 | task_done() + join() 等待所有任务完成 |
| 其他类型 | LifoQueue(栈)、PriorityQueue(优先级队列) |
掌握 Queue 的 put、get 以及 task_done、join 的配合,就能应对大部分多线程任务队列场景。建议先多练单线程示例,再尝试生产者-消费者模型。