10 миллионов запросов в секунду: архитектура высоконагруженного сервиса

10 миллионов запросов в секунду: архитектура высоконагруженного сервиса

Начальная архитектура и её пределы

Когда мы только стартовали с «НотифайПро», наша архитектура была предельно проста, буквально как дважды два: Nginx принимал HTTP-запросы, за ним трудились 8 экземпляров Python-приложения на FastAPI, для хранения мы использовали PostgreSQL, а RabbitMQ отвечал за очереди, ну и, конечно, были воркеры для отправки. И знаете что? До 50 тысяч RPS эта схема работала просто отлично, без сучка и задоринки.

А вот при росте до 500K RPS всё начало сыпаться:

  • PostgreSQL стал узким местом. Каждое уведомление создавало INSERT и два UPDATE (статус «отправлено», статус «доставлено»). При 500K RPS это 1.5 миллиона записей в секунду — PostgreSQL на одном сервере это не вывозит.
  • RabbitMQ проседал. При 500K сообщений в секунду очереди раздувались, потребление RAM росло неконтролируемо, отдельные ноды кластера отваливались.
  • FastAPI-воркеры не успевали. Python с GIL при IO-bound задачах работает хорошо, но при 500K RPS нужно 60+ экземпляров, и координация между ними становится проблемой.
  • Однотипные уведомления дублировались. При повторных запросах от клиентов (retry при таймауте) одно и то же уведомление отправлялось 2-3 раза.

Новая архитектура: обзор

Систему пересобрали с нуля — но API-совместимость сохранили. В новой версии шесть слоёв:

  1. Edge Layer — CDN + rate limiter на уровне Nginx.
  2. API Layer — stateless Go-сервисы, принимающие запросы и помещающие их в очередь.
  3. Queue Layer — Apache Kafka для буферизации и распределения.
  4. Processing Layer — Go-воркеры, обрабатывающие уведомления.
  5. Cache Layer — Redis для дедупликации, шаблонов, rate limiting.
  6. Storage Layer — PostgreSQL с шардингом для истории, ClickHouse для аналитики.

Edge: CDN и кэширование

Первый рубеж обороны — CDN. Ответы API он не кэширует (они у каждого клиента свои), зато тащит на себе три задачи: TLS termination (снимаем шифрование с бэкенда), защиту от DDoS и географическую близость к клиентам — PoP стоят в 12 городах.

А вот статику (шаблоны писем, картинки) CDN кэширует целиком. Одно это сняло 30% нагрузки с бэкенда.

API Layer: приём запросов на Go

API-слой переписали с Python на Go. И дело не в том, что «Go быстрее Python» — это упрощение. Дело в цифрах: Go-сервис ест 50 MB RAM и тянет 80K RPS на одном ядре, а FastAPI — 200 MB RAM и всего 3K RPS на ядро.

На нашей нагрузке это разница между 120 подами и 12. Есть о чём задуматься.

// Обработчик приёма уведомлений
func (h *Handler) SendNotification(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    var req NotificationRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        respondError(w, http.StatusBadRequest, "invalid request body")
        return
    }

    // Валидация
    if err := h.validator.Validate(req); err != nil {
        respondError(w, http.StatusUnprocessableEntity, err.Error())
        return
    }

    // Дедупликация через Redis
    deduplicationKey := fmt.Sprintf("dedup:%s:%s", req.ClientID, req.IdempotencyKey)
    exists, err := h.redis.SetNX(ctx, deduplicationKey, "1", 24*time.Hour).Result()
    if err != nil {
        h.logger.Error("redis dedup check failed", "error", err)
        // При ошибке Redis — пропускаем дедупликацию, лучше дубль, чем потеря
    } else if !exists {
        // Уже обработано — возвращаем сохранённый результат
        respondJSON(w, http.StatusOK, map[string]string{
            "status":  "duplicate",
            "message": "notification already accepted",
        })
        return
    }

    // Генерируем ID и помещаем в Kafka
    notificationID := ulid.Make().String()
    msg := &kafka.Message{
        Topic: "notifications.incoming",
        Key:   []byte(req.ClientID),  // партицирование по клиенту
        Value: mustMarshal(NotificationEvent{
            ID:        notificationID,
            Request:   req,
            CreatedAt: time.Now().UTC(),
        }),
        Headers: []kafka.Header{
            {Key: "priority", Value: []byte(req.Priority)},
        },
    }

    if err := h.kafkaWriter.WriteMessages(ctx, *msg); err != nil {
        h.logger.Error("kafka write failed", "error", err)
        // Удаляем ключ дедупликации, чтобы клиент мог повторить
        h.redis.Del(ctx, deduplicationKey)
        respondError(w, http.StatusServiceUnavailable, "service temporarily unavailable")
        return
    }

    respondJSON(w, http.StatusAccepted, map[string]string{
        "id":     notificationID,
        "status": "accepted",
    })
}

Ключевой принцип: API Layer не обрабатывает уведомления, а только принимает и кладёт в очередь. Ответ клиенту — 202 Accepted с ID уведомления. Дальнейший статус доступен через webhook или polling endpoint.

Миграция с RabbitMQ на Kafka

RabbitMQ хорош при умеренной нагрузке, но на миллионах сообщений в секунду у него начинаются болячки — память и репликация. Kafka же изначально затачивалась ровно под такие объёмы.

Как устроена наша Kafka-инфраструктура:

  • 9 брокеров (3 в каждой зоне доступности)
  • Replication factor: 3
  • Топик notifications.incoming: 120 партиций
  • Топик notifications.priority: 30 партиций (для срочных)
  • Топик notifications.dlq: 10 партиций (dead letter queue)
  • Retention: 72 часа (позволяет перечитать при проблемах)
# kafka-topics.sh конфигурация
kafka-topics.sh --create \
  --topic notifications.incoming \
  --partitions 120 \
  --replication-factor 3 \
  --config retention.ms=259200000 \
  --config segment.bytes=1073741824 \
  --config compression.type=lz4 \
  --config min.insync.replicas=2

Партицирование по ClientID гарантирует, что все уведомления одного клиента попадают в одну партицию и обрабатываются последовательно. Это важно для rate limiting на стороне клиента — мы не хотим превысить лимиты их SMTP-сервера или SMS-шлюза.

К миграции с RabbitMQ на Kafka мы подошли очень аккуратно, проводили её поэтапно, в несколько шагов. Сначала мы запустили обе системы параллельно, используя механизм dual write. Затем перевели всех consumer-ов уже на Kafka, удостоверились, что всё работает как часы и система полностью стабильна, и лишь потом, убедившись в надежности, выключили RabbitMQ.

Redis: три уровня кэширования

Redis у нас закрывает сразу три разные задачи — и под каждую поднят отдельный кластер:

1. Кэш дедупликации

Один из них — это Redis Cluster, состоящий из 6 нод (3 master + 3 replica). Он у нас хранит idempotency keys за последние сутки. При нагрузке в 10M RPS это вам не шутки: получается примерно 200 миллионов ключей, каждый из которых весит около ~100 байт — в сумме это около 20 GB RAM.

2. Кэш шаблонов

Standalone Redis на 16 GB RAM. Шаблоны уведомлений (HTML писем, тексты SMS) дёргаются при каждой отправке. Без кэша это поход в PostgreSQL, с кэшем — 0.1 мс вместо 2 мс.

func (s *TemplateService) GetTemplate(ctx context.Context, templateID string, locale string) (*Template, error) {
    cacheKey := fmt.Sprintf("tmpl:%s:%s", templateID, locale)

    // Пробуем кэш
    cached, err := s.redis.Get(ctx, cacheKey).Bytes()
    if err == nil {
        var tmpl Template
        if err := json.Unmarshal(cached, &tmpl); err == nil {
            return &tmpl, nil
        }
    }

    // Cache miss — идём в БД
    tmpl, err := s.db.GetTemplate(ctx, templateID, locale)
    if err != nil {
        return nil, err
    }

    // Сохраняем в кэш на 5 минут
    data, _ := json.Marshal(tmpl)
    s.redis.Set(ctx, cacheKey, data, 5*time.Minute)

    return tmpl, nil
}

3. Rate limiter

Redis Cluster из 6 нод. Хранит счётчики запросов по клиентам. Алгоритм — sliding window log:

-- Lua-скрипт для sliding window rate limiting
-- Выполняется атомарно на Redis
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- Удаляем записи за пределами окна
redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window)

-- Считаем текущее количество
local count = redis.call('ZCARD', key)

if count < limit then
    -- Добавляем текущий запрос
    redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
    redis.call('EXPIRE', key, window)
    return 1  -- разрешено
else
    return 0  -- отклонено
end

Каждый клиент имеет индивидуальные лимиты: от 100 до 50 000 RPS в зависимости от тарифа. Rate limiter срабатывает на уровне API Layer, до записи в Kafka, чтобы не засорять очередь.

PostgreSQL: шардинг и read replicas

Историю уведомлений в PostgreSQL шардируем по client_id. Прозрачный шардинг нам даёт Citus:

-- Координатор Citus
SELECT create_distributed_table('notifications', 'client_id');

-- 32 шарда на 4 физических сервера
-- Каждый шард — от 800 GB до 1.2 TB данных

-- Пример запроса — Citus автоматически маршрутизирует к нужному шарду
SELECT id, channel, status, sent_at
FROM notifications
WHERE client_id = 'abc-123'
  AND created_at >= now() - interval '7 days'
ORDER BY created_at DESC
LIMIT 50;

Для каждого шарда настроено по 2 read replica. Запросы на чтение (статус уведомления, история, отчёты) идут на реплики, запись — на мастер. Нагрузка на мастер снизилась на 70%.

А вот для аналитики, где нужны агрегации по миллиардам строк, мы решили перенести данные в ClickHouse. Kafka Connect передает туда каждое уведомление из Kafka с небольшой задержкой всего в 5-10 секунд. И в результате, когда мы делаем запрос типа «сколько уведомлений отправлено за последний час с группировкой по клиенту и каналу», он выполняется всего за 200 мс, обрабатывая при этом 3 миллиарда строк! Это просто поразительно.

Graceful Degradation

Когда у тебя 10M RPS, отказ любого компонента — это, к сожалению, вопрос не «если», а «когда». Именно поэтому мы спроектировали нашу систему таким образом, чтобы она могла плавно деградировать, а не рушилась вся сразу при малейшей проблеме.

Уровень 1 — нормальная работа. Все компоненты на месте, уведомления обрабатываются за 50-200 мс.

Уровень 2 — отвалился Redis. Дедупликацию отключаем (смиряемся с редкими дублями), шаблоны тянем уже из PostgreSQL (медленнее, но живо), а rate limiting переезжает на in-memory — точность падает, но он работает.

func (h *Handler) getTemplate(ctx context.Context, id string) (*Template, error) {
    // Пробуем Redis
    tmpl, err := h.templateCache.Get(ctx, id)
    if err == nil {
        return tmpl, nil
    }

    // Redis unavailable — пробуем локальный LRU-кэш
    if cached, ok := h.localCache.Get(id); ok {
        metrics.TemplateLocalCacheHit.Inc()
        return cached.(*Template), nil
    }

    // Идём в PostgreSQL
    metrics.TemplateCacheMiss.Inc()
    tmpl, err = h.db.GetTemplate(ctx, id)
    if err != nil {
        return nil, err
    }

    // Сохраняем в локальный кэш
    h.localCache.Set(id, tmpl, 1*time.Minute)
    return tmpl, nil
}

Уровень 3: PostgreSQL недоступен. Новые уведомления продолжают приниматься (запись в Kafka), но статусы не обновляются. Воркеры накапливают результаты отправки в локальном буфере и записывают в БД при восстановлении.

Уровень 4 — недоступна Kafka. Тут API Layer начинает отдавать 503 с заголовком Retry-After, и клиенты повторяют запрос через указанное время. Доступность мы теряем — но не данные.

Каждый из этих уровней деградации circuit breaker-ы ловят сами, автоматически, и пишут об этом алерт.

Нагрузочное тестирование с k6

Мы всегда прогоняем каждый релиз под серьезной нагрузкой, еще до того, как он попадет на продакшен. А k6 нам очень помогает, ведь его сценарии пишутся на JavaScript, а запросы генерируются миллионами с распределённых агентов — это очень удобно и эффективно:

// k6-scripts/send-notification.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

const errorRate = new Rate('errors');
const sendDuration = new Trend('send_duration');

export const options = {
    scenarios: {
        constant_load: {
            executor: 'constant-arrival-rate',
            rate: 100000,     // 100K RPS
            timeUnit: '1s',
            duration: '10m',
            preAllocatedVUs: 500,
            maxVUs: 2000,
        },
        spike: {
            executor: 'ramping-arrival-rate',
            startRate: 100000,
            timeUnit: '1s',
            stages: [
                { duration: '2m', target: 100000 },  // Baseline
                { duration: '30s', target: 500000 },  // Spike to 500K
                { duration: '3m', target: 500000 },   // Hold
                { duration: '30s', target: 100000 },  // Back to normal
                { duration: '2m', target: 100000 },   // Cool down
            ],
            preAllocatedVUs: 2000,
            maxVUs: 5000,
            startTime: '12m',  // После constant_load
        },
    },
    thresholds: {
        http_req_duration: ['p(95)<100', 'p(99)<500'],
        errors: ['rate<0.01'],  // Меньше 1% ошибок
    },
};

const BASE_URL = __ENV.TARGET_URL || 'https://api-staging.notifypro.ru';
const API_KEY = __ENV.API_KEY;

export default function () {
    const payload = JSON.stringify({
        client_id: `client-${Math.floor(Math.random() * 200)}`,
        channel: ['push', 'sms', 'email', 'telegram'][Math.floor(Math.random() * 4)],
        recipient: `user-${Math.floor(Math.random() * 1000000)}`,
        template_id: `tmpl-${Math.floor(Math.random() * 50)}`,
        priority: Math.random() > 0.9 ? 'high' : 'normal',
        idempotency_key: `${Date.now()}-${Math.random().toString(36).slice(2)}`,
        data: {
            title: 'Test Notification',
            body: 'This is a load test notification',
        },
    });

    const params = {
        headers: {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${API_KEY}`,
        },
        timeout: '5s',
    };

    const start = Date.now();
    const res = http.post(`${BASE_URL}/api/v1/notifications`, payload, params);
    sendDuration.add(Date.now() - start);

    const success = check(res, {
        'status is 202': (r) => r.status === 202,
        'has notification id': (r) => JSON.parse(r.body).id !== undefined,
    });

    errorRate.add(!success);
}

Тесты запускаются с 10 распределённых агентов (каждый генерирует до 50K RPS). Результаты отправляются в InfluxDB и визуализируются в Grafana. Мы проверяем не только API-слой, но и сквозную доставку: считаем, сколько уведомлений дошло до mock-получателей.

Финальная архитектура

Итак, давайте посмотрим, как же выглядит наша финальная система, когда все собрано воедино. Наш клиент отправляет HTTP-запрос, который проходит через CDN (у нас 12 PoP). CDN, в свою очередь, проксирует его на 4 балансировщика (L4, keepalived). А они уже распределяют нагрузку по 24 экземплярам API Layer, написанным на Go. API Layer сверяется с rate limits через Redis и записывает все в Kafka (где 9 брокеров и 120 партиций). Дальше в дело вступают 60 воркеров: они читают из Kafka, рендерят шаблоны (используя Redis cache), отправляют уведомления через целую плеяду провайдеров — это 10 SMS-шлюзов, 5 push-провайдеров и 8 SMTP-серверов — и, наконец, сохраняют результат в PostgreSQL (который разбит на 4 шарда Citus). Параллельно с этим Kafka Connect перекачивает данные в ClickHouse специально для аналитики. А за всей этой махиной зорко следят наши верные помощники: Prometheus, Grafana и Jaeger.

Итоговые метрики

МетрикаБыло (500K RPS)Стало (10M RPS)
Пиковый RPS (API)500 00012 000 000
P50 латентность API45 мс8 мс
P99 латентность API1200 мс95 мс
Время доставки push2-5 сек200-800 мс
Error rate0.8%0.02%
Дубли уведомлений0.5%0.001%
Стоимость инфраструктуры480 000 руб/мес1 200 000 руб/мес
Стоимость на 1M уведомлений740 руб92 руб

Не скрою, наша инфраструктура стала дороже в 2.5 раза. Но зато стоимость выполнения одной единицы работы при этом уменьшилась аж в 8 раз! Если говорить о масштабах «НотифайПро» — а это, на минуточку, 400 миллионов уведомлений в сутки — такая экономия выливается в миллионы рублей каждый месяц. Я считаю, это того стоило!

Главные уроки

  • Async everywhere. При таких нагрузках синхронная обработка невозможна. Kafka как буфер между приёмом и обработкой — ключевое архитектурное решение.
  • Идемпотентность обязательна. При миллионах запросов retry и дубли неизбежны. Система должна корректно обрабатывать повторные запросы.
  • Деградируй, а не падай. Graceful degradation важнее, чем 100% доступность каждого компонента. Пользователь лучше получит уведомление с задержкой, чем увидит ошибку.
  • Тестируйте нагрузку регулярно. Нагрузочные тесты не должны быть разовым мероприятием. Мы запускаем их при каждом релизе и раз в неделю на стейджинге, максимально приближенном к проду.
  • Мониторинг — это продукт. У нас 47 дашбордов в Grafana и 120 алертов. Это не избыточность — при 10M RPS любая аномалия должна быть замечена за секунды.

Нужна помощь с проектом?

Специалисты АйТи Фреш помогут с архитектурой, DevOps, безопасностью и разработкой — 15+ лет опыта

📞 Связаться с нами
#api layer#cache layer#degradation#devops#edge#edge layer#fastapi-воркеры не успевали.#graceful
Комментарии 0

Оставить комментарий

загрузка...
📄
Скачайте подробный разбор в PDF Кейсы, статистика, типовые ошибки и чек-лист самопроверки — 12 страниц
Скачать PDF

Подпишитесь на рассылку ITfresh

Раз в неделю — практические гайды для руководителя IT и сисадмина: безопасность, 1С, миграции, резервные копии, лайфхаки из реальных проектов.

Реквизиты оператора персональных данных

ООО «АЙТИ-ФРЕШ», ИНН 7719418495, КПП 771901001. Юридический адрес: 105523, г. Москва, Щёлковское шоссе, д. 92, корп. 7. Контакт: info@itfresh.ru, +7 903 729-62-41. Оператор обрабатывает e-mail подписчика в целях рассылки информационных и рекламных материалов до момента отзыва согласия.