【缓存技术】Redis实战:从缓存策略到分布式锁
【缓存技术】Redis实战:从缓存策略到分布式锁
引言
缓存是提升系统性能的关键技术,Redis作为高性能的键值存储系统,被广泛应用于缓存、会话管理、消息队列等场景。本文将详细介绍Redis的核心特性、缓存策略和分布式锁实现。
一、Redis基础
1.1 数据结构
| 数据类型 | 说明 | 应用场景 |
|---|---|---|
| String | 字符串 | 缓存、计数器 |
| Hash | 哈希表 | 对象存储 |
| List | 列表 | 消息队列 |
| Set | 无序集合 | 去重、交集 |
| Sorted Set | 有序集合 | 排行榜 |
1.2 基本操作
import redis # 连接Redis r = redis.Redis(host='localhost', port=6379, db=0) # String操作 r.set('name', 'Alice') print(r.get('name')) # Hash操作 r.hset('user:1', mapping={ 'name': 'Alice', 'age': 30, 'email': 'alice@example.com' }) print(r.hgetall('user:1')) # List操作 r.lpush('messages', 'Hello') r.lpush('messages', 'World') print(r.lrange('messages', 0, -1)) # Set操作 r.sadd('tags', 'python', 'redis', 'web') print(r.smembers('tags')) # Sorted Set操作 r.zadd('leaderboard', {'Alice': 100, 'Bob': 90}) print(r.zrange('leaderboard', 0, -1, withscores=True))二、缓存策略
2.1 缓存模式
# Cache-Aside模式 def get_user(user_id): # 先从缓存获取 user = r.get(f'user:{user_id}') if user: return json.loads(user) # 缓存未命中,从数据库获取 user = db.query(f'SELECT * FROM users WHERE id = {user_id}') # 更新缓存 r.set(f'user:{user_id}', json.dumps(user), ex=3600) return user2.2 缓存淘汰策略
# 配置Redis缓存策略 # maxmemory-policy allkeys-lru # LRU(最近最少使用) # LFU(最不经常使用) # FIFO(先进先出) # 设置缓存过期时间 r.set('temp_data', 'value', ex=60) # 60秒过期 r.set('session:123', 'data', px=3600000) # 毫秒过期2.3 缓存一致性
# 写操作时更新缓存 def update_user(user_id, data): # 更新数据库 db.execute(f'UPDATE users SET ... WHERE id = {user_id}') # 删除缓存(让下次读取时重新加载) r.delete(f'user:{user_id}') # 或者更新缓存 # r.set(f'user:{user_id}', json.dumps(data))三、分布式锁
3.1 基本实现
def acquire_lock(lock_name, acquire_timeout=10): """获取分布式锁""" identifier = str(uuid.uuid4()) end = time.time() + acquire_timeout while time.time() < end: # 使用SET NX(不存在时设置) if r.set(f'lock:{lock_name}', identifier, nx=True, ex=10): return identifier time.sleep(0.01) return None def release_lock(lock_name, identifier): """释放分布式锁""" # 使用Lua脚本保证原子性 script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ r.eval(script, 1, f'lock:{lock_name}', identifier)3.2 Redlock算法
from redis import Redis from time import time class Redlock: def __init__(self, servers): self.servers = [Redis(host=host, port=port) for host, port in servers] self.quorum = (len(servers) // 2) + 1 def acquire(self, lock_name, ttl=10000): """获取Redlock分布式锁""" identifier = str(uuid.uuid4()) acquired = 0 start_time = time() * 1000 for server in self.servers: try: if server.set(f'lock:{lock_name}', identifier, nx=True, px=ttl): acquired += 1 except Exception: pass elapsed = (time() * 1000) - start_time if acquired >= self.quorum and elapsed < ttl: return { 'valid': True, 'identifier': identifier, 'ttl': ttl - elapsed } # 获取失败,释放已获取的锁 self.release(lock_name, identifier) return {'valid': False} def release(self, lock_name, identifier): """释放Redlock分布式锁""" for server in self.servers: try: script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) end """ server.eval(script, 1, f'lock:{lock_name}', identifier) except Exception: pass四、高级特性
4.1 管道操作
# 使用管道提升性能 pipe = r.pipeline() # 添加多个操作 pipe.set('key1', 'value1') pipe.set('key2', 'value2') pipe.get('key1') pipe.get('key2') # 一次性执行 results = pipe.execute() print(results)4.2 事务
# 使用事务 with r.pipeline() as pipe: while True: try: pipe.watch('balance') current = int(pipe.get('balance')) if current >= 100: pipe.multi() pipe.decrby('balance', 100) pipe.incr('withdrawn') pipe.execute() break else: break except redis.WatchError: continue4.3 发布/订阅
# 发布者 pubsub = r.pubsub() r.publish('news', 'Hello World') # 订阅者 pubsub.subscribe('news') for message in pubsub.listen(): if message['type'] == 'message': print(f"Received: {message['data']}")五、性能优化
5.1 连接池
from redis import ConnectionPool # 创建连接池 pool = ConnectionPool( host='localhost', port=6379, db=0, max_connections=100 ) # 使用连接池 r = redis.Redis(connection_pool=pool)5.2 数据分片
# 简单的分片策略 def get_shard(key): """根据key计算分片""" shards = [0, 1, 2] return shards[hash(key) % len(shards)] def get_from_shard(key): shard = get_shard(key) r = redis.Redis(db=shard) return r.get(key)5.3 批量操作
# 批量获取 keys = ['user:1', 'user:2', 'user:3'] values = r.mget(keys) # 批量设置 mapping = { 'user:1': 'data1', 'user:2': 'data2' } r.mset(mapping)六、监控与运维
6.1 监控指标
# 获取Redis信息 info = r.info() print(f"内存使用: {info['used_memory_human']}") print(f"连接数: {info['connected_clients']}") print(f"命中率: {info['keyspace_hits'] / (info['keyspace_hits'] + info['keyspace_misses']):.2%}")6.2 持久化配置
# RDB持久化 # save 900 1 # 900秒内至少1个key变化 # save 300 10 # 300秒内至少10个key变化 # AOF持久化 # appendonly yes # appendfsync everysec6.3 集群配置
# Redis Cluster配置 from redis.cluster import RedisCluster rc = RedisCluster( host='localhost', port=7000, decode_responses=True ) rc.set('foo', 'bar') print(rc.get('foo'))七、实战案例:缓存系统
7.1 实现缓存装饰器
def cache(key_prefix, ttl=3600): def decorator(func): def wrapper(*args, **kwargs): # 生成缓存key key = f"{key_prefix}:{args[0]}" # 尝试从缓存获取 cached = r.get(key) if cached: return json.loads(cached) # 执行函数 result = func(*args, **kwargs) # 缓存结果 r.set(key, json.dumps(result), ex=ttl) return result return wrapper return decorator @cache('user') def get_user(user_id): return db.query(f'SELECT * FROM users WHERE id = {user_id}')7.2 限流实现
def rate_limit(user_id, limit=100, window=3600): """限流实现""" key = f'rate_limit:{user_id}' # 增加计数 count = r.incr(key) if count == 1: # 设置过期时间 r.expire(key, window) return count <= limit八、常见问题与解决方案
8.1 缓存穿透
# 使用布隆过滤器解决缓存穿透 from bloom_filter import BloomFilter bloom = BloomFilter(max_elements=1000000, error_rate=0.01) def get_user(user_id): # 先检查布隆过滤器 if user_id not in bloom: return None # 从缓存获取 user = r.get(f'user:{user_id}') if user: return json.loads(user) # 从数据库获取 user = db.query(f'SELECT * FROM users WHERE id = {user_id}') if user: bloom.add(user_id) r.set(f'user:{user_id}', json.dumps(user)) return user8.2 缓存击穿
# 使用互斥锁解决缓存击穿 def get_hot_data(key): # 先尝试获取缓存 data = r.get(key) if data: return json.loads(data) # 获取分布式锁 lock = acquire_lock(f'lock:{key}') if not lock: # 获取锁失败,等待重试 time.sleep(0.1) return get_hot_data(key) try: # 再次检查缓存 data = r.get(key) if data: return json.loads(data) # 从数据库加载 data = db.query(f'SELECT * FROM hot_data WHERE key = "{key}"') # 更新缓存 r.set(key, json.dumps(data), ex=3600) return data finally: release_lock(f'lock:{key}', lock)8.3 缓存雪崩
# 设置随机过期时间避免缓存雪崩 import random def set_cache_with_random_ttl(key, value, base_ttl=3600): # 添加随机偏移 ttl = base_ttl + random.randint(0, 300) r.set(key, value, ex=ttl)九、结语
Redis是一个功能强大的缓存和数据存储系统,掌握其核心特性和最佳实践对于构建高性能系统至关重要。本文介绍了Redis的基础操作、缓存策略、分布式锁和性能优化等内容,希望能帮助你更好地使用Redis。
#Redis #缓存 #分布式锁 #性能优化
