Apache Kafka: как мы построили потоковую обработку на 1 миллиард событий в день

Почему Kafka, а не альтернативы

До миграции «АналитикПро» использовал RabbitMQ для очередей и Redis Streams для real-time подсчётов. При 200 млн событий/день система работала, но рост до 1 млрд обнажил проблемы:

КритерийRabbitMQRedis StreamsKafka
Пропускная способность50K msg/s200K msg/s2M+ msg/s
Хранение данныхТолько до ACKОграничено RAMДиск, недели/месяцы
Replay событийНетОграниченноПолный replay
Consumer groupsОграниченноДаДа, масштабируемо
Exactly-onceНетНетДа (с транзакциями)
Stream processingНетБазовыйKafka Streams / ksqlDB

Ключевой фактор — возможность replay. В аналитике часто нужно пересчитать агрегации за прошлые периоды (новая метрика, исправление бага, изменение логики атрибуции). С RabbitMQ данные теряются после обработки. Kafka хранит сырые события столько, сколько настроено — в нашем случае 30 дней.

Кластер: sizing и конфигурация

Расчёт размера кластера начинается с арифметики:

  • 1 млрд событий/день = ~12 000 msg/s в среднем, до 40 000 msg/s в пике
  • Средний размер сообщения: 800 байт (сжатое) / 2 КБ (несжатое)
  • Суточный объём: ~800 ГБ несжатых данных / ~320 ГБ с lz4
  • Retention 30 дней: ~9.6 ТБ сжатых данных
  • Replication factor 3: ~29 ТБ общего хранилища

Итоговая конфигурация:

# 5 брокеров, каждый:
# - 8 vCPU (Kafka активно использует сеть и I/O, не CPU)
# - 32 GB RAM (OS page cache критичен для производительности)
# - 4x 2TB NVMe SSD в JBOD (не RAID — Kafka сам реплицирует)
# - 10 Gbps сеть

# server.properties — ключевые настройки
broker.id=1
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
log.dirs=/data/kafka-0,/data/kafka-1,/data/kafka-2,/data/kafka-3

# Replication
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# Performance
num.partitions=12
log.segment.bytes=1073741824
log.retention.hours=720  # 30 дней
log.retention.check.interval.ms=300000

# Compression — делегируем продюсеру
compression.type=producer

# ZooKeeper → KRaft (без ZooKeeper)
process.roles=broker,controller
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093

Топики и партиционирование

Дизайн топиков — одно из важнейших решений. Мы разделили поток на уровни обработки:

# Сырые события от трекеров
events.raw              # 48 партиций, retention 30d

# После валидации и обогащения
events.validated        # 48 партиций, retention 7d

# Агрегированные метрики (compacted)
metrics.realtime        # 24 партиции, compacted

# Dead Letter Queue для невалидных событий
events.dlq              # 12 партиций, retention 90d

# Changelog для Kafka Streams state stores
streams-app-KSTREAM-AGGREGATE-changelog  # auto-created

Почему 48 партиций для raw? Это количество определяет максимальный параллелизм потребителей. У нас 12 инстансов consumer-приложения, каждый обрабатывает 4 партиции. При росте добавляем инстансы без остановки.

Ключ партиционирования — site_id. Это гарантирует, что все события одного сайта попадают в одну партицию и обрабатываются одним consumer-ом, сохраняя порядок.

Producer: тюнинг для максимальной пропускной способности

Трекер-сервис (Go) отправляет события в Kafka. Ключевые настройки producer-а:

// producer/config.go
package producer

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func NewProducer() (*kafka.Producer, error) {
    return kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",

        // Batching: собираем сообщения пачками
        "batch.size":        1048576,   // 1 МБ макс размер батча
        "linger.ms":         50,        // Ждём до 50мс для наполнения батча
        "buffer.memory":     134217728, // 128 МБ буфер

        // Сжатие: lz4 — лучший баланс скорости и степени сжатия
        "compression.type":  "lz4",
        "compression.level": 4,

        // Надёжность: acks=all для гарантии записи
        "acks":              "all",
        "retries":           2147483647, // Бесконечные ретраи
        "max.in.flight.requests.per.connection": 5, // Идемпотентность
        "enable.idempotence": true,

        // Мониторинг
        "statistics.interval.ms": 30000,
    })
}

// Отправка события
func (p *EventProducer) Send(event *AnalyticsEvent) error {
    value, err := p.serializer.Serialize("events.raw", event)
    if err != nil {
        return fmt.Errorf("serialization error: %w", err)
    }

    return p.producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topicRaw,
            Partition: kafka.PartitionAny, // Partitioner по ключу
        },
        Key:   []byte(event.SiteID),
        Value: value,
        Headers: []kafka.Header{
            {Key: "event_type", Value: []byte(event.Type)},
            {Key: "schema_version", Value: []byte("3")},
        },
    }, nil)
}

Результат тюнинга: один инстанс producer-а стабильно отдаёт 120 000 msg/s при latency p99 < 15 мс. Три инстанса с запасом закрывают пиковые 40 000 msg/s (каждый работает на 30% от максимума).

Consumer: паттерны и подводные камни

Consumer-приложение на Python (data engineering команда работает на Python) читает из events.raw, валидирует, обогащает и пишет в events.validated:

# consumer/event_processor.py
from confluent_kafka import Consumer, Producer, KafkaError
from confluent_kafka.serialization import SerializationContext, MessageField
from schema_registry.client import SchemaRegistryClient
import json

class EventProcessor:
    def __init__(self):
        self.consumer = Consumer({
            'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
            'group.id': 'event-validator',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,  # Manual commit после обработки
            'max.poll.interval.ms': 300000,
            'session.timeout.ms': 30000,
            'fetch.min.bytes': 50000,       # Ждём набора батча
            'fetch.max.wait.ms': 500,
            'max.partition.fetch.bytes': 1048576,
        })
        self.producer = Producer({
            'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
            'acks': 'all',
            'enable.idempotence': True,
            'compression.type': 'lz4',
        })
        self.geo_db = GeoIPDatabase('/data/GeoLite2-City.mmdb')
        self.ua_parser = UserAgentParser()

    def process_batch(self, messages):
        """Обработка батча сообщений."""
        for msg in messages:
            try:
                event = self.deserialize(msg)

                # Валидация
                if not self.validate(event):
                    self.send_to_dlq(msg, reason='validation_failed')
                    continue

                # Обогащение
                event['geo'] = self.geo_db.lookup(event['ip'])
                event['device'] = self.ua_parser.parse(event['user_agent'])
                del event['ip']       # GDPR: не храним IP после обогащения
                del event['user_agent']

                # Отправка в validated топик
                self.producer.produce(
                    'events.validated',
                    key=event['site_id'],
                    value=json.dumps(event).encode(),
                )
            except Exception as e:
                self.send_to_dlq(msg, reason=str(e))

        # Flush producer и commit offsets атомарно
        self.producer.flush()
        self.consumer.commit(asynchronous=False)

    def run(self):
        self.consumer.subscribe(['events.raw'])
        while True:
            messages = self.consumer.consume(
                num_messages=500,  # Батч до 500 сообщений
                timeout=1.0,
            )
            if messages:
                valid = [m for m in messages if m.error() is None]
                errors = [m for m in messages if m.error() is not None]
                for e in errors:
                    if e.error().code() != KafkaError._PARTITION_EOF:
                        logger.error(f"Consumer error: {e.error()}")
                if valid:
                    self.process_batch(valid)

Exactly-Once: мифы и реальность

«Exactly-once» в Kafka — не магия. Это работает только в рамках read-process-write паттерна внутри Kafka (то есть чтение из одного топика и запись в другой). Для внешних систем (БД, S3) нужна идемпотентность на стороне потребителя.

Мы используем transactional API для pipe-линов внутри Kafka:

# Transactional producer для exactly-once
producer = Producer({
    'bootstrap.servers': 'kafka-1:9092',
    'transactional.id': 'event-processor-0',  # Уникальный ID
    'enable.idempotence': True,
    'acks': 'all',
})

producer.init_transactions()

try:
    producer.begin_transaction()

    # Обработка и запись
    for event in batch:
        enriched = enrich(event)
        producer.produce('events.validated', value=enriched)

    # Commit offsets в рамках той же транзакции
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata(),
    )

    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()
    raise

Kafka Streams: агрегации в реальном времени

Для real-time дашбордов мы используем Kafka Streams (Java). Приложение считает метрики по каждому сайту в скользящих окнах:

// streams/RealtimeAggregator.java
public class RealtimeAggregator {
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, ValidatedEvent> events = builder
            .stream("events.validated",
                Consumed.with(Serdes.String(), eventSerde));

        // Подсчёт событий по типу в 1-минутных окнах
        events
            .groupBy((key, event) ->
                event.getSiteId() + "|" + event.getType(),
                Grouped.with(Serdes.String(), eventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(Materialized.as("event-counts"))
            .toStream()
            .map((windowedKey, count) -> {
                String[] parts = windowedKey.key().split("\\|");
                MetricRecord metric = new MetricRecord(
                    parts[0],  // site_id
                    parts[1],  // event_type
                    windowedKey.window().start(),
                    count
                );
                return KeyValue.pair(parts[0], metric);
            })
            .to("metrics.realtime",
                Produced.with(Serdes.String(), metricSerde));

        // Подсчёт уникальных пользователей (HyperLogLog)
        events
            .groupByKey()
            .aggregate(
                HyperLogLog::new,
                (key, event, hll) -> {
                    hll.add(event.getVisitorId());
                    return hll;
                },
                Materialized.<String, HyperLogLog, KeyValueStore<Bytes, byte[]>>as(
                    "unique-visitors"
                ).withValueSerde(hllSerde)
            );

        return builder.build();
    }
}

Schema Registry: эволюция схем

С 40 000 сайтов-источников формат событий неизбежно меняется. Avro + Schema Registry обеспечивают backward/forward совместимость:

// schemas/analytics_event.avsc
{
  "type": "record",
  "name": "AnalyticsEvent",
  "namespace": "pro.analytik.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "site_id", "type": "string"},
    {"name": "visitor_id", "type": "string"},
    {"name": "event_type", "type": {
      "type": "enum",
      "name": "EventType",
      "symbols": ["pageview", "click", "scroll", "form_submit", "purchase"]
    }},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "url", "type": "string"},
    {"name": "referrer", "type": ["null", "string"], "default": null},
    {"name": "properties", "type": {"type": "map", "values": "string"},
     "default": {}},
    // v2: добавлено с default — backward compatible
    {"name": "session_id", "type": ["null", "string"], "default": null},
    // v3: добавлено с default — backward compatible
    {"name": "device_category", "type": ["null", "string"], "default": null}
  ]
}

Schema Registry настроен на BACKWARD совместимость: новые версии схемы могут добавлять поля с default-значениями, но не могут удалять обязательные поля. Это позволяет producer-ам обновляться независимо от consumer-ов.

Мониторинг: Burrow + Prometheus

Главная метрика здоровья Kafka — consumer lag (отставание потребителей). Burrow от LinkedIn мониторит lag и оценивает его динамику:

# docker-compose.monitoring.yml
services:
  burrow:
    image: linkedin/burrow:latest
    volumes:
      - ./burrow.toml:/etc/burrow/burrow.toml
    ports:
      - "8000:8000"

  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    command:
      - --kafka.server=kafka-1:9092
      - --kafka.server=kafka-2:9092
      - --kafka.server=kafka-3:9092
    ports:
      - "9308:9308"

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
# prometheus alert rules
groups:
  - name: kafka-alerts
    rules:
      - alert: ConsumerLagHigh
        expr: kafka_consumergroup_lag_sum > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer group {{ $labels.consumergroup }} lag > 100K"

      - alert: ConsumerLagCritical
        expr: kafka_consumergroup_lag_sum > 1000000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Consumer group {{ $labels.consumergroup }} lag > 1M"

      - alert: UnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Under-replicated partitions detected"

      - alert: OfflinePartitions
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "Offline partitions — data loss risk!"

Retention и Compaction

Разные топики — разные стратегии хранения:

  • events.raw — time-based retention, 30 дней. Сырые данные нужны для replay и дебага
  • events.validated — 7 дней. После обработки Kafka Streams данные в этом топике менее ценны
  • metrics.realtime — compacted. Хранится только последнее значение для каждого ключа (site_id + metric_type). Размер топика не растёт бесконечно
  • events.dlq — 90 дней. Невалидные события нужно расследовать, иногда ретроспективно
# Настройка compaction для metrics.realtime
kafka-configs.sh --bootstrap-server kafka-1:9092 \
  --entity-type topics \
  --entity-name metrics.realtime \
  --alter \
  --add-config \
    cleanup.policy=compact,\
    min.compaction.lag.ms=3600000,\
    delete.retention.ms=86400000,\
    segment.ms=3600000

Операционные уроки

За год эксплуатации мы набили несколько важных шишек:

  1. Не экономьте на дисках. NVMe SSD — обязательно. Мы начинали с SAS HDD и получали latency spikes при compaction. После миграции на NVMe p99 produce latency упал с 120 мс до 8 мс
  2. Page cache — ваш друг. 32 ГБ RAM на брокер — это не для JVM (heap ставьте 6-8 ГБ), а для OS page cache. Kafka читает с диска через mmap, и page cache превращает disk reads в memory reads
  3. Партиции — не бесплатны. Мы начали с 96 партиций на events.raw «на вырост» и получили медленный rebalance и высокую нагрузку на controller. Уменьшили до 48 — хватает с запасом
  4. Consumer rebalance — главный враг стабильности. Используйте cooperative-sticky assignor вместо eager. Это превращает full-stop rebalance в инкрементальный, время простоя уменьшается с 30 сек до 2 сек
  5. Мониторьте ISR (In-Sync Replicas). Если ISR shrinks — это ранний сигнал проблем с диском или сетью. Не ждите offline partitions

Kafka — сложная система, но при правильном sizing, мониторинге и операционных практиках она стабильно обрабатывает миллиарды событий в день. Главное — не относиться к ней как к «просто очереди». Это распределённый лог, и этот ментальный сдвиг определяет успех внедрения.

Нужна помощь с проектом?

Специалисты АйТи Фреш помогут с архитектурой, DevOps, безопасностью и разработкой — 15+ лет опыта

📞 Связаться с нами
#burrow#compaction#consumer#events.dlq#events.raw#events.validated#exactlyonce#kafka
Комментарии 0

Оставить комментарий

загрузка...