Внедрение асинхронной архитектуры с очередями сообщений для сервиса доставки на 50 000 заказов в день

Проблема: синхронный монолит под нагрузкой

Сервис доставки «ДоставкаПлюс» обрабатывал 50 000 заказов в день и рос на 30% в квартал. Архитектура была классической: REST API принимает заказ, синхронно валидирует адрес через геокодер, рассчитывает стоимость, резервирует курьера, отправляет SMS и push, записывает в БД — всё в одном HTTP-запросе длительностью 2-4 секунды.

Проблемы копились:

  • Каскадные сбои: если SMS-провайдер тормозил — тормозило всё. Таймаут геокодера в 5 секунд блокировал поток обработки.
  • Потеря заказов: при падении сервера заказы из RAM пропадали. За январь 2026 потеряли 347 заказов на 2.1 млн руб.
  • Невозможность масштабирования: добавление серверов не помогало — узким местом были внешние API.

Команда itfresh.ru предложила перейти на асинхронную событийную модель с очередями сообщений.

Синхронная vs асинхронная модель

В синхронной модели каждый запрос блокирует поток до завершения всех операций. Если цепочка из 6 шагов занимает 200 мс на шаг, общее время = 1.2 секунды. Если один шаг тормозит до 5 секунд — весь запрос тормозит.

В асинхронной модели запрос принимается за 10-50 мс: данные валидируются, заказ записывается в очередь, клиент получает 202 Accepted. Дальнейшая обработка идёт фоновыми обработчиками (consumers), каждый из которых масштабируется независимо.

Ключевое отличие — в модели обработки. Вместо потока-на-запрос (thread-per-request) используется event loop, который опрашивает файловые дескрипторы через epoll (Linux) или kqueue (FreeBSD). Один поток обслуживает тысячи соединений:

# Упрощённый event loop на Python (asyncio)
import asyncio
import aio_pika

async def process_order(message: aio_pika.IncomingMessage):
    async with message.process():
        order = json.loads(message.body)
        # Каждый шаг — отдельный consumer
        await validate_address(order)
        await publish_to_queue('pricing', order)

async def main():
    connection = await aio_pika.connect_robust(
        'amqp://guest:guest@rabbitmq:5672/'
    )
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=100)

    queue = await channel.declare_queue(
        'orders.new', durable=True
    )
    await queue.consume(process_order)

    # Event loop крутится вечно
    await asyncio.Future()

asyncio.run(main())

Выбор брокера: RabbitMQ vs Kafka vs NATS

Мы развернули три брокера на тестовом стенде и прогнали нагрузку, эмулирующую профиль «ДоставкаПлюс»: 600 заказов в минуту с пиками до 2000.

КритерийRabbitMQ 3.13Kafka 3.7NATS 2.10
Модель доставкиPush (брокер пушит consumers)Pull (consumers тянут данные)Push + Pull
Гарантия доставкиAt-least-once, at-most-onceAt-least-once, exactly-onceAt-least-once (JetStream)
Throughput (msg/sec)30 000500 000+200 000
Латентность P992 мс15 мс0.5 мс
Хранение сообщенийУдаляет после ACKХранит N дней (лог)По настройке
RAM (3 ноды)1.5 GB6 GB512 MB
Сложность эксплуатацииСредняяВысокая (ZooKeeper/KRaft)Низкая

Для «ДоставкаПлюс» мы выбрали RabbitMQ: push-модель с гибкой маршрутизацией через exchanges, зрелая экосистема, хороший management UI. Kafka избыточен для 50K заказов/день (проектировался под триллион событий LinkedIn), а NATS пока не имеет достаточной экосистемы плагинов.

Dead Letter Queues и стратегии повторов

В асинхронной системе сообщение может не обработаться: внешний API недоступен, данные некорректны, consumer упал. Без правильной стратегии повторов сообщения теряются или зацикливаются.

Мы реализовали трёхуровневую стратегию:

# RabbitMQ: настройка очередей с DLQ
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq')
)
channel = connection.channel()

# Dead Letter Exchange
channel.exchange_declare(
    exchange='dlx.orders', exchange_type='direct'
)

# Основная очередь с DLQ
channel.queue_declare(
    queue='orders.process',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx.orders',
        'x-dead-letter-routing-key': 'orders.failed',
        'x-message-ttl': 300000,  # 5 минут максимум в очереди
        'x-max-length': 100000,
    }
)

# Очередь для повторных попыток с задержкой
for delay in [10, 60, 300]:  # 10с, 1м, 5м
    channel.queue_declare(
        queue=f'orders.retry.{delay}s',
        durable=True,
        arguments={
            'x-dead-letter-exchange': '',
            'x-dead-letter-routing-key': 'orders.process',
            'x-message-ttl': delay * 1000,
        }
    )

# Финальная DLQ для ручного разбора
channel.queue_declare(
    queue='orders.dead', durable=True
)

Логика повторов в consumer:

def on_message(channel, method, properties, body):
    retry_count = (properties.headers or {}).get('x-retry-count', 0)

    try:
        process_order(json.loads(body))
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except TemporaryError as e:
        # Повторяемая ошибка — отправляем в retry queue
        if retry_count < 3:
            delays = [10, 60, 300]
            channel.basic_publish(
                exchange='',
                routing_key=f'orders.retry.{delays[retry_count]}s',
                body=body,
                properties=pika.BasicProperties(
                    headers={'x-retry-count': retry_count + 1},
                    delivery_mode=2,  # persistent
                )
            )
        else:
            # Исчерпали попытки — в DLQ
            channel.basic_publish(
                exchange='dlx.orders',
                routing_key='orders.failed',
                body=body
            )
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except PermanentError:
        # Невосстановимая ошибка — сразу в DLQ
        channel.basic_nack(
            delivery_tag=method.delivery_tag, requeue=False
        )

Идемпотентность: защита от дублей

В системе at-least-once delivery сообщение может быть доставлено дважды: consumer обработал заказ, но не успел отправить ACK до перезапуска. Без идемпотентности клиенту придёт два SMS и спишется двойная стоимость.

Мы реализовали идемпотентность через уникальный ключ операции:

import hashlib
import redis

redis_client = redis.Redis(host='redis', port=6379, db=0)

def ensure_idempotent(message_id: str, ttl: int = 86400) -> bool:
    """Возвращает True, если сообщение ещё не обработано."""
    key = f"processed:{message_id}"
    # SET NX — атомарная проверка и установка
    result = redis_client.set(key, '1', nx=True, ex=ttl)
    return result is not None

def process_order(order: dict):
    message_id = order.get('idempotency_key') or \
        hashlib.sha256(
            f"{order['id']}:{order['timestamp']}".encode()
        ).hexdigest()

    if not ensure_idempotent(message_id):
        logger.info(f"Duplicate message {message_id}, skipping")
        return

    # Безопасно обрабатываем — точно первый раз
    create_delivery(order)
    charge_customer(order)
    send_notification(order)

Для критичных финансовых операций (списание средств) дополнительно использовали INSERT ... ON CONFLICT DO NOTHING в PostgreSQL с уникальным constraint на idempotency_key. Двойная защита: Redis для быстрой проверки, PostgreSQL как ultimate source of truth.

Мониторинг здоровья очередей

Очереди сообщений требуют специфического мониторинга: недостаточно следить только за CPU и RAM. Ключевые метрики:

  • Queue depth — количество сообщений в очереди. Растёт = consumers не справляются.
  • Consumer utilization — процент времени, когда consumer занят обработкой.
  • Message age — возраст самого старого сообщения в очереди.
  • DLQ size — количество сообщений в dead letter queue. Должно быть близко к нулю.

Мы настроили экспорт метрик RabbitMQ в Prometheus и дашборд в Grafana:

# docker-compose.yml — мониторинг RabbitMQ
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    ports:
      - "5672:5672"
      - "15672:15672"
      - "15692:15692"  # Prometheus metrics
    environment:
      RABBITMQ_DEFAULT_USER: delivery
      RABBITMQ_DEFAULT_PASS: ${RABBIT_PASSWORD}
    volumes:
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./enabled_plugins:/etc/rabbitmq/enabled_plugins

  prometheus:
    image: prom/prometheus:v2.51.0
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:10.4.0
    ports:
      - "3000:3000"
# prometheus.yml — scraping RabbitMQ
scrape_configs:
  - job_name: 'rabbitmq'
    scrape_interval: 15s
    static_configs:
      - targets: ['rabbitmq:15692']
    metrics_path: /metrics
# Alertmanager — алерт на рост очереди
groups:
  - name: rabbitmq
    rules:
      - alert: QueueDepthHigh
        expr: rabbitmq_queue_messages{queue="orders.process"} > 5000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Очередь orders.process переполнена"

      - alert: DLQNotEmpty
        expr: rabbitmq_queue_messages{queue="orders.dead"} > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Сообщения в DLQ — требуется ручной разбор"

Масштабирование consumers

Один из главных плюсов асинхронной архитектуры — независимое масштабирование каждого этапа обработки. Если геокодирование тормозит — добавляем consumers для очереди orders.geocode, не трогая остальные.

Мы настроили автоскейлинг через KEDA (Kubernetes Event-Driven Autoscaler):

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: order-processor
  namespace: delivery
spec:
  scaleTargetRef:
    name: order-processor
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: rabbitmq
      metadata:
        host: amqp://delivery:${RABBIT_PASSWORD}@rabbitmq:5672/
        queueName: orders.process
        queueLength: "100"  # 1 pod на каждые 100 сообщений
        mode: QueueLength

В пиковые часы (обед, вечер) количество подов order-processor вырастало с 2 до 12, а ночью снижалось обратно до 2. Стоимость инфраструктуры снизилась на 40% по сравнению с always-on серверами.

Результаты и рекомендации

Через месяц после перехода на асинхронную архитектуру «ДоставкаПлюс» получила:

  • Время ответа API: с 2-4 секунд до 45 мс (заказ принимается мгновенно)
  • Потерянные заказы: 0 за весь месяц (было 347 за январь)
  • Устойчивость к сбоям: падение SMS-провайдера на 2 часа не повлияло на приём заказов — SMS отправились после восстановления
  • Масштабирование: успешно прошли нагрузочный тест на 200 000 заказов/день без деградации

Рекомендации для аналогичных проектов:

  • Начинайте с RabbitMQ — он проще Kafka и достаточен для 99% бизнес-сценариев
  • Реализуйте идемпотентность с первого дня, не откладывайте «на потом»
  • Dead Letter Queue — обязательный компонент, не опция
  • Мониторьте queue depth и message age — это ваши главные индикаторы здоровья

Планируете перейти на асинхронную архитектуру? Команда itfresh.ru поможет выбрать брокер, спроектировать топологию очередей и мигрировать без потери данных.

Часто задаваемые вопросы

Когда появляются каскадные сбои из-за внешних зависимостей, когда время ответа API критично для UX, или когда нужно масштабировать отдельные этапы обработки независимо. Если ваш сервис обрабатывает менее 100 запросов в минуту и не зависит от внешних API — синхронная модель проще и надёжнее.
RabbitMQ — для task queues, когда сообщение нужно обработать ровно один раз и удалить. Kafka — для event streaming, когда нужно хранить историю событий и воспроизводить её. Для 90% бизнес-приложений (заказы, уведомления, обработка) RabbitMQ проще и достаточен.
В RabbitMQ порядок гарантируется в рамках одной очереди с одним consumer. При нескольких consumers используйте consistent hashing exchange — заказы одного клиента всегда пойдут в одну очередь. В Kafka порядок гарантируется внутри partition — используйте customer_id как partition key.
DLQ — это не мусорка, а очередь для ручного разбора. Настройте алерт на любое появление сообщений в DLQ. Типичные причины: невалидные данные (баг в producer), недоступность внешнего сервиса дольше retry-окна, баг в consumer. Разбирайте каждый случай и чините корневую причину.
Да, распределённый трейсинг обязателен. Добавляйте correlation_id (trace_id) в каждое сообщение и прокидывайте его через все очереди. Используйте OpenTelemetry для визуализации полного пути заказа через Jaeger или Grafana Tempo. Без этого дебаг превращается в кошмар.

Нужна помощь с проектом?

Специалисты АйТи Фреш помогут с архитектурой, DevOps, безопасностью и разработкой — 15+ лет опыта

📞 Связаться с нами
#очереди сообщений#rabbitmq#kafka#nats#асинхронная архитектура#dead letter queue#идемпотентность#event sourcing
Комментарии 0

Оставить комментарий

загрузка...