Система уведомлений в реальном времени: от 0 до 5 миллионов пользователей

Архитектура: почему Go для WebSocket-сервера

«СпортЛайв» — платформа для ставок на спорт и просмотра live-результатов. Требования к уведомлениям:

  • Доставка обновления счёта за <100 мс
  • 5 млн одновременных WebSocket-соединений
  • Пиковая нагрузка: 200K сообщений/сек (финал Лиги Чемпионов)
  • Гарантированная доставка: пользователь не должен пропустить изменение ставки

Мы выбрали Go по нескольким причинам: goroutine на каждое соединение обходится в ~4 КБ памяти (vs 1 МБ для потока в Java), отличная поддержка WebSocket через gorilla/websocket, и команда клиента уже писала на Go.

WebSocket-сервер: управление соединениями

// internal/ws/hub.go
package ws

import (
    "sync"
    "time"
    "github.com/gorilla/websocket"
    "go.uber.org/zap"
)

const (
    writeWait      = 10 * time.Second
    pongWait       = 60 * time.Second
    pingPeriod     = (pongWait * 9) / 10
    maxMessageSize = 4096
)

type Client struct {
    ID       string
    UserID   string
    Conn     *websocket.Conn
    Hub      *Hub
    Send     chan []byte
    Subs     map[string]bool // подписки: "match:12345", "league:pl"
    mu       sync.Mutex
}

type Hub struct {
    clients    map[string]*Client
    topics     map[string]map[string]*Client // topic -> clientID -> client
    register   chan *Client
    unregister chan *Client
    broadcast  chan *TopicMessage
    mu         sync.RWMutex
    logger     *zap.Logger
    metrics    *Metrics
}

type TopicMessage struct {
    Topic    string
    Payload  []byte
    Priority int // 0=low, 1=normal, 2=high, 3=critical
}

func NewHub(logger *zap.Logger) *Hub {
    return &Hub{
        clients:    make(map[string]*Client),
        topics:     make(map[string]map[string]*Client),
        register:   make(chan *Client, 256),
        unregister: make(chan *Client, 256),
        broadcast:  make(chan *TopicMessage, 10000),
        logger:     logger,
        metrics:    NewMetrics(),
    }
}

func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client.ID] = client
            h.mu.Unlock()
            h.metrics.ConnectionsTotal.Inc()
            h.metrics.ConnectionsCurrent.Set(float64(len(h.clients)))

        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client.ID]; ok {
                delete(h.clients, client.ID)
                close(client.Send)
                // Удаляем из всех топиков
                for topic := range client.Subs {
                    if subs, ok := h.topics[topic]; ok {
                        delete(subs, client.ID)
                    }
                }
            }
            h.mu.Unlock()
            h.metrics.ConnectionsCurrent.Set(float64(len(h.clients)))

        case msg := <-h.broadcast:
            h.fanOut(msg)
        }
    }
}

Heartbeat и обнаружение мёртвых соединений

Мобильные клиенты часто теряют связь без корректного закрытия WebSocket. Без heartbeat мы бы накапливали «мёртвые» соединения. Реализация стандартная — ping/pong:

func (c *Client) readPump() {
    defer func() {
        c.Hub.unregister <- c
        c.Conn.Close()
    }()

    c.Conn.SetReadLimit(maxMessageSize)
    c.Conn.SetReadDeadline(time.Now().Add(pongWait))
    c.Conn.SetPongHandler(func(string) error {
        c.Conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    })

    for {
        _, message, err := c.Conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err,
                websocket.CloseGoingAway,
                websocket.CloseNormalClosure) {
                c.Hub.logger.Warn("unexpected close", zap.Error(err))
            }
            break
        }
        c.handleMessage(message)
    }
}

func (c *Client) writePump() {
    ticker := time.NewTicker(pingPeriod)
    defer func() {
        ticker.Stop()
        c.Conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.Send:
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if !ok {
                c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
                return
            }
        case <-ticker.C:
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

Fan-out: рассылка по топикам

Когда меняется счёт матча, нужно уведомить всех подписчиков этого матча. При 100K подписчиков на финал ЛЧ последовательная отправка заняла бы секунды. Мы используем параллельный fan-out с worker pool:

func (h *Hub) fanOut(msg *TopicMessage) {
    h.mu.RLock()
    subscribers, ok := h.topics[msg.Topic]
    if !ok || len(subscribers) == 0 {
        h.mu.RUnlock()
        return
    }

    // Копируем список подписчиков, чтобы не держать блокировку
    clients := make([]*Client, 0, len(subscribers))
    for _, client := range subscribers {
        clients = append(clients, client)
    }
    h.mu.RUnlock()

    h.metrics.MessagesOut.Add(float64(len(clients)))

    // Параллельная отправка батчами по 1000
    batchSize := 1000
    var wg sync.WaitGroup

    for i := 0; i < len(clients); i += batchSize {
        end := i + batchSize
        if end > len(clients) {
            end = len(clients)
        }

        wg.Add(1)
        go func(batch []*Client) {
            defer wg.Done()
            for _, client := range batch {
                select {
                case client.Send <- msg.Payload:
                    // отправлено
                default:
                    // буфер клиента полон, закрываем медленного клиента
                    h.logger.Warn("slow client, disconnecting",
                        zap.String("client_id", client.ID))
                    h.unregister <- client
                }
            }
        }(clients[i:end])
    }

    wg.Wait()
}

Очередь сообщений с приоритетами

Не все уведомления одинаково важны. Изменение коэффициента ставки критичнее, чем статистика по владению мячом. Мы используем priority queue:

// internal/queue/priority.go
type PriorityQueue struct {
    queues [4]chan *TopicMessage // 0=low, 1=normal, 2=high, 3=critical
}

func NewPriorityQueue() *PriorityQueue {
    return &PriorityQueue{
        queues: [4]chan *TopicMessage{
            make(chan *TopicMessage, 50000),  // low
            make(chan *TopicMessage, 20000),  // normal
            make(chan *TopicMessage, 5000),   // high
            make(chan *TopicMessage, 1000),   // critical
        },
    }
}

func (pq *PriorityQueue) Dequeue() *TopicMessage {
    // Приоритет: critical > high > normal > low
    // Используем select с приоритетом через вложенные проверки
    for {
        select {
        case msg := <-pq.queues[3]:
            return msg
        default:
        }
        select {
        case msg := <-pq.queues[3]:
            return msg
        case msg := <-pq.queues[2]:
            return msg
        default:
        }
        select {
        case msg := <-pq.queues[3]:
            return msg
        case msg := <-pq.queues[2]:
            return msg
        case msg := <-pq.queues[1]:
            return msg
        default:
        }
        select {
        case msg := <-pq.queues[3]:
            return msg
        case msg := <-pq.queues[2]:
            return msg
        case msg := <-pq.queues[1]:
            return msg
        case msg := <-pq.queues[0]:
            return msg
        }
    }
}

Mobile Push: FCM и APNs

WebSocket работает, пока приложение открыто. Когда пользователь свернул приложение, нужны push-нотификации. Мы сделали единый адаптер:

# push_service.py — сервис мобильных пушей
import asyncio
import firebase_admin
from firebase_admin import messaging
from aioapns import APNs, NotificationRequest, PushType

class PushService:
    def __init__(self, config):
        # Firebase для Android
        firebase_admin.initialize_app(firebase_admin.credentials.Certificate(
            config['firebase_credentials_path']
        ))

        # APNs для iOS
        self.apns = APNs(
            key=config['apns_key_path'],
            key_id=config['apns_key_id'],
            team_id=config['apns_team_id'],
            topic=config['apns_bundle_id'],
            use_sandbox=config.get('apns_sandbox', False),
        )

    async def send(self, user_id: str, title: str, body: str,
                   data: dict, tokens: list[dict]):
        tasks = []
        for token_info in tokens:
            if token_info['platform'] == 'android':
                tasks.append(self._send_fcm(token_info['token'], title, body, data))
            elif token_info['platform'] == 'ios':
                tasks.append(self._send_apns(token_info['token'], title, body, data))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Обработка невалидных токенов
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                await self._handle_failed_token(tokens[i], result)

        return results

    async def _send_fcm(self, token, title, body, data):
        message = messaging.Message(
            notification=messaging.Notification(title=title, body=body),
            data={k: str(v) for k, v in data.items()},
            token=token,
            android=messaging.AndroidConfig(
                priority='high',
                ttl=60,  # 60 секунд TTL для спортивных событий
            ),
        )
        return messaging.send(message)

    async def _send_apns(self, token, title, body, data):
        request = NotificationRequest(
            device_token=token,
            message={
                "aps": {
                    "alert": {"title": title, "body": body},
                    "sound": "score_update.wav",
                    "badge": 1,
                    "mutable-content": 1,
                },
                **data,
            },
            push_type=PushType.ALERT,
            time_to_live=60,
        )
        return await self.apns.send_notification(request)

Email/SMS Fallback

Для критических уведомлений (выигрыш ставки, изменение правил) у нас есть fallback через email и SMS. Если WebSocket и push не доставили сообщение за 30 секунд — отправляем через альтернативные каналы. Статус доставки отслеживаем через ACK от клиента.

Масштабирование: Redis Pub/Sub

Один сервер с 4 ГБ RAM держит ~500K WebSocket-соединений. Для 5 млн нужно 10+ серверов. Проблема: как доставить сообщение клиенту, если он подключён к другому серверу?

Ответ — Redis Pub/Sub как шина между серверами:

// internal/cluster/redis_bus.go
package cluster

import (
    "context"
    "encoding/json"
    "github.com/redis/go-redis/v9"
)

type RedisBus struct {
    client *redis.ClusterClient
    nodeID string
    hub    *ws.Hub
}

func NewRedisBus(addrs []string, nodeID string, hub *ws.Hub) *RedisBus {
    client := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs:        addrs,
        PoolSize:     100,
        MinIdleConns: 20,
    })

    return &RedisBus{client: client, nodeID: nodeID, hub: hub}
}

// Publish отправляет сообщение во все ноды через Redis
func (rb *RedisBus) Publish(ctx context.Context, msg *ws.TopicMessage) error {
    envelope := map[string]interface{}{
        "origin":   rb.nodeID,
        "topic":    msg.Topic,
        "payload":  string(msg.Payload),
        "priority": msg.Priority,
    }
    data, _ := json.Marshal(envelope)
    return rb.client.Publish(ctx, "notifications:broadcast", data).Err()
}

// Subscribe слушает сообщения от других нод
func (rb *RedisBus) Subscribe(ctx context.Context) {
    pubsub := rb.client.Subscribe(ctx, "notifications:broadcast")
    ch := pubsub.Channel()

    for msg := range ch {
        var envelope map[string]interface{}
        json.Unmarshal([]byte(msg.Payload), &envelope)

        // Не обрабатываем свои же сообщения
        if envelope["origin"].(string) == rb.nodeID {
            continue
        }

        topicMsg := &ws.TopicMessage{
            Topic:    envelope["topic"].(string),
            Payload:  []byte(envelope["payload"].(string)),
            Priority: int(envelope["priority"].(float64)),
        }

        rb.hub.BroadcastToLocal(topicMsg)
    }
}

Гарантия доставки: at-least-once

При обрыве соединения клиент может пропустить сообщения. Для гарантии at-least-once мы используем sequence numbers: каждое сообщение имеет монотонный ID, клиент при реконнекте отправляет последний полученный ID, и сервер досылает пропущенные.

Мониторинг и A/B-тестирование

Ключевые метрики, которые мы отслеживаем:

  • Delivery rate: 99.7% сообщений доставлены за <200 мс
  • Connection stability: средняя длительность соединения 47 минут
  • Reconnect rate: 3.2% соединений реконнектятся в минуту
  • Push delivery: FCM 97%, APNs 99.1%

Мы также проводим A/B-тесты стратегий уведомлений: частота, группировка, формулировки — и измеряем влияние на возвращаемость пользователей и конверсию в ставки.

Результаты

МетрикаБыло (polling)Стало (WebSocket)
Задержка доставки5-10 секунд<100 мс
Нагрузка на API50K rps (polling)2K rps (только полезные)
Одновременные пользователи500K (потолок)5M+
Серверы40 API-серверов12 WS-серверов
Возвращаемость пользователей34%52% (+53%)

Real-time уведомления — это не роскошь, а необходимость для продуктов, где важна скорость. Правильная архитектура позволяет масштабироваться линейно, а приоритизация сообщений гарантирует, что критические данные всегда доставляются первыми.

Нужна помощь с проектом?

Специалисты АйТи Фреш помогут с архитектурой, DevOps, безопасностью и разработкой — 15+ лет опыта

📞 Связаться с нами
#abтестирование#apns#atleastonce#connection stability:#delivery rate:#devops#emailsms#fallback
Комментарии 0

Оставить комментарий

загрузка...