Pythonasync迭代器与生成器
=================================================================
Python 异步迭代器与生成器:异步数据流处理
=================================================================
异步迭代器和异步生成器是 asyncio 生态中的重要组成部分。
它们允许在数据生产或消费过程中进行异步等待,非常适合
处理流式数据、网络请求等场景。
=================================================================
一、异步迭代器:__aiter__ 和 __anext__
=================================================================
import asyncio
import random
from typing import AsyncIterator
class AsyncCounter:
"""
异步计数器迭代器。
实现了 __aiter__ 和 __anext__ 协议。
"""
def __init__(self, start: int, end: int, delay: float = 0.5):
self.current = start
self.end = end
self.delay = delay
def __aiter__(self):
"""返回异步迭代器对象本身"""
print("异步迭代器已创建")
return self
async def __anext__(self):
"""
返回下一个值。
当没有更多值时,抛出 StopAsyncIteration 异常。
"""
if self.current >= self.end:
print("迭代结束")
raise StopAsyncIteration
# 模拟异步操作(如网络请求、数据库查询)
await asyncio.sleep(self.delay)
value = self.current
self.current += 1
return value
async def async_iterator_demo():
"""使用异步迭代器的完整示例"""
print("=== 异步迭代器演示 ===")
async for number in AsyncCounter(1, 5, delay=0.3):
print(f"当前数字: {number}")
# async for 等价于:
# 1. 调用 __aiter__() 获取迭代器
# 2. 循环调用 await __anext__() 获取值
# 3. 捕获 StopAsyncIteration 时退出
=================================================================
二、异步生成器:async def + yield
=================================================================
"""
异步生成器使用 async def 定义,内部使用 yield。
它在每次 yield 之间可以执行 await 操作。
"""
async def async_range(start: int, end: int, delay: float = 0.5):
"""
异步版本的 range() 生成器。
每次迭代之间有延迟,模拟异步数据生产。
"""
print(f"异步生成器开始: [{start}, {end})")
for i in range(start, end):
# 在 yield 之前可以执行异步操作
await asyncio.sleep(delay)
print(f"生成: {i}")
yield i # 产出值并暂停执行
print("异步生成器结束")
async def async_generator_demo():
"""使用异步生成器的示例"""
print("=== 异步生成器演示 ===")
async for value in async_range(1, 4, delay=0.2):
print(f"消费: {value}")
# 异步生成器底层原理:
# 1. 调用 async_range() 返回一个异步生成器对象
# 2. async for 每次迭代调用 __anext__()
# 3. 生成器执行到 yield 时暂停并返回值
# 4. 下次迭代从 yield 处继续执行
=================================================================
三、@asynccontextmanager:异步上下文管理器
=================================================================
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_file_simulation(filename: str):
"""
模拟异步文件操作的上下文管理器。
@asynccontextmanager 可以将异步生成器转换为
异步上下文管理器(支持 async with)。
"""
print(f"打开文件: {filename}")
# 模拟异步打开操作
await asyncio.sleep(0.1)
try:
# yield 之前是 __aenter__ 部分
yield {"name": filename, "content": "模拟文件内容"}
# yield 之后是 __aexit__ 部分(正常退出时执行)
print(f"正常关闭文件: {filename}")
except Exception as e:
# 异常时的清理逻辑
print(f"异常关闭文件: {filename}, 错误: {e}")
raise
finally:
# 无论如何都会执行的清理
print(f"最终清理: {filename}")
async def async_context_manager_demo():
"""使用异步上下文管理器"""
print("=== 异步上下文管理器演示 ===")
async with async_file_simulation("data.txt") as f:
print(f"读取内容: {f['content']}")
# 离开 async with 块时自动执行清理
# 也支持异常处理
try:
async with async_file_simulation("error.txt") as f:
raise ValueError("模拟错误")
except ValueError:
print("异常已被捕获")
=================================================================
四、异步迭代器 vs 异步生成器
=================================================================
"""
=== 异步迭代器 ===
- 需要手动实现 __aiter__ 和 __anext__
- 可以维护复杂的状态机
- 适合需要自定义迭代逻辑的场景
- 可以实现可重用的迭代器类
=== 异步生成器 ===
- 使用 async def + yield 自动实现
- 代码更简洁
- 适合简单的数据流转换
- 每次迭代只能使用一次(不能重置)
=== 选择指南 ===
- 简单的数据流变换: 异步生成器
- 复杂的迭代状态管理: 异步迭代器
- 需要多次遍历的数据: 异步迭代器(支持重新创建)
- 流式数据转换(map/filter):异步生成器
"""
class AsyncMessageStream:
"""异步迭代器:模拟消息流"""
def __init__(self, messages: list[str], delay: float = 0.1):
self.messages = messages
self.delay = delay
self.index = 0
def __aiter__(self):
self.index = 0 # 重置状态,支持多次遍历
return self
async def __anext__(self):
if self.index >= len(self.messages):
raise StopAsyncIteration
await asyncio.sleep(self.delay)
msg = self.messages[self.index]
self.index += 1
return f"消息: {msg}"
async def message_gen():
"""异步生成器:功能相同但代码更简洁"""
messages = ["A", "B", "C"]
for msg in messages:
await asyncio.sleep(0.1)
yield f"消息: {msg}"
=================================================================
五、async for 循环与异步列表推导式
=================================================================
async def async_comprehension_demo():
"""异步推导式的各种用法"""
print("=== 异步推导式演示 ===")
# 1. 异步列表推导式
squares = [x * x async for x in async_range(1, 5, 0.1)]
print(f"异步列表推导式: {squares}")
# 2. 带条件的异步推导式
evens = [x async for x in async_range(1, 6, 0.1) if x % 2 == 0]
print(f"偶数筛选: {evens}")
# 3. 异步集合推导式
async_set = {x async for x in async_range(1, 5, 0.1)}
print(f"异步集合推导式: {async_set}")
# 4. 异步字典推导式
async_dict = {f"key-{x}": x async for x in async_range(1, 4, 0.1)}
print(f"异步字典推导式: {async_dict}")
# 5. 异步生成器表达式
async_gen = (x * 2 async for x in async_range(1, 4, 0.1))
async for value in async_gen:
print(f"生成器表达式: {value}")
# 注意:Python 支持在列表/集合/字典推导式的最外层
# 使用 async for,但必须在异步函数内部
=================================================================
六、异步 map / filter 模式
=================================================================
class AsyncTransform:
"""异步数据流转换工具"""
@staticmethod
async def amap(func, async_iter):
"""
异步 map:对异步迭代器的每个元素应用函数。
类似于内置的 map(),但支持异步函数。
"""
async for item in async_iter:
result = await func(item)
yield result
@staticmethod
async def afilter(predicate, async_iter):
"""
异步 filter:过滤异步迭代器的元素。
类似于内置的 filter(),但支持异步谓词。
"""
async for item in async_iter:
if await predicate(item):
yield item
@staticmethod
async def areduce(func, async_iter, initial=None):
"""
异步 reduce:累积异步迭代器的元素。
类似于 functools.reduce()。
"""
accum = initial
async for item in async_iter:
if accum is None:
accum = item
else:
accum = await func(accum, item)
return accum
async def async_map_filter_demo():
"""演示异步 map/filter/reduce 模式"""
# 数据源:异步生成器
async def data_source():
for i in range(1, 6):
await asyncio.sleep(0.05)
yield i
transform = AsyncTransform()
# 异步 map:每个元素乘以 2
async def double(x):
await asyncio.sleep(0.02)
return x * 2
print("异步 map 结果:")
async for val in transform.amap(double, data_source()):
print(f" {val}")
# 异步 filter:筛选偶数
async def is_even(x):
await asyncio.sleep(0.02)
return x % 2 == 0
print("异步 filter 结果(偶数):")
async for val in transform.afilter(is_even, data_source()):
print(f" {val}")
# 异步 reduce:求和
async def add(x, y):
return x + y
total = await transform.areduce(
add, data_source(), initial=0
)
print(f"异步 reduce 求和: {total}")
=================================================================
七、实用示例:异步流式 HTTP 客户端
=================================================================
async def fetch_and_process(url: str, chunk_size: int = 256):
"""
流式获取 HTTP 响应并逐块处理。
使用异步生成器实现流式数据处理。
"""
try:
reader, writer = await asyncio.open_connection(url, 80)
request = (
f"GET / HTTP/1.1\r\n"
f"Host: {url}\r\n"
f"Connection: close\r\n"
f"\r\n"
)
writer.write(request.encode())
await writer.drain()
# 流式读取响应体
while True:
chunk = await reader.read(chunk_size)
if not chunk:
break
yield chunk # 每次产生一块数据
await asyncio.sleep(0) # 让出事件循环
writer.close()
await writer.wait_closed()
except Exception as e:
print(f"获取 {url} 失败: {e}")
async def stream_processor():
"""流式处理 HTTP 响应"""
async for chunk in fetch_and_process("example.com"):
print(f"收到数据块: {len(chunk)} 字节")
=================================================================
八、总结
=================================================================
# 1. __aiter__ / __anext__ 实现异步迭代器协议
# 2. async def + yield 定义异步生成器
# 3. @asynccontextmanager 结合生成器实现异步上下文管理
# 4. 异步推导式(列表/集合/字典)简化异步数据转换
# 5. 异步 map/filter/reduce 构建数据处理流水线
# 6. 异步生成器天然适合流式数据场景
# 7. StopAsyncIteration 标记迭代结束
# 8. async for 是消费异步可迭代对象的标准方式
if __name__ == "__main__":
asyncio.run(async_comprehension_demo())
