当前位置: 首页 > news >正文

Python协程使用详解

Python协程使用详解

协程(Coroutine)是Python中实现并发编程的一种重要方式,它比线程更轻量级,适合I/O密集型任务。下面我通过几个例子来详细讲解。

1. 基础概念

协程通过async/await语法实现,主要特点:

  • 使用async def定义协程函数
  • 使用await挂起协程执行
  • 需要事件循环(event loop)来运行

2. 基础示例

示例1:最简单的协程

importasyncio# 定义协程函数asyncdefhello():print("Hello")awaitasyncio.sleep(1)# 模拟I/O操作print("World")# 运行协程asyncio.run(hello())

示例2:多个协程顺序执行

importasyncioasyncdeftask1():print("开始任务1")awaitasyncio.sleep(2)print("完成任务1")return"结果1"asyncdeftask2():print("开始任务2")awaitasyncio.sleep(1)print("完成任务2")return"结果2"asyncdefmain():# 顺序执行result1=awaittask1()result2=awaittask2()print(f"结果:{result1},{result2}")asyncio.run(main())

3. 并发执行协程

示例3:使用asyncio.gather并发运行

importasyncioasyncdefdownload(url,seconds):print(f"开始下载{url}")awaitasyncio.sleep(seconds)# 模拟下载时间print(f"完成下载{url}")returnf"{url}的内容"asyncdefmain():# 创建多个任务并发执行tasks=[download("http://example.com/page1",3),download("http://example.com/page2",2),download("http://example.com/page3",1)]# 并发执行所有任务results=awaitasyncio.gather(*tasks)print("所有下载完成:",results)asyncio.run(main())

示例4:使用asyncio.create_task

importasyncioasyncdefprocess_data(data_id,processing_time):print(f"开始处理数据{data_id}")awaitasyncio.sleep(processing_time)print(f"完成处理数据{data_id}")returnf"处理后的数据{data_id}"asyncdefmain():# 创建任务但不立即等待task1=asyncio.create_task(process_data(1,2))task2=asyncio.create_task(process_data(2,1))task3=asyncio.create_task(process_data(3,3))# 在需要的时候等待结果result1=awaittask1 result2=awaittask2 result3=awaittask3print(f"结果:{result1},{result2},{result3}")asyncio.run(main())

4. 协程间通信

示例5:使用asyncio.Queue

importasyncioimportrandomasyncdefproducer(queue,producer_id):foriinrange(3):item=f"产品{producer_id}-{i}"awaitasyncio.sleep(random.random())# 模拟生产时间awaitqueue.put(item)print(f"生产者{producer_id}生产了:{item}")awaitqueue.put(None)# 结束信号asyncdefconsumer(queue,consumer_id):whileTrue:item=awaitqueue.get()ifitemisNone:queue.put(None)# 将结束信号放回队列供其他消费者使用breakawaitasyncio.sleep(random.random()*2)# 模拟消费时间print(f"消费者{consumer_id}消费了:{item}")queue.task_done()asyncdefmain():queue=asyncio.Queue(maxsize=5)# 创建生产者和消费者producers=[asyncio.create_task(producer(queue,i))foriinrange(2)]consumers=[asyncio.create_task(consumer(queue,i))foriinrange(3)]# 等待所有生产者完成awaitasyncio.gather(*producers)# 等待队列中的所有项目被消费awaitqueue.join()# 取消消费者任务forcinconsumers:c.cancel()asyncio.run(main())

5. 实际应用示例

示例6:并发HTTP请求

importaiohttpimportasyncioimporttimeasyncdeffetch_url(session,url):try:asyncwithsession.get(url,timeout=10)asresponse:data=awaitresponse.text()returnf"{url}: 状态码{response.status}, 长度{len(data)}"exceptExceptionase:returnf"{url}: 错误 -{str(e)}"asyncdefmain():urls=["https://httpbin.org/get","https://httpbin.org/delay/2",# 延迟2秒的端点"https://httpbin.org/status/404","https://httpbin.org/status/500"]start_time=time.time()asyncwithaiohttp.ClientSession()assession:tasks=[fetch_url(session,url)forurlinurls]results=awaitasyncio.gather(*tasks)forresultinresults:print(result)end_time=time.time()print(f"\n总耗时:{end_time-start_time:.2f}秒")asyncio.run(main())

示例7:带超时控制的协程

importasyncioasyncdeflong_running_task(task_id,seconds):print(f"任务{task_id}开始")awaitasyncio.sleep(seconds)print(f"任务{task_id}完成")returnf"任务{task_id}结果"asyncdefmain():try:# 设置超时时间为3秒result=awaitasyncio.wait_for(long_running_task(1,5),timeout=3)print(f"成功:{result}")exceptasyncio.TimeoutError:print("任务超时!")# 使用asyncio.shield防止取消try:task=asyncio.create_task(long_running_task(2,4))# shield保护任务不被取消result=awaitasyncio.wait_for(asyncio.shield(task),timeout=2)print(f"shield结果:{result}")exceptasyncio.TimeoutError:print("外部超时,但任务继续运行")# 等待被shield的任务完成result=awaittaskprint(f"被shield的任务最终完成:{result}")asyncio.run(main())

6. 协程模式示例

示例8:生产者-消费者模式

importasyncioimportrandomasyncdefproducer(name,queue):foriinrange(5):item=f"item-{name}-{i}"awaitasyncio.sleep(random.uniform(0.1,0.5))awaitqueue.put(item)print(f"{name}生产了{item}")asyncdefconsumer(name,queue):whileTrue:try:# 设置超时避免永远等待item=awaitasyncio.wait_for(queue.get(),timeout=2)awaitasyncio.sleep(random.uniform(0.2,0.8))# 模拟处理时间print(f"{name}处理了{item}")queue.task_done()exceptasyncio.TimeoutError:print(f"{name}等待超时,退出")breakasyncdefmain():queue=asyncio.Queue(maxsize=3)# 创建生产者和消费者producers=[asyncio.create_task(producer(f"P{i}",queue))foriinrange(2)]consumers=[asyncio.create_task(consumer(f"C{i}",queue))foriinrange(3)]# 等待所有生产者完成awaitasyncio.gather(*producers)# 等待队列清空awaitqueue.join()# 取消消费者forcinconsumers:c.cancel()print("所有任务完成")asyncio.run(main())

关键要点总结

  1. 定义协程: 使用async def定义协程函数
  2. 调用协程: 使用await调用其他协程
  3. 运行协程: 使用asyncio.run()运行顶级协程
  4. 并发执行: 使用asyncio.gather()asyncio.create_task()实现并发
  5. 协程通信: 使用asyncio.Queue进行协程间通信
  6. 错误处理: 协程中可以使用常规的try/except处理异常
  7. 超时控制: 使用asyncio.wait_for()设置超时

协程适合I/O密集型应用,如网络请求、文件读写等场景,可以显著提高程序的并发性能。

http://www.cnnetsun.cn/news/4338.html

相关文章:

  • Python+appium自动化测试
  • 【完整源码+数据集+部署教程】电子元件目标检测检测系统源码[一条龙教学YOLOV8标注好的数据集一键训练_70+全套改进创新点发刊_Web前端展示]
  • Android 基础入门教程​​ LinearLayout(线性布局)分类
  • 笔记本电脑全内置式水冷散热设计
  • CefSharp实战宝典:从零构建高性能嵌入式浏览器应用
  • 免费无广!这款 PDF 橡皮擦,功能非常强大,办公党必藏!
  • 快速上手verl全流程实战指南:如何避开大模型强化学习配置陷阱?
  • 海外红人营销如何提升美妆转化?从认知到决策的全链路解析
  • Wan2.2-T2V-A14B在航空时刻表宣传视频中的航班动态模拟
  • PHP 8.6即将改变游戏规则:协程调度优化全曝光
  • Wan2.2-T2V-A14B模型对量子物理概念可视化的挑战应对
  • 从理论到实践:C#与Python协同开发量子算法的3步极速入门法
  • 【临床数据生存分析实战指南】:掌握R语言Cox模型构建与解读精髓
  • 云资产查询革命:用SQL统一管理多云环境的终极方案
  • 从实验室到生产环境,C#量子ML部署全流程解析,90%工程师都忽略了第4步
  • 视频汇聚平台EasyCVR赋能校园周界防范构建全时段安全防线
  • Python-Wechaty实战:3步构建基于PadLocal协议的微信机器人
  • 智能家居实战:基于Johnny-Five的自动感应垃圾桶开发全解析
  • 36、邮件服务搭建与配置全攻略
  • 37、Red Hat系统管理:邮件服务、故障排查与虚拟化指南
  • 构建现代化Python桌面应用:pywebview与前端框架的完美融合
  • 为什么越来越多工程师选择 ARMxy 做 EtherCAT 主站?答案在这里
  • 千匠大宗商品电商系统:以全链路数字化重构大宗交易生态
  • 网络安全应急响应:PDCERF模型从入门到精通+3大高频场景处置方案
  • 终极指南:5分钟快速部署PLabel半自动标注系统
  • 使用TRL库实现GRPO强化学习算法详解
  • Wan2.2-T2V-A14B如何精准还原‘夕阳下的海浪翻滚’场景
  • 快速搭建专业级屏幕共享服务:screego/server实战指南
  • ScottPlot 实时数据可视化:新手完整入门指南与性能优化技巧
  • 当普通显卡也能拍电影:Wan2.1如何重塑视频创作生态