寂赤忍袭DX免安装绿色版
187M · 2025-11-01
大家好,我是倔强青铜三。欢迎关注我,微信公众号:倔强青铜三。点赞、收藏、关注,一键三连!
欢迎继续 苦练Python第64天。
今天咱们把“并发”这把瑞士军刀——threading 模块,从开箱到实战一次性讲透。全程只用 Python 自带标准库,代码复制即可运行!
multiprocessing。| API | 作用 | 关键入参 | 返回值/副作用 |
|---|---|---|---|
threading.Thread(target, args=(), kwargs={}, name=None, daemon=None) | 创建线程对象 | target: 可调用对象;args/kwargs: 位置/关键字参数;name: 线程名;daemon: 是否为守护线程 | Thread 实例 |
Thread.start() | 启动线程,底层调用 run() | — | 若重复调用抛 RuntimeError |
Thread.run() | 线程真正执行的逻辑;可被子类重写 | — | 无 |
Thread.join(timeout=None) | 阻塞等待线程结束 | timeout: 秒级浮点超时 | 总是 None;超时后仍需用 is_alive() 判断是否存活 |
Thread.is_alive() | 线程是否存活 | — | True/False |
Thread.name / Thread.ident / Thread.native_id | 线程名字/线程标识符/系统级线程 ID | — | str / int or None / int or None |
Thread.daemon | 守护线程标志 | 可读写布尔值 | 设置前必须未启动 |
threading.current_thread() | 获取当前线程对象 | — | Thread 实例 |
threading.active_count() | 当前存活线程数量 | — | int |
threading.enumerate() | 当前所有存活线程列表 | — | list[Thread] |
threading.Lock() | 创建原始互斥锁 | — | Lock 实例 |
Lock.acquire(blocking=True, timeout=-1) | 获取锁 | blocking=False 非阻塞;timeout 秒级超时 | 成功返回 True,否则 False |
Lock.release() | 释放锁 | — | 若未持有锁抛 RuntimeError |
Lock.locked() | 查询锁状态 | — | True/False |
threading.RLock() | 创建可重入锁 | — | RLock 实例;方法同 Lock |
threading.Event() | 事件对象 | — | Event 实例 |
Event.set() / Event.clear() / Event.wait(timeout=None) | 置位/复位/等待事件 | timeout 秒级超时 | wait 返回 True(被 set)或 False(超时) |
threading.Timer(interval, function, args=None, kwargs=None) | 延时线程 | interval: 延迟秒;function: 回调;args/kwargs: 参数 | Timer 实例,可 .cancel() |
threading.local() | 线程局部数据容器 | — | local 实例,属性隔离 |
# demo_hello_thread.py
import threading
import time
def say_hello(name, delay):
time.sleep(delay)
print(f"你好,{name},来自线程 {threading.current_thread().name}")
# 创建线程
t = threading.Thread(target=say_hello, args=("倔强青铜三", 2), name="青铜线程")
t.start() # 启动
t.join() # 等它跑完
print("主线程结束")
运行效果:
你好,倔强青铜三,来自线程 青铜线程
主线程结束
场景:并发爬 3 个网页(用 sleep 模拟 I/O)。
# demo_pool.py
import threading
import time
links = ["https://a.com", "https://b.com", "https://c.com"]
def crawl(url):
print(f"开始 {url}")
time.sleep(2) # 模拟网络延迟
print(f"完成 {url}")
threads = []
for link in links:
t = threading.Thread(target=crawl, args=(link,))
t.start()
threads.append(t)
for t in threads:
t.join()
print("全部爬完!")
运行效果:
开始 https://a.com
开始 https://b.com
开始 https://c.com
完成 https://a.com
完成 https://b.com
完成 https://c.com
全部爬完!
卖票案例:100 张票,10 个窗口同时卖,不加锁会超卖。
# demo_lock.py
import threading
tickets = 4
lock = threading.Lock()
def sell(window_id):
global tickets
while True:
with lock: # 推荐用 with,自动 acquire/release
if tickets <= 0:
break
tickets -= 1
print(f"窗口{window_id} 卖出 1 张,剩余 {tickets}")
# 临界区外可快速做其他事
print(f"窗口{window_id} 下班~")
threads = [threading.Thread(target=sell, args=(i+1,)) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
运行效果:
窗口1 卖出 1 张,剩余 3
窗口1 卖出 1 张,剩余 2
窗口1 卖出 1 张,剩余 1
窗口1 卖出 1 张,剩余 0
窗口1 下班~
窗口2 下班~
窗口3 下班~
窗口4 下班~
主线程发信号让子线程起跑。
# demo_event.py
import threading
import time
start_event = threading.Event()
def runner(name):
print(f"{name} 就位,等发令枪")
start_event.wait() # 阻塞直到 set()
print(f"{name} 起跑!")
for i in range(3):
threading.Thread(target=runner, args=(f"选手{i+1}",)).start()
time.sleep(2)
print("裁判:预备——跑!")
start_event.set() # 一枪令下
运行效果:
选手1 就位,等发令枪
选手2 就位,等发令枪
选手3 就位,等发令枪
裁判:预备——跑!
选手1 起跑!
选手2 起跑!
选手3 起跑!
延迟 3 秒响铃,可中途取消。
# demo_timer.py
import threading
def ring():
print("⏰ 起床啦!!")
alarm = threading.Timer(3, ring)
alarm.start()
print("3 秒后响铃,输入 c 取消")
if input().strip() == "c":
alarm.cancel()
print("闹钟已取消")
每个线程独享一份变量,互不打架。
# demo_local.py
import threading
local = threading.local()
def worker(num):
local.count = num # 线程独有属性
for _ in range(3):
local.count += 1
print(f"{threading.current_thread().name} -> {local.count}")
for i in range(2):
threading.Thread(target=worker, args=(i*10,), name=f"线程{i+1}").start()
运行效果:
线程1 -> 1
线程1 -> 2
线程1 -> 3
线程2 -> 11
线程2 -> 12
线程2 -> 13
功能:并发“下载”多个文件,统计总耗时。
# demo_downloader.py
import threading
import time
import random
urls = [f"https://file{i}.bin" for i in range(5)]
results = {}
lock = threading.Lock()
def download(url):
print(f"开始 {url}")
sec = random.randint(1, 3)
time.sleep(sec)
with lock:
results[url] = f"{sec}s"
print(f"{url} 完成,耗时 {sec}s")
start = time.time()
threads = [threading.Thread(target=download, args=(u,)) for u in urls]
for t in threads:
t.start()
for t in threads:
t.join()
print("全部下载完毕!")
for url, spent in results.items():
print(url, "->", spent)
print(f"总耗时 {time.time() - start:.2f}s")
运行效果:
开始 https://file0.bin
开始 https://file1.bin
开始 https://file2.bin
开始 https://file3.bin
开始 https://file4.bin
https://file1.bin 完成,耗时 1s
https://file2.bin 完成,耗时 1s
https://file4.bin 完成,耗时 2s
https://file0.bin 完成,耗时 3s
https://file3.bin 完成,耗时 3s
全部下载完毕!
https://file1.bin -> 1s
https://file2.bin -> 1s
https://file4.bin -> 2s
https://file0.bin -> 3s
https://file3.bin -> 3s
总耗时 3.00s
还记得文件操作的 with open(...) as f: 吗?
threading 模块里的 Lock、RLock、Condition、Semaphore、BoundedSemaphore 全部支持 with 协议:
进入代码块自动 acquire(),退出时自动 release()——不会忘、不会漏、不会死锁!
# 传统写法:容易漏掉 release()
lock = threading.Lock()
lock.acquire()
try:
# 临界区
global_num += 1
finally:
lock.release()
# with 写法:一行搞定,异常也不怕
lock = threading.Lock()
with lock:
global_num += 1
# demo_rlock_with.py
import threading
rlock = threading.RLock()
def nested():
with rlock: # 第一次获取
print("外层加锁")
with rlock: # 同一线程可再次获取
print("内层重入,不会死锁")
threading.Thread(target=nested).start()
# demo_condition_with.py
import threading
cv = threading.Condition()
flag = False
def waiter():
with cv: # 自动 acquire
cv.wait_for(lambda: flag) # 等待条件成立
print("waiter 收到通知!")
def setter():
global flag
with cv:
flag = True
cv.notify_all()
threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()
# demo_sema_with.py
import threading
import time
pool = threading.Semaphore(value=3) # 并发 3 条
def worker(i):
with pool: # 获取令牌
print(f"任务 {i} 进入")
time.sleep(2)
print(f"任务 {i} 完成")
for i in range(5):
threading.Thread(target=worker, args=(i,)).start()
只要实现 __enter__、__exit__ 即可:
class MyLock:
def __init__(self):
self._lock = threading.Lock()
def __enter__(self):
self._lock.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._lock.release()
with MyLock():
print("自定义锁也能 with!")
| 武器 | 用途 | 备注 |
|---|---|---|
| Thread | 创建线程 | 始终记得 join |
| Lock/RLock | 临界区互斥 | 推荐 with lock: |
| Event | 线程间通知 | set/clear/wait |
| Timer | 延迟执行 | 可 cancel |
| local | 线程独享数据 | 替代全局变量 |