Redis消息队列与发布订阅:从原理到实战应用
在现代分布式系统中,消息队列是解耦系统组件、实现异步通信的重要基础设施。Redis不仅是一个高性能的内存数据库,还提供了强大的消息队列功能。本文将深入探讨Redis的两种消息传递模式:基于List的队列和Pub/Sub发布订阅模型。
1. 基于List的消息队列
Redis的List数据结构为实现简单而高效的消息队列提供了基础支持。通过LPUSH
/RPOP
或RPUSH
/LPOP
命令组合,可以轻松实现先进先出(FIFO)队列。
1.1 基本队列操作
import redis
import json
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 生产者:推送消息到队列
def produce_message(queue_name, message):
message_data = {
'id': 'msg_001',
'content': message,
'timestamp': time.time()
}
r.lpush(queue_name, json.dumps(message_data))
print(f"已发送消息: {message}")
# 消费者:从队列获取消息
def consume_message(queue_name):
while True:
# 阻塞式获取消息,超时时间为30秒
message = r.brpop(queue_name, timeout=30)
if message:
message_data = json.loads(message[1])
print(f"收到消息: {message_data['content']}")
# 处理消息业务逻辑
process_message(message_data)
else:
print("等待消息中...")
def process_message(message):
# 模拟消息处理
time.sleep(0.5)
print(f"处理完成: {message['id']}")
# 使用示例
produce_message('task_queue', '用户注册任务')
1.2 可靠性增强:ACK机制
基本的List队列缺少消息确认机制,可以通过以下方式增强可靠性:
def reliable_consume_message(queue_name, processing_queue):
while True:
# 从主队列获取消息
message = r.brpoplpush(queue_name, processing_queue, timeout=30)
if message:
try:
message_data = json.loads(message)
process_message(message_data)
# 处理成功后从处理队列移除
r.lrem(processing_queue, 1, message)
except Exception as e:
print(f"处理失败: {e}")
# 将失败消息移到死信队列
r.lpush('dead_letter_queue', message)
2. Pub/Sub发布订阅模式
Redis的Pub/Sub模式提供了一对多的消息广播能力,适用于实时消息推送、事件通知等场景。
2.1 基本发布订阅
# 发布者
def publish_message(channel, message):
r.publish(channel, json.dumps(message))
print(f"已发布到频道 {channel}: {message}")
# 订阅者
class MessageSubscriber:
def __init__(self):
self.pubsub = r.pubsub()
def subscribe(self, channels):
self.pubsub.subscribe(channels)
print(f"已订阅频道: {channels}")
def listen(self):
for message in self.pubsub.listen():
if message['type'] == 'message':
message_data = json.loads(message['data'])
print(f"收到消息: {message_data}")
self.handle_message(message_data)
def handle_message(self, message):
# 处理消息逻辑
pass
# 使用示例
subscriber = MessageSubscriber()
subscriber.subscribe(['notifications', 'alerts'])
# 在另一个线程中启动监听
import threading
thread = threading.Thread(target=subscriber.listen)
thread.start()
# 发布消息
publish_message('notifications', {'type': 'info', 'content': '系统维护通知'})
2.2 模式订阅
Redis还支持模式订阅,可以同时订阅多个匹配的频道:
# 订阅所有以"user_"开头的频道
subscriber.pubsub.psubscribe('user_*')
# 发布到匹配的频道
r.publish('user_123', '用户消息')
r.publish('user_456', '另一个用户消息')
3. 两种模式的对比与选择
3.1 特性对比
特性 | List队列 | Pub/Sub |
---|---|---|
消息持久化 | 支持 | 不支持 |
消息确认 | 可实现 | 不支持 |
消费者模型 | 竞争消费 | 广播消费 |
实时性 | 轮询方式 | 实时推送 |
内存占用 | 消息累积占用内存 | 瞬时消息不累积 |
3.2 适用场景
List队列适合:
- 任务队列系统(如Celery使用Redis作为broker)
- 需要保证消息不丢失的场景
- 需要消息确认和重试机制的场景
- 流量削峰和异步处理
Pub/Sub适合:
- 实时消息推送(如聊天室、实时通知)
- 事件广播系统
- 需要一对多消息分发的场景
- 临时性的消息传递
4. 实战应用案例
4.1 电商订单处理系统
# 订单处理队列
class OrderProcessor:
def __init__(self):
self.redis = redis.Redis()
self.order_queue = 'orders'
self.processing_queue = 'orders_processing'
def place_order(self, order_data):
# 下单后推送到队列
order_id = self.generate_order_id()
order_data['id'] = order_id
order_data['status'] = 'pending'
self.redis.lpush(self.order_queue, json.dumps(order_data))
# 同时发布订单创建事件
self.redis.publish('order_created', json.dumps(order_data))
return order_id
def process_orders(self):
while True:
order_json = self.redis.brpoplpush(
self.order_queue,
self.processing_queue,
timeout=30
)
if order_json:
try:
order = json.loads(order_json)
self.handle_order(order)
self.redis.lrem(self.processing_queue, 1, order_json)
# 发布订单处理完成事件
self.redis.publish('order_processed', order_json)
except Exception as e:
print(f"订单处理失败: {e}")
self.redis.lpush('failed_orders', order_json)
# 实时通知服务
class NotificationService:
def __init__(self):
self.pubsub = redis.Redis().pubsub()
self.pubsub.subscribe(['order_created', 'order_processed'])
def start(self):
for message in self.pubsub.listen():
if message['type'] == 'message':
event_data = json.loads(message['data'])
self.send_notification(message['channel'], event_data)
def send_notification(self, channel, data):
if channel == 'order_created':
# 发送新订单通知
pass
elif channel == 'order_processed':
# 发送订单处理完成通知
pass
4.2 实时聊天系统
class ChatSystem:
def __init__(self):
self.redis = redis.Redis()
def send_message(self, room_id, user, message):
message_data = {
'room': room_id,
'user': user,
'message': message,
'timestamp': time.time()
}
# 存储消息历史
self.redis.lpush(f'chat:history:{room_id}', json.dumps(message_data))
# 限制历史消息数量
self.redis.ltrim(f'chat:history:{room_id}', 0, 99)
# 实时广播
self.redis.publish(f'chat:room:{room_id}', json.dumps(message_data))
def get_history(self, room_id, count=50):
history = self.redis.lrange(f'chat:history:{room_id}', 0, count-1)
return [json.loads(msg) for msg in history[::-1]]
5. 性能优化与最佳实践
5.1 性能优化建议
- 管道批处理:使用pipeline减少网络往返
pipe = r.pipeline() for message in messages: pipe.lpush('queue', message) pipe.execute()
适当配置内存策略:根据消息量调整maxmemory-policy
- 连接池管理:
文档信息
- 本文作者:JiliangLee
- 本文链接:https://leejiliang.cn/2025/09/16/Redis-%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B8%8E%E8%AE%A2%E9%98%85/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)