当前位置: 首页 > news >正文

【缓存技术】Redis实战:从缓存策略到分布式锁

【缓存技术】Redis实战:从缓存策略到分布式锁

引言

缓存是提升系统性能的关键技术,Redis作为高性能的键值存储系统,被广泛应用于缓存、会话管理、消息队列等场景。本文将详细介绍Redis的核心特性、缓存策略和分布式锁实现。

一、Redis基础

1.1 数据结构

数据类型说明应用场景
String字符串缓存、计数器
Hash哈希表对象存储
List列表消息队列
Set无序集合去重、交集
Sorted Set有序集合排行榜

1.2 基本操作

import redis # 连接Redis r = redis.Redis(host='localhost', port=6379, db=0) # String操作 r.set('name', 'Alice') print(r.get('name')) # Hash操作 r.hset('user:1', mapping={ 'name': 'Alice', 'age': 30, 'email': 'alice@example.com' }) print(r.hgetall('user:1')) # List操作 r.lpush('messages', 'Hello') r.lpush('messages', 'World') print(r.lrange('messages', 0, -1)) # Set操作 r.sadd('tags', 'python', 'redis', 'web') print(r.smembers('tags')) # Sorted Set操作 r.zadd('leaderboard', {'Alice': 100, 'Bob': 90}) print(r.zrange('leaderboard', 0, -1, withscores=True))

二、缓存策略

2.1 缓存模式

# Cache-Aside模式 def get_user(user_id): # 先从缓存获取 user = r.get(f'user:{user_id}') if user: return json.loads(user) # 缓存未命中,从数据库获取 user = db.query(f'SELECT * FROM users WHERE id = {user_id}') # 更新缓存 r.set(f'user:{user_id}', json.dumps(user), ex=3600) return user

2.2 缓存淘汰策略

# 配置Redis缓存策略 # maxmemory-policy allkeys-lru # LRU(最近最少使用) # LFU(最不经常使用) # FIFO(先进先出) # 设置缓存过期时间 r.set('temp_data', 'value', ex=60) # 60秒过期 r.set('session:123', 'data', px=3600000) # 毫秒过期

2.3 缓存一致性

# 写操作时更新缓存 def update_user(user_id, data): # 更新数据库 db.execute(f'UPDATE users SET ... WHERE id = {user_id}') # 删除缓存(让下次读取时重新加载) r.delete(f'user:{user_id}') # 或者更新缓存 # r.set(f'user:{user_id}', json.dumps(data))

三、分布式锁

3.1 基本实现

def acquire_lock(lock_name, acquire_timeout=10): """获取分布式锁""" identifier = str(uuid.uuid4()) end = time.time() + acquire_timeout while time.time() < end: # 使用SET NX(不存在时设置) if r.set(f'lock:{lock_name}', identifier, nx=True, ex=10): return identifier time.sleep(0.01) return None def release_lock(lock_name, identifier): """释放分布式锁""" # 使用Lua脚本保证原子性 script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ r.eval(script, 1, f'lock:{lock_name}', identifier)

3.2 Redlock算法

from redis import Redis from time import time class Redlock: def __init__(self, servers): self.servers = [Redis(host=host, port=port) for host, port in servers] self.quorum = (len(servers) // 2) + 1 def acquire(self, lock_name, ttl=10000): """获取Redlock分布式锁""" identifier = str(uuid.uuid4()) acquired = 0 start_time = time() * 1000 for server in self.servers: try: if server.set(f'lock:{lock_name}', identifier, nx=True, px=ttl): acquired += 1 except Exception: pass elapsed = (time() * 1000) - start_time if acquired >= self.quorum and elapsed < ttl: return { 'valid': True, 'identifier': identifier, 'ttl': ttl - elapsed } # 获取失败,释放已获取的锁 self.release(lock_name, identifier) return {'valid': False} def release(self, lock_name, identifier): """释放Redlock分布式锁""" for server in self.servers: try: script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) end """ server.eval(script, 1, f'lock:{lock_name}', identifier) except Exception: pass

四、高级特性

4.1 管道操作

# 使用管道提升性能 pipe = r.pipeline() # 添加多个操作 pipe.set('key1', 'value1') pipe.set('key2', 'value2') pipe.get('key1') pipe.get('key2') # 一次性执行 results = pipe.execute() print(results)

4.2 事务

# 使用事务 with r.pipeline() as pipe: while True: try: pipe.watch('balance') current = int(pipe.get('balance')) if current >= 100: pipe.multi() pipe.decrby('balance', 100) pipe.incr('withdrawn') pipe.execute() break else: break except redis.WatchError: continue

4.3 发布/订阅

# 发布者 pubsub = r.pubsub() r.publish('news', 'Hello World') # 订阅者 pubsub.subscribe('news') for message in pubsub.listen(): if message['type'] == 'message': print(f"Received: {message['data']}")

五、性能优化

5.1 连接池

from redis import ConnectionPool # 创建连接池 pool = ConnectionPool( host='localhost', port=6379, db=0, max_connections=100 ) # 使用连接池 r = redis.Redis(connection_pool=pool)

5.2 数据分片

# 简单的分片策略 def get_shard(key): """根据key计算分片""" shards = [0, 1, 2] return shards[hash(key) % len(shards)] def get_from_shard(key): shard = get_shard(key) r = redis.Redis(db=shard) return r.get(key)

5.3 批量操作

# 批量获取 keys = ['user:1', 'user:2', 'user:3'] values = r.mget(keys) # 批量设置 mapping = { 'user:1': 'data1', 'user:2': 'data2' } r.mset(mapping)

六、监控与运维

6.1 监控指标

# 获取Redis信息 info = r.info() print(f"内存使用: {info['used_memory_human']}") print(f"连接数: {info['connected_clients']}") print(f"命中率: {info['keyspace_hits'] / (info['keyspace_hits'] + info['keyspace_misses']):.2%}")

6.2 持久化配置

# RDB持久化 # save 900 1 # 900秒内至少1个key变化 # save 300 10 # 300秒内至少10个key变化 # AOF持久化 # appendonly yes # appendfsync everysec

6.3 集群配置

# Redis Cluster配置 from redis.cluster import RedisCluster rc = RedisCluster( host='localhost', port=7000, decode_responses=True ) rc.set('foo', 'bar') print(rc.get('foo'))

七、实战案例:缓存系统

7.1 实现缓存装饰器

def cache(key_prefix, ttl=3600): def decorator(func): def wrapper(*args, **kwargs): # 生成缓存key key = f"{key_prefix}:{args[0]}" # 尝试从缓存获取 cached = r.get(key) if cached: return json.loads(cached) # 执行函数 result = func(*args, **kwargs) # 缓存结果 r.set(key, json.dumps(result), ex=ttl) return result return wrapper return decorator @cache('user') def get_user(user_id): return db.query(f'SELECT * FROM users WHERE id = {user_id}')

7.2 限流实现

def rate_limit(user_id, limit=100, window=3600): """限流实现""" key = f'rate_limit:{user_id}' # 增加计数 count = r.incr(key) if count == 1: # 设置过期时间 r.expire(key, window) return count <= limit

八、常见问题与解决方案

8.1 缓存穿透

# 使用布隆过滤器解决缓存穿透 from bloom_filter import BloomFilter bloom = BloomFilter(max_elements=1000000, error_rate=0.01) def get_user(user_id): # 先检查布隆过滤器 if user_id not in bloom: return None # 从缓存获取 user = r.get(f'user:{user_id}') if user: return json.loads(user) # 从数据库获取 user = db.query(f'SELECT * FROM users WHERE id = {user_id}') if user: bloom.add(user_id) r.set(f'user:{user_id}', json.dumps(user)) return user

8.2 缓存击穿

# 使用互斥锁解决缓存击穿 def get_hot_data(key): # 先尝试获取缓存 data = r.get(key) if data: return json.loads(data) # 获取分布式锁 lock = acquire_lock(f'lock:{key}') if not lock: # 获取锁失败,等待重试 time.sleep(0.1) return get_hot_data(key) try: # 再次检查缓存 data = r.get(key) if data: return json.loads(data) # 从数据库加载 data = db.query(f'SELECT * FROM hot_data WHERE key = "{key}"') # 更新缓存 r.set(key, json.dumps(data), ex=3600) return data finally: release_lock(f'lock:{key}', lock)

8.3 缓存雪崩

# 设置随机过期时间避免缓存雪崩 import random def set_cache_with_random_ttl(key, value, base_ttl=3600): # 添加随机偏移 ttl = base_ttl + random.randint(0, 300) r.set(key, value, ex=ttl)

九、结语

Redis是一个功能强大的缓存和数据存储系统,掌握其核心特性和最佳实践对于构建高性能系统至关重要。本文介绍了Redis的基础操作、缓存策略、分布式锁和性能优化等内容,希望能帮助你更好地使用Redis。

#Redis #缓存 #分布式锁 #性能优化

http://www.cnnetsun.cn/news/2462875.html

相关文章:

  • MATLAB通信仿真避坑指南:手把手教你实现SSB调制解调(附完整代码和结果图)
  • 麦肯锡AI揭秘:AI的真正价值不在算法,而在重构组织与结构竞争力
  • 从零开始构建RISC-V处理器(三):全指令集数据通路设计与实现
  • 为什么你的Perplexity搜不出科学健身计划?NIST认证信息检索模型原理首度公开
  • 300+篇创新高,ACM会议,录用率27.1%!CCF推荐学术会议(C)截稿提醒
  • 不会C++也能搞算法?手把手教你用MATLAB Coder把.m文件变成VS2019能用的C++库
  • TEC-2实验台手把手:用6116芯片扩展存储器,从原理图到单步调试全流程
  • CNAS实验室一份完整的质量手册需要包含哪些要素?一文教会质量手册编写
  • RAG 不仅仅是向量库对接:深入解析其三大复杂挑战与工程实践
  • Windows 11终极优化指南:使用Win11Debloat一键清理系统冗余提升性能
  • ARM PMU性能监控与TLB缓存事件深度解析
  • SOLIDWORKS PDM 离线状态设置指南
  • 不平衡学习的自适应合成采样方法ADASYN(Matlab代码实现)
  • 量子同态加密:理论与实践的突破
  • ARM9老开发板救星:用BusyBox 1.7.0和4.3.2工具链构建根文件系统(避坑实录)
  • 实战演练:利用京东API一键抓取商品详情
  • 告别Telnet和Jmeter!用Apifox 2.3.24一站式搞定Dubbo 3.x接口调试(附Nacos注册中心实战)
  • Gemini Ultra长文本推理性能崩塌点在哪?实测128K tokens下响应时间激增217%的根因分析
  • 别再乱用BatchNorm了!PyTorch实战:LayerNorm、InstanceNorm、GroupNorm到底怎么选?
  • 终极Win11Debloat指南:3步彻底优化Windows 11系统性能与隐私
  • 2026 GEO 服务商深度盘点:AI 搜索时代品牌增长工具怎么选
  • 美团CVPR 2026中稿精选:视觉生成遇上慢思考,解码多模态推理新范式
  • 告别rqt_plot!用PlotJuggler+ROS2高效分析你的机器人传感器数据流
  • 无王无帝定乾坤,来自田间第一人 凰标立定新格局
  • 别再只勾选CMSIS-V2了!深入理解STM32CubeMX中FreeRTOS的CMSIS层:如何让你的代码更易移植与维护
  • 保姆级教程:在Ubuntu 20.04上搞定Intel RealSense D435i与ROS Noetic的联调(含RK3588避坑指南)
  • 构建网易云音乐API服务:Node.js技术架构与全栈集成方案
  • GD32 SPI通信协议详解与W25Q64 Flash驱动实战
  • 3分钟快速上手LyricsX:打造专属桌面歌词体验的完整指南
  • RTOS任务通知:轻量级通信机制的原理、应用与性能优化