Вторая задача — замена cron-задач на полноценные очереди. Уведомления о заказах (push, SMS, email), расчёт ETA курьеров, начисление бонусов — всё это выполнялось через cron каждые 30 секунд. Это приводило к задержкам до 30 секунд (в худшем случае) и нагружало PostgreSQL бессмысленными поллингами.
Redis Streams — это персистентная структура данных, похожая на Apache Kafka, но встроенная в Redis. Мы создали отдельные потоки для разных задач:
# Создаём consumer groups для каждого потока
# Группа гарантирует, что каждое сообщение обработается ровно одним воркером
# Поток уведомлений о заказах
XGROUP CREATE orders:notifications $ MKSTREAM
# Поток расчёта ETA
XGROUP CREATE courier:eta-calc $ MKSTREAM
# Поток начисления бонусов
XGROUP CREATE loyalty:bonuses $ MKSTREAM
# Добавляем событие при создании заказа (в Go-коде API):
XADD orders:notifications * \
order_id 78542 \
user_id 98765 \
restaurant_id 142 \
type new_order \
user_phone "+79171234567" \
restaurant_name "Пекарня Волга"
# Проверяем содержимое потока
XLEN orders:notifications
# (integer) 1
XRANGE orders:notifications - +
# 1) 1) "1709312847123-0"
# 2) 1) "order_id" 2) "78542"
# 3) "user_id" 4) "98765"
# 5) "type" 6) "new_order"
# ...
Каждый воркер — отдельный процесс, который вычитывает события из потока через consumer group. При падении воркера необработанные сообщения автоматически переназначаются:
// notification_worker.go
func (w *NotificationWorker) Run(ctx context.Context) error {
consumerName := fmt.Sprintf("worker-%s", hostname())
for {
// Читаем до 10 сообщений с блокировкой 5 секунд
streams, err := w.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "notification-workers",
Consumer: consumerName,
Streams: []string{"orders:notifications", ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err == redis.Nil {
continue // Нет новых сообщений
}
for _, msg := range streams[0].Messages {
orderID := msg.Values["order_id"].(string)
msgType := msg.Values["type"].(string)
switch msgType {
case "new_order":
w.sendPush(msg.Values)
w.sendSMS(msg.Values)
case "status_change":
w.sendPush(msg.Values)
case "delivered":
w.sendPush(msg.Values)
w.requestRating(msg.Values)
}
// Подтверждаем обработку
w.rdb.XAck(ctx, "orders:notifications",
"notification-workers", msg.ID)
}
}
}
// Обработка зависших сообщений (claimed but not acked)
// Запускается отдельной горутиной каждые 60 секунд
func (w *NotificationWorker) ReclaimStale(ctx context.Context) {
pending, _ := w.rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: "orders:notifications",
Group: "notification-workers",
Start: "-",
End: "+",
Count: 100,
}).Result()
for _, p := range pending {
if p.Idle > 2*time.Minute {
// Перехватываем зависшее сообщение
w.rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: "orders:notifications",
Group: "notification-workers",
Consumer: hostname(),
MinIdle: 2 * time.Minute,
Messages: []string{p.ID},
})
}
}
}