10 миллионов запросов в секунду: архитектура высоконагруженного сервиса

Начальная архитектура и её пределы

Когда «НотифайПро» запускался, архитектура была простой и понятной: Nginx принимает HTTP-запросы, за ним 8 экземпляров Python-приложения на FastAPI, PostgreSQL для хранения, RabbitMQ для очередей, воркеры для отправки. Эта схема прекрасно работала при 50 тысячах RPS.

Проблемы начались при росте до 500K RPS:

  • PostgreSQL стал узким местом. Каждое уведомление создавало INSERT и два UPDATE (статус «отправлено», статус «доставлено»). При 500K RPS это 1.5 миллиона записей в секунду — PostgreSQL на одном сервере это не вывозит.
  • RabbitMQ проседал. При 500K сообщений в секунду очереди раздувались, потребление RAM росло неконтролируемо, отдельные ноды кластера отваливались.
  • FastAPI-воркеры не успевали. Python с GIL при IO-bound задачах работает хорошо, но при 500K RPS нужно 60+ экземпляров, и координация между ними становится проблемой.
  • Однотипные уведомления дублировались. При повторных запросах от клиентов (retry при таймауте) одно и то же уведомление отправлялось 2-3 раза.

Новая архитектура: обзор

Мы перепроектировали систему с нуля, сохранив API совместимость. Новая архитектура состоит из шести слоёв:

  1. Edge Layer — CDN + rate limiter на уровне Nginx.
  2. API Layer — stateless Go-сервисы, принимающие запросы и помещающие их в очередь.
  3. Queue Layer — Apache Kafka для буферизации и распределения.
  4. Processing Layer — Go-воркеры, обрабатывающие уведомления.
  5. Cache Layer — Redis для дедупликации, шаблонов, rate limiting.
  6. Storage Layer — PostgreSQL с шардингом для истории, ClickHouse для аналитики.

Edge: CDN и кэширование

Первый слой защиты — CDN. Хотя для API-сервиса CDN не кэширует ответы (они уникальны), он выполняет три функции: TLS termination (снимаем нагрузку шифрования с бэкенда), DDoS protection, географическая близость к клиентам (PoP в 12 городах).

Для статических ресурсов (шаблоны email, изображения) CDN кэширует всё. Это сняло 30% нагрузки с бэкенда.

API Layer: приём запросов на Go

Мы переписали API слой с Python на Go. Не потому что «Go быстрее Python» (это упрощение), а потому что Go-сервис потребляет 50 MB RAM и обрабатывает 80K RPS на одном ядре, тогда как FastAPI — 200 MB RAM и 3K RPS на ядро.

При нашей нагрузке это разница между 120 подами и 12 подами.

// Обработчик приёма уведомлений
func (h *Handler) SendNotification(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    var req NotificationRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        respondError(w, http.StatusBadRequest, "invalid request body")
        return
    }

    // Валидация
    if err := h.validator.Validate(req); err != nil {
        respondError(w, http.StatusUnprocessableEntity, err.Error())
        return
    }

    // Дедупликация через Redis
    deduplicationKey := fmt.Sprintf("dedup:%s:%s", req.ClientID, req.IdempotencyKey)
    exists, err := h.redis.SetNX(ctx, deduplicationKey, "1", 24*time.Hour).Result()
    if err != nil {
        h.logger.Error("redis dedup check failed", "error", err)
        // При ошибке Redis — пропускаем дедупликацию, лучше дубль, чем потеря
    } else if !exists {
        // Уже обработано — возвращаем сохранённый результат
        respondJSON(w, http.StatusOK, map[string]string{
            "status":  "duplicate",
            "message": "notification already accepted",
        })
        return
    }

    // Генерируем ID и помещаем в Kafka
    notificationID := ulid.Make().String()
    msg := &kafka.Message{
        Topic: "notifications.incoming",
        Key:   []byte(req.ClientID),  // партицирование по клиенту
        Value: mustMarshal(NotificationEvent{
            ID:        notificationID,
            Request:   req,
            CreatedAt: time.Now().UTC(),
        }),
        Headers: []kafka.Header{
            {Key: "priority", Value: []byte(req.Priority)},
        },
    }

    if err := h.kafkaWriter.WriteMessages(ctx, *msg); err != nil {
        h.logger.Error("kafka write failed", "error", err)
        // Удаляем ключ дедупликации, чтобы клиент мог повторить
        h.redis.Del(ctx, deduplicationKey)
        respondError(w, http.StatusServiceUnavailable, "service temporarily unavailable")
        return
    }

    respondJSON(w, http.StatusAccepted, map[string]string{
        "id":     notificationID,
        "status": "accepted",
    })
}

Ключевой принцип: API Layer не обрабатывает уведомления, а только принимает и кладёт в очередь. Ответ клиенту — 202 Accepted с ID уведомления. Дальнейший статус доступен через webhook или polling endpoint.

Миграция с RabbitMQ на Kafka

RabbitMQ отлично работает при умеренных нагрузках, но при миллионах сообщений в секунду у него начинаются проблемы с управлением памятью и репликацией. Kafka проектировался для таких объёмов.

Наша Kafka-инфраструктура:

  • 9 брокеров (3 в каждой зоне доступности)
  • Replication factor: 3
  • Топик notifications.incoming: 120 партиций
  • Топик notifications.priority: 30 партиций (для срочных)
  • Топик notifications.dlq: 10 партиций (dead letter queue)
  • Retention: 72 часа (позволяет перечитать при проблемах)
# kafka-topics.sh конфигурация
kafka-topics.sh --create \
  --topic notifications.incoming \
  --partitions 120 \
  --replication-factor 3 \
  --config retention.ms=259200000 \
  --config segment.bytes=1073741824 \
  --config compression.type=lz4 \
  --config min.insync.replicas=2

Партицирование по ClientID гарантирует, что все уведомления одного клиента попадают в одну партицию и обрабатываются последовательно. Это важно для rate limiting на стороне клиента — мы не хотим превысить лимиты их SMTP-сервера или SMS-шлюза.

Миграцию с RabbitMQ на Kafka мы провели постепенно: сначала оба работали параллельно (dual write), потом переключили consumer-ов на Kafka, убедились в стабильности, и только потом отключили RabbitMQ.

Redis: три уровня кэширования

Redis в нашей архитектуре выполняет три разных функции, для каждой из которых свой кластер:

1. Кэш дедупликации

Redis Cluster из 6 нод (3 master + 3 replica). Хранит idempotency keys за последние 24 часа. При 10M RPS это ~200 миллионов ключей, каждый по ~100 байт. Суммарно ~20 GB RAM.

2. Кэш шаблонов

Standalone Redis, 16 GB RAM. Шаблоны уведомлений (email HTML, SMS-тексты) запрашиваются при каждой отправке. Без кэша — обращение к PostgreSQL, с кэшем — 0.1 мс вместо 2 мс.

func (s *TemplateService) GetTemplate(ctx context.Context, templateID string, locale string) (*Template, error) {
    cacheKey := fmt.Sprintf("tmpl:%s:%s", templateID, locale)

    // Пробуем кэш
    cached, err := s.redis.Get(ctx, cacheKey).Bytes()
    if err == nil {
        var tmpl Template
        if err := json.Unmarshal(cached, &tmpl); err == nil {
            return &tmpl, nil
        }
    }

    // Cache miss — идём в БД
    tmpl, err := s.db.GetTemplate(ctx, templateID, locale)
    if err != nil {
        return nil, err
    }

    // Сохраняем в кэш на 5 минут
    data, _ := json.Marshal(tmpl)
    s.redis.Set(ctx, cacheKey, data, 5*time.Minute)

    return tmpl, nil
}

3. Rate limiter

Redis Cluster из 6 нод. Хранит счётчики запросов по клиентам. Алгоритм — sliding window log:

-- Lua-скрипт для sliding window rate limiting
-- Выполняется атомарно на Redis
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- Удаляем записи за пределами окна
redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window)

-- Считаем текущее количество
local count = redis.call('ZCARD', key)

if count < limit then
    -- Добавляем текущий запрос
    redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
    redis.call('EXPIRE', key, window)
    return 1  -- разрешено
else
    return 0  -- отклонено
end

Каждый клиент имеет индивидуальные лимиты: от 100 до 50 000 RPS в зависимости от тарифа. Rate limiter срабатывает на уровне API Layer, до записи в Kafka, чтобы не засорять очередь.

PostgreSQL: шардинг и read replicas

Для хранения истории уведомлений PostgreSQL шардируется по client_id. Мы используем Citus для прозрачного шардинга:

-- Координатор Citus
SELECT create_distributed_table('notifications', 'client_id');

-- 32 шарда на 4 физических сервера
-- Каждый шард — от 800 GB до 1.2 TB данных

-- Пример запроса — Citus автоматически маршрутизирует к нужному шарду
SELECT id, channel, status, sent_at
FROM notifications
WHERE client_id = 'abc-123'
  AND created_at >= now() - interval '7 days'
ORDER BY created_at DESC
LIMIT 50;

Для каждого шарда настроено по 2 read replica. Запросы на чтение (статус уведомления, история, отчёты) идут на реплики, запись — на мастер. Нагрузка на мастер снизилась на 70%.

Для аналитических запросов (агрегация по миллиардам строк) мы вынесли данные в ClickHouse. Kafka Connect стримит каждое уведомление из Kafka в ClickHouse с задержкой в 5-10 секунд. Запрос «сколько уведомлений отправлено за последний час с группировкой по клиенту и каналу» выполняется за 200 мс по 3 миллиардам строк.

Graceful Degradation

При нагрузке в 10M RPS отказ любого компонента — вопрос времени. Мы спроектировали систему так, чтобы она деградировала плавно, а не падала целиком.

Уровень 1: нормальная работа. Все компоненты доступны. Уведомления обрабатываются за 50-200 мс.

Уровень 2: Redis недоступен. Дедупликация отключается (допускаем редкие дубли), шаблоны загружаются из PostgreSQL (медленнее, но работает), rate limiting переключается на in-memory (менее точный, но функционирует).

func (h *Handler) getTemplate(ctx context.Context, id string) (*Template, error) {
    // Пробуем Redis
    tmpl, err := h.templateCache.Get(ctx, id)
    if err == nil {
        return tmpl, nil
    }

    // Redis unavailable — пробуем локальный LRU-кэш
    if cached, ok := h.localCache.Get(id); ok {
        metrics.TemplateLocalCacheHit.Inc()
        return cached.(*Template), nil
    }

    // Идём в PostgreSQL
    metrics.TemplateCacheMiss.Inc()
    tmpl, err = h.db.GetTemplate(ctx, id)
    if err != nil {
        return nil, err
    }

    // Сохраняем в локальный кэш
    h.localCache.Set(id, tmpl, 1*time.Minute)
    return tmpl, nil
}

Уровень 3: PostgreSQL недоступен. Новые уведомления продолжают приниматься (запись в Kafka), но статусы не обновляются. Воркеры накапливают результаты отправки в локальном буфере и записывают в БД при восстановлении.

Уровень 4: Kafka недоступен. API Layer начинает возвращать 503 с заголовком Retry-After. Клиенты повторяют запросы через указанное время. Мы теряем доступность, но не данные.

Каждый уровень деградации автоматически определяется circuit breaker-ами и логируется с соответствующим алертом.

Нагрузочное тестирование с k6

Мы тестируем каждый релиз перед деплоем в продакшен. k6 позволяет писать сценарии на JavaScript и генерировать миллионы запросов с распределённых агентов:

// k6-scripts/send-notification.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

const errorRate = new Rate('errors');
const sendDuration = new Trend('send_duration');

export const options = {
    scenarios: {
        constant_load: {
            executor: 'constant-arrival-rate',
            rate: 100000,     // 100K RPS
            timeUnit: '1s',
            duration: '10m',
            preAllocatedVUs: 500,
            maxVUs: 2000,
        },
        spike: {
            executor: 'ramping-arrival-rate',
            startRate: 100000,
            timeUnit: '1s',
            stages: [
                { duration: '2m', target: 100000 },  // Baseline
                { duration: '30s', target: 500000 },  // Spike to 500K
                { duration: '3m', target: 500000 },   // Hold
                { duration: '30s', target: 100000 },  // Back to normal
                { duration: '2m', target: 100000 },   // Cool down
            ],
            preAllocatedVUs: 2000,
            maxVUs: 5000,
            startTime: '12m',  // После constant_load
        },
    },
    thresholds: {
        http_req_duration: ['p(95)<100', 'p(99)<500'],
        errors: ['rate<0.01'],  // Меньше 1% ошибок
    },
};

const BASE_URL = __ENV.TARGET_URL || 'https://api-staging.notifypro.ru';
const API_KEY = __ENV.API_KEY;

export default function () {
    const payload = JSON.stringify({
        client_id: `client-${Math.floor(Math.random() * 200)}`,
        channel: ['push', 'sms', 'email', 'telegram'][Math.floor(Math.random() * 4)],
        recipient: `user-${Math.floor(Math.random() * 1000000)}`,
        template_id: `tmpl-${Math.floor(Math.random() * 50)}`,
        priority: Math.random() > 0.9 ? 'high' : 'normal',
        idempotency_key: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
        data: {
            title: 'Test Notification',
            body: 'This is a load test notification',
        },
    });

    const params = {
        headers: {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${API_KEY}`,
        },
        timeout: '5s',
    };

    const start = Date.now();
    const res = http.post(`${BASE_URL}/api/v1/notifications`, payload, params);
    sendDuration.add(Date.now() - start);

    const success = check(res, {
        'status is 202': (r) => r.status === 202,
        'has notification id': (r) => JSON.parse(r.body).id !== undefined,
    });

    errorRate.add(!success);
}

Тесты запускаются с 10 распределённых агентов (каждый генерирует до 50K RPS). Результаты отправляются в InfluxDB и визуализируются в Grafana. Мы проверяем не только API-слой, но и сквозную доставку: считаем, сколько уведомлений дошло до mock-получателей.

Финальная архитектура

Итоговая система выглядит следующим образом. Клиенты отправляют HTTP-запросы через CDN (12 PoP). CDN проксирует на 4 балансировщика (L4, keepalived). Балансировщики распределяют по 24 экземплярам API Layer (Go). API Layer проверяет rate limits через Redis и пишет в Kafka (9 брокеров, 120 партиций). 60 воркеров читают из Kafka, рендерят шаблоны (Redis cache), отправляют через провайдеров (10 SMS-шлюзов, 5 push-провайдеров, 8 SMTP-серверов) и записывают результат в PostgreSQL (4 шарда Citus). Kafka Connect реплицирует данные в ClickHouse для аналитики. Мониторинг — Prometheus + Grafana + Jaeger.

Итоговые метрики

МетрикаБыло (500K RPS)Стало (10M RPS)
Пиковый RPS (API)500 00012 000 000
P50 латентность API45 мс8 мс
P99 латентность API1200 мс95 мс
Время доставки push2-5 сек200-800 мс
Error rate0.8%0.02%
Дубли уведомлений0.5%0.001%
Стоимость инфраструктуры480 000 руб/мес1 200 000 руб/мес
Стоимость на 1M уведомлений740 руб92 руб

Стоимость инфраструктуры выросла в 2.5 раза, но стоимость на единицу работы упала в 8 раз. При масштабе «НотифайПро» (400 миллионов уведомлений в сутки) это экономит миллионы рублей ежемесячно.

Главные уроки

  • Async everywhere. При таких нагрузках синхронная обработка невозможна. Kafka как буфер между приёмом и обработкой — ключевое архитектурное решение.
  • Идемпотентность обязательна. При миллионах запросов retry и дубли неизбежны. Система должна корректно обрабатывать повторные запросы.
  • Деградируй, а не падай. Graceful degradation важнее, чем 100% доступность каждого компонента. Пользователь лучше получит уведомление с задержкой, чем увидит ошибку.
  • Тестируйте нагрузку регулярно. Нагрузочные тесты не должны быть разовым мероприятием. Мы запускаем их при каждом релизе и раз в неделю на стейджинге, максимально приближенном к проду.
  • Мониторинг — это продукт. У нас 47 дашбордов в Grafana и 120 алертов. Это не избыточность — при 10M RPS любая аномалия должна быть замечена за секунды.

Нужна помощь с проектом?

Специалисты АйТи Фреш помогут с архитектурой, DevOps, безопасностью и разработкой — 15+ лет опыта

📞 Связаться с нами
#api layer#cache layer#degradation#devops#edge#edge layer#fastapi-воркеры не успевали.#graceful
Комментарии 0

Оставить комментарий

загрузка...