Redis实战案例:从缓存雪崩到秒杀系统
Redis作为一款高性能的内存数据存储,在现代应用架构中扮演着至关重要的角色。它远不止是一个简单的缓存工具,更是解决高并发、分布式系统难题的利器。本文将通过三个核心实战案例,带你深入理解Redis在实际生产环境中的应用。
案例一:应对缓存雪崩——构建稳健的缓存层
缓存雪崩是指在某一时刻,大量缓存数据同时过期失效,导致所有请求直接穿透到数据库,造成数据库瞬时压力过大甚至崩溃。
问题分析
假设我们有一个商品查询服务,缓存了10万条商品信息,过期时间都设置为2小时。当2小时后缓存集体失效,瞬间涌来的用户请求将直接访问数据库,极易引发服务不可用。
解决方案与代码实现
1. 差异化过期时间
最直接的解决方案是为不同的缓存键设置随机的过期时间,避免同时失效。
import redis
import random
import json
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
def get_product_info(product_id):
"""获取商品信息,使用差异化过期时间防止雪崩"""
cache_key = f"product:{product_id}"
# 1. 尝试从缓存读取
product_data = r.get(cache_key)
if product_data:
return json.loads(product_data)
# 2. 缓存不存在,查询数据库
# 模拟数据库查询
product_info = query_database_for_product(product_id)
if product_info:
# 设置缓存,基础过期时间120分钟 + 随机0-10分钟
base_ttl = 120 * 60 # 120分钟
random_ttl = random.randint(0, 600) # 0-10分钟随机值
total_ttl = base_ttl + random_ttl
r.setex(cache_key, total_ttl, json.dumps(product_info))
return product_info
def query_database_for_product(product_id):
"""模拟数据库查询"""
# 这里是实际的数据库查询逻辑
return {"id": product_id, "name": "示例商品", "price": 99.9}
2. 永不过期+后台更新策略
对于极其重要的热点数据,可以采用”永不过期”策略,但通过后台任务定时更新。
import threading
import time
def get_product_info_with_background_refresh(product_id):
"""使用后台刷新策略"""
cache_key = f"product:{product_id}"
lock_key = f"lock:product:{product_id}"
# 尝试获取数据
product_data = r.get(cache_key)
if product_data:
data = json.loads(product_data)
# 检查是否需要后台刷新(比如距离过期时间只剩5分钟)
if needs_refresh(data):
# 异步刷新,不阻塞当前请求
threading.Thread(target=refresh_cache, args=(product_id,)).start()
return data['info']
# 缓存未命中,需要加锁防止缓存击穿
if acquire_lock(lock_key):
try:
# 再次检查,可能已经被其他线程加载
product_data = r.get(cache_key)
if product_data:
return json.loads(product_data)['info']
# 查询数据库
product_info = query_database_for_product(product_id)
if product_info:
# 设置永不过期,但记录更新时间
cache_data = {
'info': product_info,
'last_updated': time.time(),
'version': 1
}
r.set(cache_key, json.dumps(cache_data))
return product_info
finally:
release_lock(lock_key)
else:
# 未获取到锁,短暂等待后重试或返回默认值
time.sleep(0.1)
return get_product_info_with_background_refresh(product_id)
def acquire_lock(lock_key, timeout=10):
"""获取分布式锁"""
return r.set(lock_key, 'locked', nx=True, ex=timeout)
def release_lock(lock_key):
"""释放锁"""
r.delete(lock_key)
案例二:分布式锁的实现——确保操作的原子性
在分布式系统中,如何保证多个服务实例对共享资源的互斥访问是一个经典问题。Redis的原子操作特性使其成为实现分布式锁的理想选择。
基于SETNX的分布式锁实现
class RedisDistributedLock:
def __init__(self, redis_client, lock_key, expire_time=30):
self.redis = redis_client
self.lock_key = lock_key
self.expire_time = expire_time
self.identifier = str(uuid.uuid4()) # 唯一标识,防止误删其他客户端的锁
def acquire(self, timeout=10):
"""获取锁,支持超时"""
end_time = time.time() + timeout
while time.time() < end_time:
# 使用SET命令替代SETNX+EXPIRE,保证原子性
if self.redis.set(self.lock_key, self.identifier,
nx=True, ex=self.expire_time):
return True
time.sleep(0.01) # 短暂等待后重试
return False
def release(self):
"""释放锁,使用Lua脚本保证原子性"""
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
script = self.redis.register_script(lua_script)
result = script(keys=[self.lock_key], args=[self.identifier])
return result == 1
def __enter__(self):
if self.acquire():
return self
raise Exception("获取分布式锁失败")
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用示例
def deduct_inventory(product_id, quantity):
"""扣减库存,使用分布式锁保证线程安全"""
lock_key = f"inventory_lock:{product_id}"
with RedisDistributedLock(r, lock_key):
# 查询当前库存
current_stock = int(r.get(f"inventory:{product_id}") or 0)
if current_stock >= quantity:
# 扣减库存
r.decrby(f"inventory:{product_id}", quantity)
return True
else:
return False
Redlock算法:多节点Redis的分布式锁
在生产环境中,为了更高的可靠性,我们可能需要使用多个Redis实例。Redlock算法提供了这样的解决方案。
class Redlock:
def __init__(self, connection_list):
self.quorum = len(connection_list) // 2 + 1
self.redis_instances = connection_list
def lock(self, resource, ttl):
"""获取锁"""
identifier = str(uuid.uuid4())
locks_acquired = 0
for redis_instance in self.redis_instances:
try:
if redis_instance.set(resource, identifier, nx=True, ex=ttl):
locks_acquired += 1
except redis.RedisError:
continue
# 只有在大多数节点上获取到锁才算成功
if locks_acquired >= self.quorum:
return identifier
else:
# 清理已获取的锁
self.unlock(resource, identifier)
return None
def unlock(self, resource, identifier):
"""释放锁"""
for redis_instance in self.redis_instances:
try:
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
script = redis_instance.register_script(lua_script)
script(keys=[resource], args=[identifier])
except redis.RedisError:
continue
案例三:秒杀系统设计——应对极端高并发场景
秒杀系统是电商场景中的典型高并发案例,需要在极短时间内处理海量请求,同时保证数据的一致性和系统的稳定性。
秒杀系统架构设计
核心思路:
- 流量削峰:将瞬时高峰流量转化为平稳流量
- 异步处理:将核心业务逻辑异步化
- 库存预热:提前将库存加载到Redis
- 读多写少:优化读性能,控制写并发
代码实现
1. 库存预热与校验
def preheat_seckill_inventory(product_id, total_inventory):
"""秒杀库存预热"""
inventory_key = f"seckill:inventory:{product_id}"
user_set_key = f"seckill:users:{product_id}"
# 设置总库存
r.set(inventory_key, total_inventory)
# 初始化已购买用户集合(用于去重)
# 注意:实际生产中这个集合可能会很大,需要考虑分片或其他方案
def can_participate_seckill(user_id, product_id):
"""检查用户是否可以参与秒杀"""
inventory_key = f"seckill:inventory:{product_id}"
user_set_key = f"seckill:users:{product_id}"
# 检查库存
remaining = int(r.get(inventory_key) or 0)
if remaining <= 0:
return False, "库存已售罄"
# 检查用户是否已经参与过(防重复购买)
if r.sismember(user_set_key, user_id):
return False, "您已经参与过本次秒杀"
return True, "可以参与"
2. 基于Lua脚本的原子性秒杀操作
为了保证原子性,我们将多个操作封装在一个Lua脚本中执行:
-- seckill.lua
local inventory_key = KEYS[1]
local user_set_key = KEYS[2]
local user_id = ARGV[1]
local timestamp = ARGV[2]
-- 检查库存
local remaining = tonumber(redis.call('GET', inventory_key))
if remaining <= 0 then
return 0 -- 库存不足
end
-- 检查用户是否重复购买
if redis.call('SISMEMBER', user_set_key, user_id) == 1 then
return 1 -- 重复购买
end
-- 扣减库存
redis.call('DECR', inventory_key)
-- 记录购买用户
redis.call('SADD', user_set_key, user_id)
-- 记录订单信息
local order_id = user_id .. ':' .. timestamp
redis.call('HSET', 'seckill:orders', order_id, timestamp)
return 2 -- 秒杀成功
Python中调用Lua脚本:
class SeckillService:
def __init__(self, redis_client):
self.redis = redis_client
# 加载Lua脚本
with open('seckill.lua', 'r') as f:
self.seckill_script = self.redis.register_script(f.read())
def execute_seckill(self, user_id, product_id):
"""执行秒杀操作"""
inventory_key = f"seckill:inventory:{product_id}"
user_set_key = f"seckill:users:{product_id}"
timestamp = str(int(time.time() * 1000)) # 毫秒时间戳
try:
# 执行原子性的秒杀操作
result = self.seckill_script(
keys=[inventory_key, user_set_key],
args=[user_id, timestamp]
)
if result == 0:
return False, "库存不足"
elif result == 1:
return False, "请勿重复购买"
elif result == 2:
# 秒杀成功,异步处理后续逻辑
self._process_success_order(user_id, product_id, timestamp)
return True, "秒杀成功"
else:
return False, "系统异常"
except Exception as e:
return False, f"秒杀失败: {str(e)}"
def _process_success_order(self, user_id, product_id, timestamp):
"""异步处理成功订单"""
# 这里可以将订单信息推送到消息队列,由后端服务异步处理
order_info = {
'user_id': user_id,
'product_id': product_id,
'timestamp': timestamp,
'status': 'pending'
}
# 推送到消息队列或直接写入数据库
# self.message_queue.push('order_queue', order_info)
3. 限流与降级策略
import time
class RateLimiter:
def __init__(self, redis_client, key_prefix="rate_limit"):
self.redis = redis_client
self.key_prefix = key_prefix
def is_allowed(self, identifier, max_requests, window_size):
"""滑动窗口限流"""
key = f"{self.key_prefix}:{identifier}"
now = time.time() * 1000 # 毫秒时间戳
window_start = now - window_size * 1000
# 使用管道保证原子性
pipe = self.redis.pipeline()
pipe.zremrangebyscore(key, 0, window_start) # 移除窗口外的请求
pipe.zcard(key) # 获取当前窗口内请求数
pipe.zadd(key, {str(now): now}) # 添加当前请求
pipe.expire(key, window_size + 1) # 设置过期时间
results = pipe.execute()
current_requests = results[1]
return current_requests <= max_requests
# 在秒杀入口处添加限流
def seckill_entrypoint(user_id, product_id):
"""秒杀入口,包含限流逻辑"""
limiter = RateLimiter(r)
# 用户级限流:每秒最多5次请求
if not limiter.is_allowed(f"user:{user_id}", 5, 1):
return False, "请求过于频繁,请稍后再试"
# IP级限流:每秒最多100次请求
client_ip = get_client_ip() # 获取客户端IP
if not limiter.is_allowed(f"ip:{client_ip}", 100, 1):
return False, "系统繁忙,请稍后再试"
# 执行秒杀逻辑
return SeckillService(r).execute_seckill(user_id, product_id)
总结
通过这三个实战案例,我们可以看到Redis在解决实际问题时的强大能力:
- 缓存雪崩:通过差异化过期时间和后台刷新策略,构建了稳健的缓存层
- 分布式锁:利用Redis的原子操作,实现了安全可靠的分布式互斥机制
- 秒杀系统:结合Lua脚本、限流降级等策略,设计了可应对极端高并发的系统架构
Redis的优秀性能和丰富的数据结构为我们提供了强大的工具,但同时也需要根据具体业务场景合理设计和优化。在实际应用中,还需要结合监控、日志分析等手段,持续优化系统性能。
希望这些案例能为你在实际项目中使用Redis提供有益的参考和启发。
文档信息
- 本文作者:JiliangLee
- 本文链接:https://leejiliang.cn/2025/09/27/Redis-%E5%AE%9E%E6%88%98%E6%A1%88%E4%BE%8B/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)