前言

大家好,我是倔强青铜三。欢迎关注我,微信公众号:倔强青铜三。点赞、收藏、关注,一键三连!

欢迎继续 苦练Python第64天

今天咱们把“并发”这把瑞士军刀——threading 模块,从开箱到实战一次性讲透。全程只用 Python 自带标准库,代码复制即可运行!


一、为什么需要线程?

  • I/O 密集场景:爬虫、文件下载、日志采集,CPU 在等网络/磁盘,闲着也是闲着。
  • 共享内存:比多进程轻量,数据不用序列化来回拷贝。
  • GIL?别慌:I/O 密集时线程照样提速;CPU 密集请转投 multiprocessing

threading 常用 API 一览表

API作用关键入参返回值/副作用
threading.Thread(target, args=(), kwargs={}, name=None, daemon=None)创建线程对象target: 可调用对象;args/kwargs: 位置/关键字参数;name: 线程名;daemon: 是否为守护线程Thread 实例
Thread.start()启动线程,底层调用 run()若重复调用抛 RuntimeError
Thread.run()线程真正执行的逻辑;可被子类重写
Thread.join(timeout=None)阻塞等待线程结束timeout: 秒级浮点超时总是 None;超时后仍需用 is_alive() 判断是否存活
Thread.is_alive()线程是否存活True/False
Thread.name / Thread.ident / Thread.native_id线程名字/线程标识符/系统级线程 IDstr / int or None / int or None
Thread.daemon守护线程标志可读写布尔值设置前必须未启动
threading.current_thread()获取当前线程对象Thread 实例
threading.active_count()当前存活线程数量int
threading.enumerate()当前所有存活线程列表list[Thread]
threading.Lock()创建原始互斥锁Lock 实例
Lock.acquire(blocking=True, timeout=-1)获取锁blocking=False 非阻塞;timeout 秒级超时成功返回 True,否则 False
Lock.release()释放锁若未持有锁抛 RuntimeError
Lock.locked()查询锁状态True/False
threading.RLock()创建可重入锁RLock 实例;方法同 Lock
threading.Event()事件对象Event 实例
Event.set() / Event.clear() / Event.wait(timeout=None)置位/复位/等待事件timeout 秒级超时wait 返回 True(被 set)或 False(超时)
threading.Timer(interval, function, args=None, kwargs=None)延时线程interval: 延迟秒;function: 回调;args/kwargs: 参数Timer 实例,可 .cancel()
threading.local()线程局部数据容器local 实例,属性隔离

二、30 秒启动你的第一个线程

# demo_hello_thread.py
import threading
import time

def say_hello(name, delay):
    time.sleep(delay)
    print(f"你好,{name},来自线程 {threading.current_thread().name}")

# 创建线程
t = threading.Thread(target=say_hello, args=("倔强青铜三", 2), name="青铜线程")
t.start()          # 启动
t.join()           # 等它跑完
print("主线程结束")

运行效果:

你好,倔强青铜三,来自线程 青铜线程
主线程结束

三、批量任务:线程池手写版

场景:并发爬 3 个网页(用 sleep 模拟 I/O)。

# demo_pool.py
import threading
import time

links = ["https://a.com", "https://b.com", "https://c.com"]

def crawl(url):
    print(f"开始 {url}")
    time.sleep(2)          # 模拟网络延迟
    print(f"完成 {url}")

threads = []
for link in links:
    t = threading.Thread(target=crawl, args=(link,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print("全部爬完!")

运行效果:

开始 https://a.com
开始 https://b.com
开始 https://c.com
完成 https://a.com
完成 https://b.com
完成 https://c.com
全部爬完!

四、线程安全:Lock 互斥锁

卖票案例:100 张票,10 个窗口同时卖,不加锁会超卖。

# demo_lock.py
import threading

tickets = 4
lock = threading.Lock()

def sell(window_id):
    global tickets
    while True:
        with lock:              # 推荐用 with,自动 acquire/release
            if tickets <= 0:
                break
            tickets -= 1
            print(f"窗口{window_id} 卖出 1 张,剩余 {tickets}")
        # 临界区外可快速做其他事
    print(f"窗口{window_id} 下班~")

threads = [threading.Thread(target=sell, args=(i+1,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

运行效果:

窗口1 卖出 1 张,剩余 3
窗口1 卖出 1 张,剩余 2
窗口1 卖出 1 张,剩余 1
窗口1 卖出 1 张,剩余 0
窗口1 下班~
窗口2 下班~
窗口3 下班~
窗口4 下班~

五、线程通信:Event 红绿灯

主线程发信号让子线程起跑。

# demo_event.py
import threading
import time

start_event = threading.Event()

def runner(name):
    print(f"{name} 就位,等发令枪")
    start_event.wait()          # 阻塞直到 set()
    print(f"{name} 起跑!")

for i in range(3):
    threading.Thread(target=runner, args=(f"选手{i+1}",)).start()

time.sleep(2)
print("裁判:预备——跑!")
start_event.set()               # 一枪令下

运行效果:

选手1 就位,等发令枪
选手2 就位,等发令枪
选手3 就位,等发令枪
裁判:预备——跑!
选手1 起跑!
选手2 起跑!
选手3 起跑!

六、定时器:Timer 秒杀闹钟

延迟 3 秒响铃,可中途取消。

# demo_timer.py
import threading

def ring():
    print("⏰ 起床啦!!")

alarm = threading.Timer(3, ring)
alarm.start()
print("3 秒后响铃,输入 c 取消")
if input().strip() == "c":
    alarm.cancel()
    print("闹钟已取消")

七、线程局部变量:local 隔离数据

每个线程独享一份变量,互不打架。

# demo_local.py
import threading

local = threading.local()

def worker(num):
    local.count = num          # 线程独有属性
    for _ in range(3):
        local.count += 1
        print(f"{threading.current_thread().name} -> {local.count}")

for i in range(2):
    threading.Thread(target=worker, args=(i*10,), name=f"线程{i+1}").start()

运行效果:

线程1 -> 1
线程1 -> 2
线程1 -> 3
线程2 -> 11
线程2 -> 12
线程2 -> 13

八、完整实战:多线程文件下载器(模拟)

功能:并发“下载”多个文件,统计总耗时。

# demo_downloader.py
import threading
import time
import random

urls = [f"https://file{i}.bin" for i in range(5)]
results = {}
lock = threading.Lock()

def download(url):
    print(f"开始 {url}")
    sec = random.randint(1, 3)
    time.sleep(sec)
    with lock:
        results[url] = f"{sec}s"
    print(f"{url} 完成,耗时 {sec}s")

start = time.time()
threads = [threading.Thread(target=download, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("全部下载完毕!")
for url, spent in results.items():
    print(url, "->", spent)
print(f"总耗时 {time.time() - start:.2f}s")

运行效果:

开始 https://file0.bin
开始 https://file1.bin
开始 https://file2.bin
开始 https://file3.bin
开始 https://file4.bin
https://file1.bin 完成,耗时 1s
https://file2.bin 完成,耗时 1s
https://file4.bin 完成,耗时 2s
https://file0.bin 完成,耗时 3s
https://file3.bin 完成,耗时 3s
全部下载完毕!
https://file1.bin -> 1s
https://file2.bin -> 1s
https://file4.bin -> 2s
https://file0.bin -> 3s
https://file3.bin -> 3s
总耗时 3.00s

九、with 上下文管理器 × threading:让锁像文件一样好写

还记得文件操作的 with open(...) as f: 吗?
threading 模块里的 Lock、RLock、Condition、Semaphore、BoundedSemaphore 全部支持 with 协议:
进入代码块自动 acquire(),退出时自动 release()——不会忘、不会漏、不会死锁!

9.1 原始锁 Lock 的两种写法对比

#  传统写法:容易漏掉 release()
lock = threading.Lock()
lock.acquire()
try:
    # 临界区
    global_num += 1
finally:
    lock.release()

#  with 写法:一行搞定,异常也不怕
lock = threading.Lock()
with lock:
    global_num += 1

9.2 RLock 递归锁同样适用

# demo_rlock_with.py
import threading

rlock = threading.RLock()

def nested():
    with rlock:          # 第一次获取
        print("外层加锁")
        with rlock:      # 同一线程可再次获取
            print("内层重入,不会死锁")

threading.Thread(target=nested).start()

9.3 Condition 条件变量 + with 经典范式

# demo_condition_with.py
import threading

cv   = threading.Condition()
flag = False

def waiter():
    with cv:                     # 自动 acquire
        cv.wait_for(lambda: flag)  # 等待条件成立
        print("waiter 收到通知!")

def setter():
    global flag
    with cv:
        flag = True
        cv.notify_all()

threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()

9.4 Semaphore 资源池限流

# demo_sema_with.py
import threading
import time

pool = threading.Semaphore(value=3)  # 并发 3 条

def worker(i):
    with pool:              # 获取令牌
        print(f"任务 {i} 进入")
        time.sleep(2)
        print(f"任务 {i} 完成")

for i in range(5):
    threading.Thread(target=worker, args=(i,)).start()

9.5 自定义类也能支持 with

只要实现 __enter____exit__ 即可:

class MyLock:
    def __init__(self):
        self._lock = threading.Lock()
    def __enter__(self):
        self._lock.acquire()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self._lock.release()

with MyLock():
    print("自定义锁也能 with!")

小结

武器用途备注
Thread创建线程始终记得 join
Lock/RLock临界区互斥推荐 with lock:
Event线程间通知set/clear/wait
Timer延迟执行可 cancel
local线程独享数据替代全局变量

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]