当前位置: 首页 > news >正文

Python 多线程和多进程高级应用指南

Python 多线程和多进程高级应用指南

1. 多线程基础

Python 的多线程是通过threading模块实现的,它允许我们在同一进程内并发执行多个任务。

import threading import time def worker(name): print(f"Worker {name} started") time.sleep(1) print(f"Worker {name} completed") # 创建线程 thread1 = threading.Thread(target=worker, args=("1",)) thread2 = threading.Thread(target=worker, args=("2",)) # 启动线程 thread1.start() thread2.start() # 等待线程完成 thread1.join() thread2.join() print("All workers completed")

2. 多进程基础

Python 的多进程是通过multiprocessing模块实现的,它允许我们在不同进程内并发执行多个任务。

import multiprocessing import time def worker(name): print(f"Worker {name} started") time.sleep(1) print(f"Worker {name} completed") # 创建进程 process1 = multiprocessing.Process(target=worker, args=("1",)) process2 = multiprocessing.Process(target=worker, args=("2",)) # 启动进程 process1.start() process2.start() # 等待进程完成 process1.join() process2.join() print("All workers completed")

3. 高级多线程技巧

3.1 线程池

import concurrent.futures import time def worker(name): print(f"Worker {name} started") time.sleep(1) print(f"Worker {name} completed") return f"Result from worker {name}" # 使用线程池 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 提交任务 futures = [executor.submit(worker, str(i)) for i in range(5)] # 收集结果 for future in concurrent.futures.as_completed(futures): result = future.result() print(result)

3.2 线程同步

import threading import time # 创建锁 lock = threading.Lock() shared_resource = 0 def worker(name): global shared_resource print(f"Worker {name} started") # 获取锁 with lock: print(f"Worker {name} acquired lock") current = shared_resource time.sleep(0.5) shared_resource = current + 1 print(f"Worker {name} updated shared resource to {shared_resource}") print(f"Worker {name} completed") # 创建线程 threads = [threading.Thread(target=worker, args=(str(i),)) for i in range(3)] # 启动线程 for thread in threads: thread.start() # 等待线程完成 for thread in threads: thread.join() print(f"Final shared resource value: {shared_resource}")

3.3 线程通信

import threading import queue import time def producer(queue): for i in range(5): print(f"Producing item {i}") queue.put(i) time.sleep(0.5) def consumer(queue): while True: item = queue.get() if item is None: break print(f"Consuming item {item}") time.sleep(1) queue.task_done() # 创建队列 q = queue.Queue() # 创建线程 producer_thread = threading.Thread(target=producer, args=(q,)) consumer_thread = threading.Thread(target=consumer, args=(q,)) # 启动线程 producer_thread.start() consumer_thread.start() # 等待生产者完成 producer_thread.join() # 发送结束信号 q.put(None) # 等待消费者完成 consumer_thread.join() print("All tasks completed")

4. 高级多进程技巧

4.1 进程池

import concurrent.futures import time def worker(name): print(f"Worker {name} started") time.sleep(1) print(f"Worker {name} completed") return f"Result from worker {name}" # 使用进程池 with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor: # 提交任务 futures = [executor.submit(worker, str(i)) for i in range(5)] # 收集结果 for future in concurrent.futures.as_completed(futures): result = future.result() print(result)

4.2 进程间通信

import multiprocessing import time def producer(conn): for i in range(5): print(f"Producing item {i}") conn.send(i) time.sleep(0.5) conn.close() def consumer(conn): while True: try: item = conn.recv() print(f"Consuming item {item}") time.sleep(1) except EOFError: break # 创建管道 parent_conn, child_conn = multiprocessing.Pipe() # 创建进程 producer_process = multiprocessing.Process(target=producer, args=(child_conn,)) consumer_process = multiprocessing.Process(target=consumer, args=(parent_conn,)) # 启动进程 producer_process.start() consumer_process.start() # 等待进程完成 producer_process.join() consumer_process.join() print("All tasks completed")

4.3 共享内存

import multiprocessing import time def worker(shared_value, lock): print(f"Worker started") # 获取锁 with lock: current = shared_value.value time.sleep(0.5) shared_value.value = current + 1 print(f"Worker updated shared value to {shared_value.value}") print(f"Worker completed") # 创建共享内存 shared_value = multiprocessing.Value('i', 0) lock = multiprocessing.Lock() # 创建进程 processes = [multiprocessing.Process(target=worker, args=(shared_value, lock)) for _ in range(3)] # 启动进程 for process in processes: process.start() # 等待进程完成 for process in processes: process.join() print(f"Final shared value: {shared_value.value}")

5. 实际应用场景

5.1 网络请求

import concurrent.futures import requests urls = [ "https://api.github.com", "https://api.example.com", "https://api.google.com", "https://api.amazon.com", "https://api.microsoft.com" ] def fetch_url(url): print(f"Fetching {url}") response = requests.get(url) return f"{url}: {response.status_code}" # 使用线程池 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: results = list(executor.map(fetch_url, urls)) for result in results: print(result)

5.2 数据处理

import concurrent.futures import time def process_data(data): print(f"Processing data: {data}") time.sleep(1) # 模拟处理时间 return data * 2 data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 使用进程池 with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_data, data)) print(f"Processed results: {results}")

5.3 并行计算

import concurrent.futures import math def calculate_pi(n): """使用蒙特卡洛方法计算 π""" import random count = 0 for _ in range(n): x = random.random() y = random.random() if x*x + y*y <= 1: count += 1 return 4 * count / n # 并行计算 with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: # 每个进程计算 10000000 次 futures = [executor.submit(calculate_pi, 10000000) for _ in range(4)] # 收集结果并取平均值 results = [future.result() for future in futures] pi = sum(results) / len(results) print(f"Calculated π: {pi}") print(f"Actual π: {math.pi}") print(f"Error: {abs(pi - math.pi)}")

6. 最佳实践

  1. 选择合适的并发模型:对于 I/O 密集型任务,使用多线程;对于 CPU 密集型任务,使用多进程。
  2. 使用线程池/进程池:使用ThreadPoolExecutorProcessPoolExecutor管理线程/进程,避免频繁创建和销毁。
  3. 线程安全:对于共享资源,使用锁或其他同步机制确保线程安全。
  4. 进程间通信:对于多进程,使用管道、队列或共享内存进行通信。
  5. 异常处理:在并发任务中正确处理异常,避免任务崩溃影响其他任务。
  6. 资源管理:确保线程/进程正确关闭,避免资源泄漏。
  7. 性能监控:监控并发任务的性能,调整线程/进程数量以获得最佳性能。

7. 总结

Python 的多线程和多进程是实现并发执行的强大工具。通过掌握这些高级应用,我们可以充分利用系统资源,提高程序的性能和响应速度。

在实际应用中,多线程和多进程可以用于网络请求、数据处理、并行计算等多种场景,大大提高程序的执行效率。

希望本文对你理解和应用 Python 多线程和多进程有所帮助!

http://www.cnnetsun.cn/news/2152568.html

相关文章:

  • 铭记历史性时刻2026年04月29日第一台人工场发生器
  • 中欧与东欧科技创业生态:人才优势与技术策略
  • PL360-460 nm Oil-soluble CdS QDs,油溶性半导体量子点的定制合成
  • 告别命令行!用Canal-Admin 1.1.5图形化管理你的Canal-Server(附集群配置避坑点)
  • 手把手教你用NFC读写器软件(附最新版下载)读取M1卡扇区数据与密钥
  • 保姆级教程:手把手配置AUTOSAR CanSM模块的BusOff恢复策略(含ETAS工具实战截图)
  • 【无人机编队控制】二维平面和三维空间环形拓扑的分布式无人机编队控制Matlab仿真
  • CefFlashBrowser:Flash内容重获新生的终极解决方案
  • AArch64处理器特性寄存器解析与应用实践
  • OpenClaw 2.6.4(小龙虾)虾壳云版|Windows10/11 64 位一键部署教程
  • 5分钟部署FontCenter:AutoCAD字体管理插件的终极解决方案
  • 紧急通知:2025年起PHP表单模块未完成国产化替换将暂停财政资金拨付——3类存量系统0代码改造速成法
  • C# 13内联数组深度剖析:绕过GC、消除堆分配、减少缓存未命中——实测内存访问延迟降低62%
  • 基于 XGBoost 的股票涨跌预测实战(附完整可运行 Streamlit 代码)
  • 如何用DyberPet桌面宠物框架重构你的数字生活体验?
  • Laravel 12新特性×AI工程化落地:从Native JSON Schema Validation到AI生成Migration的全自动闭环(含可复用Composer包)
  • 如何永久保存你的数字记忆?WeChatMsg让聊天记录变成可视化人生报告
  • 工业数据采集系统选型与误差控制实战指南
  • 量子计算中的贝尔不等式与准备非平稳性研究
  • PCIe时钟信号HCSL与LPHCSL选型指南:功耗、匹配与布线实战(附PCIe 4.0/5.0时钟设计要点)
  • 机密计算技术解析:TEE原理与行业应用实践
  • 医疗电子PCB设计:挑战、标准与关键技术解析
  • 智能配置黑苹果终极指南:五分钟完成OpenCore EFI一键生成
  • 贴纸印刷厂家排行榜:2026年十大高口碑推荐清单
  • UI学习:通知传值
  • 论文AI检测通关攻略:4个实用技巧帮你快速达标
  • CompactGUI终极指南:如何免费为你的游戏节省60%硬盘空间
  • 基于WeDLM-7B-Base的智能文档处理系统:从OCR到信息提取
  • LeetCode105 迭代版|前序+中序重构二叉树(速度内存双99%,超详细拆解)
  • 给你的STM32项目加点‘光’:基于F103C8T6和WS2812的智能氛围灯DIY全记录