最大并发数控制
约 904 字大约 3 分钟
2025-02-25
Python 是单线程的,但是可以通过多进程、多线程、异步编程等方式实现并发。协程是 Python 中一种轻量级的并发实现方式,它可以在单个线程中实现多个任务的切换执行。
是的,如果你想要实现类似的任务并发执行和等待的功能,而又不想用 ThreadPoolExecutor,可以使用 semaphore(信号量) 或 queue.Queue 来控制线程的并发数量。这些方法更轻量,适用于简单的线程控制场景。
方法 1:使用 threading.Semaphore 控制最大并发数
Semaphore 允许你限制同时运行的线程数量,超出的任务会自动等待。
代码示例
import threading import time
限制最多 3 个线程同时运行
semaphore = threading.Semaphore(3)
def task(n): with semaphore: # 获取信号量 print(f"任务 {n} 开始") time.sleep(2) # 模拟任务执行 2 秒 print(f"任务 {n} 结束")
启动多个线程
threads = [] for i in range(5): # 提交 5 个任务 t = threading.Thread(target=task, args=(i+1,)) t.start() threads.append(t)
等待所有任务完成
for t in threads: t.join()
输出示例
任务 1 开始 任务 2 开始 任务 3 开始 (过了 2 秒) 任务 1 结束 任务 4 开始 任务 2 结束 任务 5 开始 任务 3 结束 (再过 2 秒) 任务 4 结束 任务 5 结束
✅ 说明: • semaphore = threading.Semaphore(3) 控制最多 3 个任务同时运行。 • 超过 3 个的任务会等待前面任务完成后才执行。
方法 2:使用 queue.Queue 作为任务队列
如果你的任务是动态添加的,也可以使用 queue.Queue 让工作线程从队列里获取任务并执行。
代码示例
import threading import queue import time
task_queue = queue.Queue()
任务 worker
def worker(): while True: task = task_queue.get() # 获取任务 if task is None: # 终止信号 break print(f"任务 {task} 开始") time.sleep(2) print(f"任务 {task} 结束") task_queue.task_done()
启动 3 个 worker 线程
num_workers = 3 threads = [] for _ in range(num_workers): t = threading.Thread(target=worker) t.start() threads.append(t)
添加 5 个任务
for i in range(5): task_queue.put(i + 1)
等待所有任务完成
task_queue.join()
终止 worker 线程
for _ in range(num_workers): task_queue.put(None) # 发送终止信号
等待所有 worker 线程结束
for t in threads: t.join()
输出示例
任务 1 开始 任务 2 开始 任务 3 开始 (过了 2 秒) 任务 1 结束 任务 4 开始 任务 2 结束 任务 5 开始 任务 3 结束 (再过 2 秒) 任务 4 结束 任务 5 结束
✅ 说明: • queue.Queue() 作为任务队列,支持任务动态添加。 • task_queue.get() 取任务,task_queue.task_done() 标记任务完成。 • 线程池大小固定为 3(num_workers = 3)。 • None 作为终止信号,告诉 worker 线程任务结束。
对比不同方法
方法 适用场景 线程控制 复杂度 ThreadPoolExecutor(max_workers=N) 适合大多数场景,内置线程池 自动管理 简单 threading.Semaphore(N) 适用于轻量控制并发的场景 通过信号量控制 简单 queue.Queue() + Thread 适用于任务动态添加、消费者模式 任务队列控制 中等
如果你的需求只是限制最大线程数,Semaphore 更轻量;如果需要任务队列和动态管理,queue.Queue() 是更合适的方案。
🚀 推荐 • 如果是轻量级并发控制,用 Semaphore。 • 如果是任务队列、动态调度,用 queue.Queue()。 • 如果是自动管理线程池,用 ThreadPoolExecutor。
哪种方式更适合你的需求?😊