在 Python 中,每个进程都拥有独立的私有内存空间,彼此严格隔离——这一设计虽保障了安全性,却也给跨进程通信带来了挑战。如何让不同进程之间高效地传递消息?multiprocessing 模块提供了两种标准方案:队列(Queue)和管道(Pipe)。实际上,队列的底层实现就是在管道的基础上添加了一把锁,从而保证线程安全。本文将从队列入手,系统介绍进程间通信的常见方式,进而延伸到并发编程中一个极为经典的模式——生产者消费者模型。

队列的常用方法
队列严格遵循先进先出(FIFO)原则,就像日常排队购票一样。下面通过基础用法来熟悉它:
from multiprocessing import Process,Queue q = Queue([maxsize]) # 创建一个队列,maxsize 指定队列能存放多少条数据。如果省略,大小取决于内存空间 q.put(data, block=True, timeout=3) # 往队列里存入 data # 若 block=True(默认)且 timeout 为正数,该方法会阻塞最多 timeout 秒,等待队列有空闲空间 # 超时则抛出 Queue.Full 异常;若 block=False 且队列已满,立即抛出 Queue.Full q.get(block=True, timeout=3) # 从队列取数据,参数含义与 put 相同 q.get_nowait(): # 等价于 q.get(block=False) q.put_nowait(): # 等价于 q.put(block=False) q.empty() # 判断队列是否为空 q.full() # 判断队列是否已满 q.qsize() # 返回队列中当前数据的个数 # 注意:由于多进程并发执行,随时可能向队列存取数据,这些状态信息并不可靠 q.cancel_join_thread() # 不让进程退出时自动连接后台线程,可避免 join_thread() 阻塞 q.close() # 关闭队列,阻止更多数据加入。后台线程会继续处理已入列但尚未写入的数据,完成后立即关闭 # 如果队列被垃圾回收,会自动调用此方法。关闭队列不会在消费者端产生数据结束信号或异常 q.join_thread() # 连接后台线程,等待所有队列项被消耗。通常不是原始创建者的进程会自动调用
生产者消费者模型
在并发编程领域,生产者消费者模式几乎能够解决绝大多数数据流的不平衡问题。其核心思想是:将生产数据的线程与消费数据的线程彻底解耦,从而显著提升系统的整体处理效率。
为何需要生产者消费者模式
设想这样一个场景:生产者线程产出数据的速度极快,而消费者线程处理数据的速度相对较慢。如果两者直接交互,生产者就必须等待消费者处理完当前数据才能继续生产;反之,当消费者处理更快时,又不得不空闲等待生产者。这种强耦合关系严重拖累了系统性能。
生产者消费者模式正是为了破解这一矛盾而诞生的经典解决方案。
什么是生产者消费者模式
简单来说,就是在生产者和消费者之间引入一个阻塞队列作为“缓冲区”。两者不再直接通信:生产者只需将数据放入队列,消费者则从队列中取出数据。任何一方都不需要等待对方,队列自动平衡了双方的处理速度差异。
from multiprocessing import Process,Queue
import time,random,os
def get_data(q,name):
while True:
data=q.get()
if data is None: # 收到结束信号,跳出循环
break
time.sleep(random.randint(1,3))
print('%s 取出了 %s' %(name,data))
def create_data(q,name):
for i in range(5):
time.sleep(random.randint(1,3))
data='数据%s' %i
q.put(data)
print('%s 生产了 %s' %(name,data))
q.put(None) # 发送结束信号,通知消费者停止
if __name__ == '__main__':
q=Queue()
p1=Process(target=create_data,args=(q,"生产者")) #生产者
p1.start()
c1=Process(target=get_data,args=(q,"消费者")) #消费者
c1.start()
生产者 生产了 数据0 消费者 取出了 数据0 生产者 生产了 数据1 生产者 生产了 数据2 消费者 取出了 数据1 生产者 生产了 数据3 消费者 取出了 数据2 生产者 生产了 数据4 消费者 取出了 数据3 消费者 取出了 数据4
从运行结果可以清晰看出,队列严格遵循先进先出顺序。此外,有一个容易忽视的陷阱:生产者生产完所有数据后,必须手动向队列放入结束信号(例如 None)。如果遗漏了这一步,消费者将持续阻塞在 q.get() 处,导致程序陷入死循环。而且,当存在多个消费者时,需要向队列中放入相应数量的结束信号才能正确终止所有消费者。
为了避免这种繁琐的人工管理,Python 提供了 JoinableQueue,它能更自然地协调生产者和消费者的结束时机,让代码更加简洁可靠。
JoinableQueue 的使用方式
# JoinableQueue 除了拥有 Queue 的所有方法外,还增加了两个通信方法: q.task_done() # 消费者在取出并处理完一条数据后调用,通知生产者该数据已被消费 # 每取一条数据都必须调用一次,否则会抛出 ValueError 异常 q.join() # 生产者调用此方法会阻塞,直到队列中所有的数据都被消费完毕
from multiprocessing import Process,JoinableQueue
import time,random
def get_data(q,name):
while True:
data=q.get()
if data is None: # 收到结束信号
break
time.sleep(random.randint(1,5))
print('%s 取出了 %s' %(name,data))
q.task_done() # 通知生产者数据已处理
def create_data(q,name):
for i in range(5):
time.sleep(random.randint(1,5))
data='数据%s' %i
q.put(data)
print('%s 生产了 %s' %(name,data))
q.join() # 生产者阻塞,等待所有数据被消费
if __name__ == '__main__':
q=JoinableQueue()
p1=Process(target=create_data,args=(q,"生产者"))
p1.start()
c1=Process(target=get_data,args=(q,"消费者"))
c1.daemon = True # 消费者设为守护进程,主进程结束后自动退出
c1.start()
p1.join() # 主进程等待生产者结束
现在来梳理完整的执行流程:
主进程先启动生产者进程,再启动消费者进程。由于 p1.join() 的存在,主进程会一直等待生产者结束。生产者每生产一条数据放入队列,消费者取出并处理后就调用 q.task_done() 向生产者反馈。生产者内部的 q.join() 会持续阻塞,直到消费者处理完队列中所有的数据。当所有数据都被消费完毕后,生产者退出,主进程继续执行 p1.join() 之后的代码。因为消费者被设置为守护进程,主进程一旦结束,消费者也会自动终止。整个过程干净利落,完全不需要手动发送结束信号。
总结
进程间通信的选型并不复杂:如果仅仅是简单的一对一场景,管道足以胜任;但若需要缓冲能力、支持多消费者时,队列无疑是更稳定可靠的选择。而 JoinableQueue 通过 task_done() 和 join() 的巧妙配合,完美化解了传统队列需要人工发送结束信号的痛点,使代码更健壮、更易于维护。在实际项目中,强烈建议优先使用 JoinableQueue 来构建生产者消费者模型,尤其是当消费者数量不固定时,它能带来显著的开发效率提升。
