Machine Learning в продакшене: от Jupyter-ноутбука до отказоустойчивого API

Проблема: ноутбучный ад

На момент обращения ситуация выглядела так:

  • 47 Jupyter-ноутбуков с моделями, разбросанных по ноутбукам data-учёных
  • Никакого версионирования моделей — «model_final_v3_FIXED_really_final.pkl»
  • Деплой модели — это копирование .pkl файла на сервер и перезапуск Flask-приложения
  • Нет мониторинга качества модели в production
  • Inference на CPU, latency p99 = 2.8 секунды (при SLA 500 мс)
  • «Работает на моём ноутбуке» — классическая проблема с зависимостями

Шаг 1: MLflow для трекинга экспериментов

Первое, что мы внедрили — MLflow. Это решает проблему воспроизводимости и версионирования:

# train_people_counter.py — тренировка модели с MLflow-трекингом
import mlflow
import mlflow.pytorch
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2
from torch.utils.data import DataLoader
from dataset import RetailDataset
from metrics import compute_map

mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("people-counter-v2")

# Гиперпараметры
config = {
    "backbone": "resnet50_fpn_v2",
    "learning_rate": 0.005,
    "batch_size": 8,
    "epochs": 50,
    "image_size": 640,
    "augmentations": ["horizontal_flip", "color_jitter", "random_crop"],
    "optimizer": "SGD",
    "momentum": 0.9,
    "weight_decay": 0.0005,
    "lr_scheduler": "cosine_annealing",
}

with mlflow.start_run(run_name="fasterrcnn-retail-v2.3"):
    # Логируем все гиперпараметры
    mlflow.log_params(config)

    # Логируем информацию о данных
    train_dataset = RetailDataset("s3://retailvision-data/train/", config["image_size"])
    val_dataset = RetailDataset("s3://retailvision-data/val/", config["image_size"])
    mlflow.log_param("train_samples", len(train_dataset))
    mlflow.log_param("val_samples", len(val_dataset))

    model = fasterrcnn_resnet50_fpn_v2(num_classes=2)  # background + person
    optimizer = torch.optim.SGD(
        model.parameters(),
        lr=config["learning_rate"],
        momentum=config["momentum"],
        weight_decay=config["weight_decay"],
    )

    train_loader = DataLoader(train_dataset, batch_size=config["batch_size"],
                              shuffle=True, num_workers=4, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=config["batch_size"],
                            num_workers=4, collate_fn=collate_fn)

    best_map = 0.0
    for epoch in range(config["epochs"]):
        # Training loop
        model.train()
        epoch_loss = 0
        for images, targets in train_loader:
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            optimizer.zero_grad()
            losses.backward()
            optimizer.step()
            epoch_loss += losses.item()

        avg_loss = epoch_loss / len(train_loader)
        mlflow.log_metric("train_loss", avg_loss, step=epoch)

        # Validation
        model.eval()
        map_score = compute_map(model, val_loader)
        mlflow.log_metric("val_mAP", map_score, step=epoch)
        mlflow.log_metric("val_mAP_50", map_score, step=epoch)

        if map_score > best_map:
            best_map = map_score
            # Сохраняем лучшую модель
            mlflow.pytorch.log_model(model, "best_model",
                registered_model_name="people-counter")

    mlflow.log_metric("best_val_mAP", best_map)

    # Логируем артефакты
    mlflow.log_artifact("configs/train_config.yaml")
    mlflow.log_artifact("data/class_mapping.json")

Шаг 2: Model Serving с FastAPI + ONNX Runtime

PyTorch в production — тяжёлый и медленный. Мы конвертируем модели в ONNX и раздаём через FastAPI:

# serving/app.py — production model serving
import io
import numpy as np
from fastapi import FastAPI, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import onnxruntime as ort
from PIL import Image
from prometheus_client import Counter, Histogram, generate_latest
from pydantic import BaseModel
import mlflow
import time

app = FastAPI(title="RetailVision People Counter API", version="2.3.0")

# Метрики
INFERENCE_LATENCY = Histogram(
    'model_inference_seconds', 'Inference latency',
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
REQUESTS_TOTAL = Counter('model_requests_total', 'Total inference requests',
                         ['model_name', 'status'])
PREDICTIONS_TOTAL = Counter('model_predictions_total', 'Detections count')

class ModelManager:
    """Управление загрузкой и версионированием моделей."""

    def __init__(self):
        self.sessions: dict[str, ort.InferenceSession] = {}
        self.active_model: str = None

    def load_model(self, model_name: str, version: str):
        """Загружаем модель из MLflow Model Registry."""
        model_uri = f"models:/{model_name}/{version}"
        local_path = mlflow.artifacts.download_artifacts(model_uri)
        onnx_path = f"{local_path}/model.onnx"

        # Настраиваем ONNX Runtime с GPU
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_options.intra_op_num_threads = 4
        sess_options.inter_op_num_threads = 4

        session = ort.InferenceSession(onnx_path, sess_options, providers=providers)
        model_key = f"{model_name}:{version}"
        self.sessions[model_key] = session
        self.active_model = model_key
        return model_key

    def predict(self, image_array: np.ndarray, model_key: str = None) -> dict:
        key = model_key or self.active_model
        session = self.sessions[key]

        with INFERENCE_LATENCY.time():
            outputs = session.run(None, {"input": image_array})

        boxes, scores, labels = outputs
        # Фильтруем по confidence threshold
        mask = scores[0] > 0.5
        return {
            "boxes": boxes[0][mask].tolist(),
            "scores": scores[0][mask].tolist(),
            "count": int(mask.sum()),
        }

manager = ModelManager()

@app.on_event("startup")
async def startup():
    manager.load_model("people-counter", "Production")

class PredictionResponse(BaseModel):
    count: int
    detections: list[dict]
    model_version: str
    inference_ms: float

@app.post("/api/v1/detect", response_model=PredictionResponse)
async def detect_people(file: UploadFile):
    if not file.content_type.startswith("image/"):
        raise HTTPException(400, "File must be an image")

    start = time.monotonic()

    image = Image.open(io.BytesIO(await file.read())).convert("RGB")
    image = image.resize((640, 640))
    img_array = np.array(image).astype(np.float32) / 255.0
    img_array = np.transpose(img_array, (2, 0, 1))
    img_array = np.expand_dims(img_array, axis=0)

    result = manager.predict(img_array)

    elapsed_ms = (time.monotonic() - start) * 1000
    REQUESTS_TOTAL.labels(model_name="people-counter", status="success").inc()
    PREDICTIONS_TOTAL.inc(result["count"])

    return PredictionResponse(
        count=result["count"],
        detections=[
            {"box": box, "confidence": score}
            for box, score in zip(result["boxes"], result["scores"])
        ],
        model_version=manager.active_model,
        inference_ms=round(elapsed_ms, 2),
    )

@app.get("/metrics")
async def metrics():
    from starlette.responses import Response
    return Response(generate_latest(), media_type="text/plain")

Шаг 3: A/B-тестирование моделей

Новая модель может быть лучше по метрикам на тестовом датасете, но хуже на реальных данных. Мы реализовали A/B-тестирование на уровне inference:

# serving/ab_router.py — A/B роутинг между моделями
import hashlib
import random

class ABRouter:
    def __init__(self):
        self.experiments = {}

    def create_experiment(self, name: str, models: dict[str, float]):
        """
        models = {"people-counter:3": 0.9, "people-counter:4": 0.1}
        90% трафика на v3, 10% на v4
        """
        self.experiments[name] = models

    def route(self, experiment: str, user_id: str) -> str:
        """Детерминированный роутинг по user_id."""
        models = self.experiments[experiment]
        # Хэш user_id для стабильного распределения
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        bucket = (hash_val % 1000) / 1000.0

        cumulative = 0.0
        for model_key, weight in models.items():
            cumulative += weight
            if bucket < cumulative:
                return model_key

        return list(models.keys())[-1]

Шаг 4: Feature Store

Для моделей, которые используют табличные фичи (предсказание загрузки магазина, оптимизация выкладки), мы спроектировали feature store на Redis + PostgreSQL:

# feature_store/store.py
import redis
import psycopg2
from datetime import datetime, timedelta

class FeatureStore:
    def __init__(self, redis_url: str, pg_dsn: str):
        self.redis = redis.Redis.from_url(redis_url)
        self.pg = psycopg2.connect(pg_dsn)

    def set_features(self, entity_type: str, entity_id: str,
                     features: dict, ttl: int = 3600):
        """Запись фичей: real-time в Redis, исторических в PostgreSQL."""
        key = f"features:{entity_type}:{entity_id}"

        # Real-time фичи в Redis (для online inference)
        self.redis.hset(key, mapping={
            k: str(v) for k, v in features.items()
        })
        self.redis.expire(key, ttl)

        # Исторические фичи в PostgreSQL (для обучения)
        with self.pg.cursor() as cur:
            cur.execute("""
                INSERT INTO feature_log (entity_type, entity_id, features, created_at)
                VALUES (%s, %s, %s, NOW())
            """, (entity_type, entity_id, json.dumps(features)))
        self.pg.commit()

    def get_features(self, entity_type: str, entity_id: str,
                     feature_names: list[str]) -> dict:
        """Чтение фичей для inference — сначала Redis, потом PG."""
        key = f"features:{entity_type}:{entity_id}"
        values = self.redis.hmget(key, feature_names)

        result = {}
        missing = []
        for name, value in zip(feature_names, values):
            if value is not None:
                result[name] = float(value)
            else:
                missing.append(name)

        # Если в Redis нет — берём последние из PG
        if missing:
            with self.pg.cursor() as cur:
                cur.execute("""
                    SELECT features FROM feature_log
                    WHERE entity_type = %s AND entity_id = %s
                    ORDER BY created_at DESC LIMIT 1
                """, (entity_type, entity_id))
                row = cur.fetchone()
                if row:
                    pg_features = json.loads(row[0])
                    for name in missing:
                        if name in pg_features:
                            result[name] = pg_features[name]

        return result

Шаг 5: Мониторинг дрифта

Модель деградирует со временем. Мы отслеживаем два типа дрифта:

  • Data drift — изменение распределения входных данных (новые ракурсы камер, сезонные изменения одежды)
  • Concept drift — изменение связи между фичами и целевой переменной
# monitoring/drift_detector.py
import numpy as np
from scipy import stats
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

class DriftDetector:
    def __init__(self, reference_data, significance_level=0.05):
        self.reference = reference_data
        self.alpha = significance_level

    def detect_data_drift(self, current_data) -> dict:
        """Kolmogorov-Smirnov тест для числовых фичей."""
        results = {}
        for column in self.reference.columns:
            if self.reference[column].dtype in ['float64', 'int64']:
                stat, p_value = stats.ks_2samp(
                    self.reference[column].dropna(),
                    current_data[column].dropna()
                )
                results[column] = {
                    'statistic': stat,
                    'p_value': p_value,
                    'is_drifted': p_value < self.alpha,
                }

        drifted_count = sum(1 for r in results.values() if r['is_drifted'])
        results['summary'] = {
            'total_features': len(results) - 1,
            'drifted_features': drifted_count,
            'drift_share': drifted_count / (len(results) - 1),
            'alert': drifted_count / (len(results) - 1) > 0.3,
        }
        return results

    def detect_prediction_drift(self, reference_preds, current_preds) -> dict:
        """PSI (Population Stability Index) для предсказаний."""
        def calculate_psi(expected, actual, bins=10):
            breakpoints = np.linspace(0, 1, bins + 1)
            expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
            actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

            # Избегаем деления на ноль
            expected_counts = np.clip(expected_counts, 0.001, None)
            actual_counts = np.clip(actual_counts, 0.001, None)

            psi = np.sum(
                (actual_counts - expected_counts) *
                np.log(actual_counts / expected_counts)
            )
            return psi

        psi = calculate_psi(reference_preds, current_preds)
        return {
            'psi': psi,
            'interpretation': (
                'no drift' if psi < 0.1
                else 'moderate drift' if psi < 0.2
                else 'significant drift'
            ),
            'alert': psi >= 0.2,
        }

Шаг 6: Автоматический ретрейнинг

Когда дрифт-детектор поднимает алерт, запускается пайплайн ретрейнинга:

# pipelines/retrain.yaml — Airflow DAG конфигурация
retrain_pipeline:
  schedule: "0 2 * * 0"  # Каждое воскресенье в 2:00
  trigger_on_drift: true  # Также запуск по алерту дрифта

  steps:
    - name: collect_data
      script: pipelines/collect_training_data.py
      params:
        days_back: 30
        min_samples: 10000

    - name: validate_data
      script: pipelines/validate_data.py
      params:
        min_quality_score: 0.95

    - name: train_model
      script: pipelines/train.py
      resources:
        gpu: 1
        memory: 32Gi
      params:
        epochs: 50
        early_stopping_patience: 5

    - name: evaluate
      script: pipelines/evaluate.py
      params:
        min_map: 0.85  # Минимальный mAP для прохождения
        comparison: "Production"  # Сравнение с текущей production-моделью

    - name: shadow_deploy
      script: pipelines/shadow_deploy.py
      params:
        duration_hours: 24
        traffic_share: 0.1

    - name: promote_or_rollback
      script: pipelines/promote.py
      params:
        min_improvement: 0.02  # Минимум +2% к mAP для промоушена

GPU-оптимизация и батчинг

Одиночный inference на GPU неэффективен — GPU простаивает. Мы реализовали dynamic batching:

# serving/batcher.py — динамический батчинг запросов
import asyncio
import numpy as np
from collections import deque
import time

class DynamicBatcher:
    def __init__(self, model_manager, max_batch_size=16,
                 max_wait_ms=50):
        self.model = model_manager
        self.max_batch = max_batch_size
        self.max_wait = max_wait_ms / 1000.0
        self.queue: deque = deque()
        self.lock = asyncio.Lock()

    async def predict(self, image_array: np.ndarray) -> dict:
        """Добавляет запрос в очередь и ждёт результат."""
        future = asyncio.get_event_loop().create_future()
        async with self.lock:
            self.queue.append((image_array, future))
            if len(self.queue) >= self.max_batch:
                await self._process_batch()

        # Ждём результат (или timeout запустит batch)
        if not future.done():
            asyncio.get_event_loop().call_later(
                self.max_wait, lambda: asyncio.ensure_future(self._flush())
            )

        return await future

    async def _flush(self):
        async with self.lock:
            if self.queue:
                await self._process_batch()

    async def _process_batch(self):
        batch_items = []
        while self.queue and len(batch_items) < self.max_batch:
            batch_items.append(self.queue.popleft())

        if not batch_items:
            return

        # Собираем батч
        images = np.concatenate([item[0] for item in batch_items], axis=0)
        results = self.model.predict_batch(images)

        # Раздаём результаты
        for i, (_, future) in enumerate(batch_items):
            if not future.done():
                future.set_result(results[i])

Результаты

МетрикаДоПосле
Время деплоя модели3 месяца2 дня
Inference latency (p99)2800 мс85 мс
Throughput12 rps450 rps
mAP на production данных0.78 (деградация)0.91 (стабильно)
Воспроизводимость экспериментов~30%100%
Обнаружение дрифтаВручную, post-factumАвтоматически, <1 час
Стоимость GPU-инфраструктуры$4200/мес$1800/мес (-57%)

ML в production — это 20% модели и 80% инфраструктуры. Без трекинга экспериментов, мониторинга дрифта и автоматического ретрейнинга даже лучшая модель деградирует за пару месяцев. Инвестируйте в MLOps — это окупается.

Нужна помощь с проектом?

Специалисты АйТи Фреш помогут с архитектурой, DevOps, безопасностью и разработкой — 15+ лет опыта

📞 Связаться с нами
#abтестирование#concept drift#data drift#devops#fastapi#feature#gpuоптимизация#mlflow
Комментарии 0

Оставить комментарий

загрузка...