前言

大家好,我是 倔强青铜三。欢迎关注我,微信公众号: 倔强青铜三。点赞、收藏、关注,一键三连!

今天咱们来一起挖掘 Python 自带的 "并发神器" —— concurrent 模块,让你的 CPU 性能瞬间拉满。

一、concurrent 模块登场的必要性

Python 中已经有了 threading 和 multiprocessing 这两个模块来处理多线程和多进程相关操作,但是 concurrent 模块的出现有其独特的优势:

  • 高层抽象,简化开发 :它在 threading 和 multiprocessing 基础之上提供了更高层次的抽象,使得开发者可以更加方便地构建并行程序,无需过多关注底层细节。
  • 统一接口,便于切换 :通过 concurrent.futures 模块中的 Executor 接口,无论是使用线程池还是进程池,都有一致的接口和使用方式,方便在不同场景下灵活切换。
  • 强大功能,提升效率 :提供了诸如 Future 对象等强大的功能,方便对异步操作的结果进行管理和监控,有效提升并发编程的效率和可维护性。

concurrent 模块常用 API 一览表

API作用关键入参返回值
concurrent.futures.ThreadPoolExecutor创建线程池执行器max_workers 最大工作线程数等Future 对象
concurrent.futures.ProcessPoolExecutor创建进程池执行器max_workers 最大工作进程数等Future 对象
concurrent.futures.Future表示异步执行的操作异步操作的结果

二、快速体验 ThreadPoolExecutor 并发下载

# demo_thread_pool.py
import concurrent.futures
import urllib.request
import time

# 模拟下载任务
def download_url(url):
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            content = response.read()
            return len(content)
    except Exception as e:
        return str(e)

urls = [
    "https://www.baidu.com",
    "https://www.qq.com",
    "https://www.taobao.com",
    "https://www.jd.com",
    "https://www.sina.com.cn"
]

# 单线程下载
start_time = time.time()
for url in urls:
    print(download_url(url))
print("单线程下载耗时:", time.time() - start_time)

# 线程池下载
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    start_time = time.time()
    futures = {executor.submit(download_url, url): url for url in urls}
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        try:
            result = future.result()
            print(f"{url}: {result}")
        except Exception as e:
            print(f"{url} 生成了异常: {e}")
    print("线程池下载耗时:", time.time() - start_time)

运行效果示例(因网络等因素实际结果可能不同):

单线程下载耗时: 0.4107840061187744
https://www.baidu.com: 29506
https://www.taobao.com: 83249
https://www.qq.com: 122185
https://www.jd.com: 177956
https://www.sina.com.cn: 397807
线程池下载耗时: 0.136491060256958

三、ProcessPoolExecutor 处理 CPU 密集型任务

# demo_process_pool.py
import concurrent.futures
import time

# CPU 密集型任务
def cpu_intensive_task(x):
    result = 0
    for i in range(10**7):
        result += i ** x
    return result

data = [2, 3, 4, 5, 6, 7, 8, 9]

# 单进程处理
start_time = time.time()
for num in data:
    print(cpu_intensive_task(num))
print("单进程处理耗时:", time.time() - start_time)

# 进程池处理
with concurrent.futures.ProcessPoolExecutor() as executor:
    start_time = time.time()
    futures = {executor.submit(cpu_intensive_task, num): num for num in data}
    for future in concurrent.futures.as_completed(futures):
        num = futures[future]
        try:
            result = future.result()
            print(f"任务 {num} 的结果:{result}")
        except Exception as e:
            print(f"任务 {num} 生成了异常: {e}")
    print("进程池处理耗时:", time.time() - start_time)

运行效果示例(具体数值和耗时根据机器性能而定):

单进程处理耗时: 45.6789
任务 2 的结果: 333333330000000
任务 3 的结果: 4000000060000000
...
进程池处理耗时: 12.3456

四、Future 对象掌控异步操作

# demo_future.py
import concurrent.futures
import time

def task(n):
    time.sleep(n)
    return f"任务完成,休眠了 {n} 秒"

# 创建线程池执行器
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

# 提交任务并获取 Future 对象
future1 = executor.submit(task, 2)
future2 = executor.submit(task, 3)

print("检查 task1 是否完成:", future1.done())  # False
print("检查 task2 是否完成:", future2.done())  # False

time.sleep(3)

print("检查 task1 是否完成:", future1.done())  # True
print("检查 task2 是否完成:", future2.done())  # True

print("task1 的结果:", future1.result())  # 获取任务结果
print("task2 的结果:", future2.result())

executor.shutdown()  # 关闭执行器

运行效果:

检查 task1 是否完成: False
检查 task2 是否完成: False
检查 task1 是否完成: True
检查 task2 是否完成: True
task1 的结果: 任务完成,休眠了 2 秒
task2 的结果: 任务完成,休眠了 3

五、如何选择合适的并发工具

任务类型判断

如果是 I/O 密集型任务,比如大量网络请求、文件操作等,使用 ThreadPoolExecutor 更合适。

因为它可以充分利用线程在等待 I/O 操作期间切换到其他任务执行的优势,提高整体效率。

而如果是 CPU 密集型任务,如复杂的数学计算、数据处理等,则更适合使用 ProcessPoolExecutor

因为多进程可以突破 Python 的全局解释器锁(GIL)限制,真正实现多个 CPU 核心并行计算。

资源限制考虑

根据系统的线程和进程资源限制,合理设置 max_workers 参数。

线程太多可能会导致频繁的线程切换开销,进程太多则会占用过多的内存和系统资源。

结果处理需求

如果需要对异步操作的结果进行复杂的处理和监控,Future 对象提供了丰富的接口和方法,可以方便地获取任务状态、结果、异常等信息。

无论使用线程池还是进程池,都可以借助 Future 来更好地管理和协调并发任务。


小结

武器用途一句话记忆
ThreadPoolExecutor处理 I/O 密集型任务I/O 密集型,线程池更高效
ProcessPoolExecutor处理 CPU 密集型任务CPU 密集型,进程池突破 GIL
Future管理异步操作异步操作好帮手
零依赖官方自带开箱即用
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]