当前位置: 首页 > news >正文

Pythonasync迭代器与生成器

=================================================================
Python 异步迭代器与生成器:异步数据流处理
=================================================================

异步迭代器和异步生成器是 asyncio 生态中的重要组成部分。
它们允许在数据生产或消费过程中进行异步等待,非常适合
处理流式数据、网络请求等场景。

=================================================================
一、异步迭代器:__aiter__ 和 __anext__
=================================================================

import asyncio
import random
from typing import AsyncIterator

class AsyncCounter:
"""
异步计数器迭代器。
实现了 __aiter__ 和 __anext__ 协议。
"""

def __init__(self, start: int, end: int, delay: float = 0.5):
self.current = start
self.end = end
self.delay = delay

def __aiter__(self):
"""返回异步迭代器对象本身"""
print("异步迭代器已创建")
return self

async def __anext__(self):
"""
返回下一个值。
当没有更多值时,抛出 StopAsyncIteration 异常。
"""
if self.current >= self.end:
print("迭代结束")
raise StopAsyncIteration

# 模拟异步操作(如网络请求、数据库查询)
await asyncio.sleep(self.delay)
value = self.current
self.current += 1
return value

async def async_iterator_demo():
"""使用异步迭代器的完整示例"""
print("=== 异步迭代器演示 ===")
async for number in AsyncCounter(1, 5, delay=0.3):
print(f"当前数字: {number}")
# async for 等价于:
# 1. 调用 __aiter__() 获取迭代器
# 2. 循环调用 await __anext__() 获取值
# 3. 捕获 StopAsyncIteration 时退出

=================================================================
二、异步生成器:async def + yield
=================================================================

"""
异步生成器使用 async def 定义,内部使用 yield。
它在每次 yield 之间可以执行 await 操作。
"""

async def async_range(start: int, end: int, delay: float = 0.5):
"""
异步版本的 range() 生成器。
每次迭代之间有延迟,模拟异步数据生产。
"""
print(f"异步生成器开始: [{start}, {end})")
for i in range(start, end):
# 在 yield 之前可以执行异步操作
await asyncio.sleep(delay)
print(f"生成: {i}")
yield i # 产出值并暂停执行
print("异步生成器结束")

async def async_generator_demo():
"""使用异步生成器的示例"""
print("=== 异步生成器演示 ===")
async for value in async_range(1, 4, delay=0.2):
print(f"消费: {value}")

# 异步生成器底层原理:
# 1. 调用 async_range() 返回一个异步生成器对象
# 2. async for 每次迭代调用 __anext__()
# 3. 生成器执行到 yield 时暂停并返回值
# 4. 下次迭代从 yield 处继续执行

=================================================================
三、@asynccontextmanager:异步上下文管理器
=================================================================

from contextlib import asynccontextmanager

@asynccontextmanager
async def async_file_simulation(filename: str):
"""
模拟异步文件操作的上下文管理器。
@asynccontextmanager 可以将异步生成器转换为
异步上下文管理器(支持 async with)。
"""
print(f"打开文件: {filename}")
# 模拟异步打开操作
await asyncio.sleep(0.1)

try:
# yield 之前是 __aenter__ 部分
yield {"name": filename, "content": "模拟文件内容"}
# yield 之后是 __aexit__ 部分(正常退出时执行)
print(f"正常关闭文件: {filename}")
except Exception as e:
# 异常时的清理逻辑
print(f"异常关闭文件: {filename}, 错误: {e}")
raise
finally:
# 无论如何都会执行的清理
print(f"最终清理: {filename}")

async def async_context_manager_demo():
"""使用异步上下文管理器"""
print("=== 异步上下文管理器演示 ===")
async with async_file_simulation("data.txt") as f:
print(f"读取内容: {f['content']}")
# 离开 async with 块时自动执行清理

# 也支持异常处理
try:
async with async_file_simulation("error.txt") as f:
raise ValueError("模拟错误")
except ValueError:
print("异常已被捕获")

=================================================================
四、异步迭代器 vs 异步生成器
=================================================================

"""
=== 异步迭代器 ===
- 需要手动实现 __aiter__ 和 __anext__
- 可以维护复杂的状态机
- 适合需要自定义迭代逻辑的场景
- 可以实现可重用的迭代器类

=== 异步生成器 ===
- 使用 async def + yield 自动实现
- 代码更简洁
- 适合简单的数据流转换
- 每次迭代只能使用一次(不能重置)

=== 选择指南 ===
- 简单的数据流变换: 异步生成器
- 复杂的迭代状态管理: 异步迭代器
- 需要多次遍历的数据: 异步迭代器(支持重新创建)
- 流式数据转换(map/filter):异步生成器
"""

class AsyncMessageStream:
"""异步迭代器:模拟消息流"""
def __init__(self, messages: list[str], delay: float = 0.1):
self.messages = messages
self.delay = delay
self.index = 0

def __aiter__(self):
self.index = 0 # 重置状态,支持多次遍历
return self

async def __anext__(self):
if self.index >= len(self.messages):
raise StopAsyncIteration
await asyncio.sleep(self.delay)
msg = self.messages[self.index]
self.index += 1
return f"消息: {msg}"

async def message_gen():
"""异步生成器:功能相同但代码更简洁"""
messages = ["A", "B", "C"]
for msg in messages:
await asyncio.sleep(0.1)
yield f"消息: {msg}"

=================================================================
五、async for 循环与异步列表推导式
=================================================================

async def async_comprehension_demo():
"""异步推导式的各种用法"""
print("=== 异步推导式演示 ===")

# 1. 异步列表推导式
squares = [x * x async for x in async_range(1, 5, 0.1)]
print(f"异步列表推导式: {squares}")

# 2. 带条件的异步推导式
evens = [x async for x in async_range(1, 6, 0.1) if x % 2 == 0]
print(f"偶数筛选: {evens}")

# 3. 异步集合推导式
async_set = {x async for x in async_range(1, 5, 0.1)}
print(f"异步集合推导式: {async_set}")

# 4. 异步字典推导式
async_dict = {f"key-{x}": x async for x in async_range(1, 4, 0.1)}
print(f"异步字典推导式: {async_dict}")

# 5. 异步生成器表达式
async_gen = (x * 2 async for x in async_range(1, 4, 0.1))
async for value in async_gen:
print(f"生成器表达式: {value}")

# 注意:Python 支持在列表/集合/字典推导式的最外层
# 使用 async for,但必须在异步函数内部

=================================================================
六、异步 map / filter 模式
=================================================================

class AsyncTransform:
"""异步数据流转换工具"""

@staticmethod
async def amap(func, async_iter):
"""
异步 map:对异步迭代器的每个元素应用函数。
类似于内置的 map(),但支持异步函数。
"""
async for item in async_iter:
result = await func(item)
yield result

@staticmethod
async def afilter(predicate, async_iter):
"""
异步 filter:过滤异步迭代器的元素。
类似于内置的 filter(),但支持异步谓词。
"""
async for item in async_iter:
if await predicate(item):
yield item

@staticmethod
async def areduce(func, async_iter, initial=None):
"""
异步 reduce:累积异步迭代器的元素。
类似于 functools.reduce()。
"""
accum = initial
async for item in async_iter:
if accum is None:
accum = item
else:
accum = await func(accum, item)
return accum

async def async_map_filter_demo():
"""演示异步 map/filter/reduce 模式"""
# 数据源:异步生成器
async def data_source():
for i in range(1, 6):
await asyncio.sleep(0.05)
yield i

transform = AsyncTransform()

# 异步 map:每个元素乘以 2
async def double(x):
await asyncio.sleep(0.02)
return x * 2

print("异步 map 结果:")
async for val in transform.amap(double, data_source()):
print(f" {val}")

# 异步 filter:筛选偶数
async def is_even(x):
await asyncio.sleep(0.02)
return x % 2 == 0

print("异步 filter 结果(偶数):")
async for val in transform.afilter(is_even, data_source()):
print(f" {val}")

# 异步 reduce:求和
async def add(x, y):
return x + y

total = await transform.areduce(
add, data_source(), initial=0
)
print(f"异步 reduce 求和: {total}")

=================================================================
七、实用示例:异步流式 HTTP 客户端
=================================================================

async def fetch_and_process(url: str, chunk_size: int = 256):
"""
流式获取 HTTP 响应并逐块处理。
使用异步生成器实现流式数据处理。
"""
try:
reader, writer = await asyncio.open_connection(url, 80)
request = (
f"GET / HTTP/1.1\r\n"
f"Host: {url}\r\n"
f"Connection: close\r\n"
f"\r\n"
)
writer.write(request.encode())
await writer.drain()

# 流式读取响应体
while True:
chunk = await reader.read(chunk_size)
if not chunk:
break
yield chunk # 每次产生一块数据
await asyncio.sleep(0) # 让出事件循环

writer.close()
await writer.wait_closed()
except Exception as e:
print(f"获取 {url} 失败: {e}")

async def stream_processor():
"""流式处理 HTTP 响应"""
async for chunk in fetch_and_process("example.com"):
print(f"收到数据块: {len(chunk)} 字节")

=================================================================
八、总结
=================================================================

# 1. __aiter__ / __anext__ 实现异步迭代器协议
# 2. async def + yield 定义异步生成器
# 3. @asynccontextmanager 结合生成器实现异步上下文管理
# 4. 异步推导式(列表/集合/字典)简化异步数据转换
# 5. 异步 map/filter/reduce 构建数据处理流水线
# 6. 异步生成器天然适合流式数据场景
# 7. StopAsyncIteration 标记迭代结束
# 8. async for 是消费异步可迭代对象的标准方式

if __name__ == "__main__":
asyncio.run(async_comprehension_demo())

http://www.cnnetsun.cn/news/2633518.html

相关文章:

  • 55项功能全面增强!HsMod终极炉石传说插件让游戏体验飞跃升级
  • TMS320F28377D实战:巧用EPWM触发DMA驱动DAC,实现高频波形生成的避坑指南
  • 【Google AI团队内部简报首发】:Gemini 2.5 Pro核心能力拆解,92%企业尚未启用的关键功能
  • MAA异常处理终极指南:从症状识别到深度优化的完整解决方案
  • Matlab帧间差分运动检测实战包:含测试视频ccbr1.avi、主脚本tracking.m与调用示例ex1.m
  • 空洞骑士模组管理革命:Scarab如何让复杂变简单
  • 隧道爆破振动数据降噪工具包:CEEMDAN自适应分解+小波包阈值精修
  • Win10系统内置应用集体‘罢工’?可能是你的用户配置文件(NTUSER.DAT)坏了,试试这个修复流程
  • html制作的PPT(各种风格)提示词
  • 为什么你的Gemini翻译在西班牙语合同场景错误率达34%?:三步定位语义漂移+文化适配失效根因
  • 3分钟搞定Windows任务栏透明化:TranslucentTB依赖问题终极解决指南
  • 国产大数据平台DataSophon初体验:手把手教你用4台虚拟机搭建Hadoop+Hive集群
  • 杰理之耳机低延时配置问题【篇】
  • 中文在线:AI短剧年化产能有望达3000部,亏损困局下赴港募资突围前景待察
  • RePKG:5分钟上手!轻松提取Wallpaper Engine壁纸资源的完整指南
  • 高漂瓶新手入门教程:三分钟学会投递铁轨浪漫
  • G-Helper深度解析:华硕笔记本性能调优完整指南
  • 5分钟搞定游戏模组:BepInEx框架终极安装配置指南
  • 2026 内容分发自动化实战:一套流程跑多平台,验证码交给人工接管
  • 免费Mac工具QMCDecode:三步快速解密QQ音乐加密格式的终极指南
  • 智能家居的‘感觉’从哪来?聊聊模糊推理在温控与照明中的实战应用
  • 2026年重庆精密无缝钢管定做 行业厂家经验分享
  • Rhea框架:多核SoC缓存一致性设计与验证的革命性工具
  • Tabby终端美化与效率提升指南:从主题配色到自定义快捷键设置
  • 游戏寻路算法实战:A*、Dijkstra和BFS,Unity里到底该用哪个?
  • 硕士毕业答辩PPT分享
  • 3个维度解析:如何重新定义你的NCM音乐文件自由
  • 大模型 API 调用成本太高?3 个步骤把账单降下来 30%
  • NVIDIA Profile Inspector终极指南:10个技巧解锁显卡隐藏性能
  • 基于Shape Up方法论与LLM构建智能会议决策系统:从信息摘要到战略塑形