Bạn có biết?
Khi bạn đặt hàng trên Shopee, hệ thống cần xử lý hàng loạt bước: kiểm tra tồn kho, trừ tiền, tạo vận đơn, gửi thông báo… Nếu một bước thất bại, hệ thống phải retry mà không mất dữ liệu. Đây chính là lý do Redis Streams ra đời — một cơ chế event log mạnh mẽ, khắc phục hoàn toàn nhược điểm của Pub/Sub.
Redis Streams là gì?
Streams là cấu trúc dữ liệu mới nhất trong Redis (từ phiên bản 5.0), hoạt động như một append-only log. Mỗi entry trong stream có một ID duy nhất (dựa trên timestamp) và chứa một hoặc nhiều field-value pairs.
Khác với Pub/Sub (fire-and-forget), Streams:
- ✅ Persist messages — Dữ liệu được lưu trữ, không mất khi restart
- ✅ Consumer Groups — Nhiều consumer xử lý message song song
- ✅ Message acknowledgment — Xác nhận đã xử lý thành công
- ✅ Pending messages — Theo dõi message chưa xử lý
- ✅ History queries — Đọc lại message cũ bất cứ lúc nào
Các lệnh cơ bản
Thêm dữ liệu (XADD)
# Thêm entry vào stream
XADD orders * user_id 1001 product "iPhone 15" amount 999
# "1711785600000-0" → Auto-generated ID
# Thêm với ID cụ thể
XADD orders 1711785600000-0 user_id 1002 product "MacBook" amount 1999
# Giới hạn độ dài stream (MAXLEN)
XADD orders MAXLEN ~ 1000 * user_id 1003 product "iPad" amount 799
# ~ 1000: xóa bớt entries cũ, giữ khoảng 1000 entries
Đọc dữ liệu (XREAD)
# Đọc tất cả entries từ đầu
XREAD COUNT 10 STREAMS orders 0-0
# Đọc entries mới (từ ID cụ thể)
XREAD COUNT 10 STREAMS orders 1711785600000-0
# Đọc entries mới nhất (blocking - chờ có data)
XREAD BLOCK 0 COUNT 10 STREAMS orders $
# BLOCK 0: chờ vô hạn
# $: chỉ đọc entries mới sau thời điểm này
Thông tin stream
# Xem thông tin stream
XINFO STREAM orders
# length: 5
# first-entry: 1711785600000-0
# last-entry: 1711785604000-0
# Xem số lượng entries
XLEN orders
# (integer) 5
# Xem entries trong khoảng ID
XRANGE orders 1711785600000-0 1711785602000-0
# Xem entries mới nhất (ngược)
XREVRANGE orders + - COUNT 3
Xóa và cắt stream
# Xóa entry theo ID
XDEL orders 1711785600000-0
# Cắt stream (giữ N entries mới nhất)
XTRIM orders MAXLEN 1000
# Cắt stream (xóa entries cũ hơn N)
XTRIM orders MINID 1711785600000-0

Consumer Groups
Consumer Groups là tính năng mạnh mẽ nhất của Streams. Nó cho phép nhiều consumer xử lý message song song, mỗi message chỉ được xử lý bởi một consumer.
Tạo và sử dụng Consumer Group
# Tạo consumer group
XGROUP CREATE orders order-group 0 MKSTREAM
# MKSTREAM: tạo stream nếu chưa tồn tại
# 0: bắt đầu đọc từ đầu
# Consumer đọc message
XREADGROUP GROUP order-group consumer-1 COUNT 5 STREAMS orders >
# >: đọc message mới (chưa được giao cho consumer nào)
# Kết quả:
# 1) 1) "orders"
# 2) 1) 1) "1711785600000-0"
# 2) 1) "user_id" 2) "1001" 3) "product" 4) "iPhone 15"
# Xác nhận đã xử lý thành công
XACK orders order-group 1711785600000-0
# (integer) 1
Nhiều consumers song song
# Consumer 1 đọc message
XREADGROUP GROUP order-group consumer-1 COUNT 3 STREAMS orders >
# Nhận: msg1, msg2, msg3
# Consumer 2 đọc message (cùng group)
XREADGROUP GROUP order-group consumer-2 COUNT 3 STREAMS orders >
# Nhận: msg4, msg5, msg6
# Consumer 3 đọc message (cùng group)
XREADGROUP GROUP order-group consumer-3 COUNT 3 STREAMS orders >
# Nhận: msg7, msg8, msg9
# Mỗi message chỉ được giao cho MỘT consumer!
Quản lý Pending Messages
# Xem message chưa ACK
XPENDING orders order-group
# pending: 3
# min: 1711785600000-0
# max: 1711785602000-0
# consumers:
# 1) consumer-1: 2 messages
# 2) consumer-2: 1 message
# Xem chi tiết pending messages
XPENDING orders order-group - + 10
# 1) 1) "1711785600000-0"
# 2) "consumer-1"
# 3) (integer) 45000 → đã chờ 45 giây
# 4) (integer) 1 → số lần delivered
# Claim message (chuyển từ consumer khác sang)
XCLAIM orders order-group consumer-2 60000 1711785600000-0
# 60000: chỉ claim message đã chờ > 60 giây
Use Cases thực tế
1. Order Processing Pipeline
# Tạo order stream
XADD orders * user_id 1001 items "iPhone,Case" total 1049
# Worker 1: Kiểm tra tồn kho
XREADGROUP GROUP order-workers inventory-worker COUNT 10 STREAMS orders >
# Xử lý → XACK
# Worker 2: Xử lý thanh toán
XREADGROUP GROUP order-workers payment-worker COUNT 10 STREAMS orders >
# Xử lý → XACK
# Worker 3: Tạo vận đơn
XREADGROUP GROUP order-workers shipping-worker COUNT 10 STREAMS orders >
# Xử lý → XACK
2. Activity Log / Audit Trail
# Ghi log hoạt động
XADD audit:log * user_id 1001 action "login" ip "192.168.1.1"
XADD audit:log * user_id 1001 action "update_profile" field "email"
XADD audit:log * user_id 1002 action "purchase" amount 999
# Đọc log của user trong 24h qua
XRANGE audit:log 1711700000000-0 1711786400000-0
# Đọc 100 log mới nhất
XREVRANGE audit:log + - COUNT 100
3. Real-time Analytics
# Ghi events
XADD events:pageview * page "/products" user_id 1001
XADD events:pageview * page "/cart" user_id 1001
XADD events:pageview * page "/checkout" user_id 1001
# Consumer xử lý analytics
XREADGROUP GROUP analytics worker-1 BLOCK 0 COUNT 100 STREAMS events:pageview >
# Tính toán → cập nhật dashboard real-time
4. Task Queue với Retry
# Thêm task
XADD tasks * type "send_email" to "[email protected]" retries 0
# Worker xử lý
while True:
messages = XREADGROUP GROUP task-workers worker-1 BLOCK 5000 COUNT 1 STREAMS tasks >
if messages:
msg_id, data = messages[0]
try:
process_task(data)
XACK tasks task-workers msg_id
except Exception as e:
retries = int(data.get("retries", 0))
if retries < 3:
XADD tasks * type data["type"] to data["to"] retries (retries + 1)
XACK tasks task-workers msg_id
Streams vs Pub/Sub vs Lists
| Tiêu chí | Pub/Sub | Lists | Streams |
|---|---|---|---|
| Persistence | Không | Tạm thời | Lâu dài |
| Consumer Groups | Không | Không | Có |
| Message ACK | Không | Không | Có |
| History | Không | Giới hạn | Đầy đủ |
| Retry | Không | Thủ công | Tự động |
| Độ phức tạp | Đơn giản | Đơn giản | Phức tạp hơn |
| Use case | Notifications | Simple queue | Event sourcing |
Best Practices
- Giới hạn stream size — Dùng MAXLEN hoặc MINID khi XADD
- Luôn XACK — Sau khi xử lý thành công, xác nhận ngay
- Monitor pending — Kiểm tra XPENDING định kỳ
- Retry logic — XCLAIM message từ consumer bị crash
- Consumer naming — Đặt tên rõ ràng (worker-1, worker-2)
- Block reads — Dùng BLOCK thay vì polling
Bước tiếp theo
Bạn đã nắm vững Redis Streams! Tiếp theo, chúng ta sẽ tìm hiểu về Lua Scripts — cách viết atomic operations phức tạp trực tiếp trong Redis.