TimescaleDB для IoT: как мы построили систему мониторинга 10 000 датчиков умного здания

Ситуация: 10 000 датчиков и Excel-отчёты

«УмныйДом» — компания, управляющая smart building инфраструктурой: температура, влажность, CO2, энергопотребление, занятость помещений. 10 000 датчиков в 45 зданиях, каждый датчик отправляет показания раз в 10 секунд.

Математика: 10 000 датчиков × 6 показаний/минуту = 60 000 записей/минуту = 1 000 записей/секунду. За сутки — 86.4 миллиона записей. За год — 31.5 миллиардов.

Текущее решение — PostgreSQL 15 с обычными таблицами:

  • Размер базы — 890 GB за 14 месяцев. Диск на 1 TB заканчивается.
  • Запросы тормозят — агрегация за месяц по одному зданию: 45 секунд. За год — timeout.
  • Нет retention — данные старше 2 лет не нужны, но удалить их из обычной таблицы — часовая операция с блокировками.
  • Отчёты в Excel — менеджеры выгружают данные в CSV, строят графики вручную. Grafana пробовали, но запросы слишком медленные.

Команда «УмныйДом» обратилась к нам в itfresh.ru. Мы предложили TimescaleDB — расширение PostgreSQL, специально разработанное для time-series данных.

Почему TimescaleDB, а не InfluxDB или Prometheus

Мы рассмотрели три варианта:

КритерийTimescaleDBInfluxDBPrometheus
SQL-совместимостьПолный SQL (это PostgreSQL)InfluxQL / FluxPromQL
JOIN с реляционными даннымиДа (обычные таблицы рядом)НетНет
CompressionДо 95% (колоночное сжатие)ХорошееСреднее
Retention policiesВстроенныеВстроенныеОграниченные
Long-term storageОтлично (годы данных)ХорошоПлохо (метрики, не данные)
EcosystemВсе PostgreSQL инструментыСвоя экосистемаСвоя экосистема
Стоимость переходаМинимальная (уже PostgreSQL)Высокая (новый стек)Высокая

TimescaleDB победил по трём причинам:

  1. SQL — команда «УмныйДом» знает SQL, не нужно учить новый язык запросов
  2. PostgreSQL-совместимость — датчики, здания, арендаторы хранятся в обычных таблицах, JOINы работают
  3. Миграция — расширение к существующему PostgreSQL, не нужно менять инфраструктуру

Prometheus отпал сразу — он для метрик мониторинга, а не для бизнес-данных IoT. InfluxDB хорош, но потребовал бы полной смены стека и переобучения команды.

Установка и создание hypertable

Hypertable — ключевая абстракция TimescaleDB. Это обычная PostgreSQL-таблица, которая автоматически партицируется по времени (и опционально по другим колонкам). Для приложения она выглядит как одна таблица.

# Установка TimescaleDB
sudo apt install timescaledb-2-postgresql-16

# Запускаем тюнер конфигурации
sudo timescaledb-tune --pg-config=/usr/bin/pg_config

# Включаем расширение
sudo -u postgres psql -d smartbuilding
CREATE EXTENSION IF NOT EXISTS timescaledb;

# Создаём таблицу для показаний датчиков
CREATE TABLE sensor_readings (
    time         TIMESTAMPTZ NOT NULL,
    sensor_id    INTEGER     NOT NULL,
    building_id  INTEGER     NOT NULL,
    metric_type  TEXT        NOT NULL,  -- 'temperature', 'humidity', 'co2', 'power'
    value        DOUBLE PRECISION NOT NULL,
    quality      SMALLINT    DEFAULT 100  -- качество показания (0-100)
);

# Превращаем в hypertable — партиции по 1 дню
SELECT create_hypertable(
    'sensor_readings',
    'time',
    chunk_time_interval => INTERVAL '1 day',
    create_default_indexes => TRUE
);

# Дополнительные индексы для типичных запросов
CREATE INDEX idx_sensor_readings_sensor_time
    ON sensor_readings (sensor_id, time DESC);
CREATE INDEX idx_sensor_readings_building_metric
    ON sensor_readings (building_id, metric_type, time DESC);

Референсные таблицы для JOINов — обычные PostgreSQL-таблицы:

CREATE TABLE sensors (
    sensor_id    SERIAL PRIMARY KEY,
    building_id  INTEGER REFERENCES buildings(building_id),
    floor        INTEGER,
    room         TEXT,
    sensor_type  TEXT,
    installed_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE buildings (
    building_id  SERIAL PRIMARY KEY,
    name         TEXT NOT NULL,
    address      TEXT,
    city         TEXT,
    timezone     TEXT DEFAULT 'Europe/Moscow'
);

-- Можем делать JOINы — это обычный PostgreSQL!
SELECT
    b.name AS building,
    s.room,
    time_bucket('1 hour', sr.time) AS hour,
    AVG(sr.value) AS avg_temp
FROM sensor_readings sr
JOIN sensors s ON sr.sensor_id = s.sensor_id
JOIN buildings b ON s.building_id = b.building_id
WHERE sr.metric_type = 'temperature'
    AND sr.time > NOW() - INTERVAL '24 hours'
    AND b.city = 'Москва'
GROUP BY b.name, s.room, hour
ORDER BY hour DESC;

Оптимизация вставки: batch insert и COPY

1000 записей/секунду — не проблема для PostgreSQL. Но важно вставлять эффективно, чтобы оставить запас для аналитических запросов.

# Худший вариант: построчный INSERT (не делайте так!)
# 1000 отдельных INSERT/sec = 1000 транзакций/sec = высокий overhead

# Хороший вариант: batch INSERT
INSERT INTO sensor_readings (time, sensor_id, building_id, metric_type, value)
VALUES
    ('2026-04-05T10:00:00Z', 1, 1, 'temperature', 22.5),
    ('2026-04-05T10:00:00Z', 1, 1, 'humidity', 45.2),
    ('2026-04-05T10:00:00Z', 2, 1, 'temperature', 23.1),
    -- ... 1000 строк за раз
;

# Лучший вариант: COPY (в 5-10 раз быстрее INSERT)
COPY sensor_readings (time, sensor_id, building_id, metric_type, value)
FROM STDIN WITH (FORMAT csv);
2026-04-05T10:00:00Z,1,1,temperature,22.5
2026-04-05T10:00:00Z,1,1,humidity,45.2
\.

Python-скрипт для batch ingestion:

import psycopg2
from psycopg2.extras import execute_values
import io

def insert_batch_values(conn, readings):
    """Batch INSERT через execute_values — хороший вариант."""
    with conn.cursor() as cur:
        execute_values(
            cur,
            """INSERT INTO sensor_readings
               (time, sensor_id, building_id, metric_type, value)
               VALUES %s""",
            readings,
            page_size=1000
        )
    conn.commit()

def insert_copy(conn, readings):
    """COPY через StringIO — лучший вариант."""
    buffer = io.StringIO()
    for r in readings:
        buffer.write(f"{r[0]}\t{r[1]}\t{r[2]}\t{r[3]}\t{r[4]}\n")
    buffer.seek(0)

    with conn.cursor() as cur:
        cur.copy_from(
            buffer,
            'sensor_readings',
            columns=('time', 'sensor_id', 'building_id', 'metric_type', 'value'),
            sep='\t'
        )
    conn.commit()

# Benchmark на 100 000 записей:
# Построчный INSERT: 45 секунд
# Batch INSERT (execute_values): 3.2 секунды
# COPY: 0.8 секунды

Мы настроили collector-сервис, который буферизует показания от датчиков и раз в 10 секунд вставляет batch из 10 000 записей через COPY. Нагрузка на PostgreSQL — минимальная.

Continuous aggregates: мгновенные отчёты

Главная боль «УмныйДом» — тяжёлые аналитические запросы. Агрегация 86 миллионов записей за сутки для одного здания — 45 секунд. За месяц — timeout.

Continuous aggregates — материализованные представления, которые TimescaleDB автоматически обновляет при поступлении новых данных:

-- Часовые агрегаты
CREATE MATERIALIZED VIEW sensor_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    building_id,
    metric_type,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id, building_id, metric_type
WITH NO DATA;  -- не заполняем историю сразу

-- Настраиваем автоматическое обновление
SELECT add_continuous_aggregate_policy('sensor_hourly',
    start_offset    => INTERVAL '3 hours',  -- обновляем данные за последние 3 часа
    end_offset      => INTERVAL '1 hour',   -- не трогаем последний час (данные ещё поступают)
    schedule_interval => INTERVAL '30 minutes'  -- запускаем каждые 30 минут
);

-- Дневные агрегаты (поверх часовых — каскадные!)
CREATE MATERIALIZED VIEW sensor_daily
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', bucket) AS bucket,
    sensor_id,
    building_id,
    metric_type,
    AVG(avg_value) AS avg_value,
    MIN(min_value) AS min_value,
    MAX(max_value) AS max_value,
    SUM(sample_count) AS sample_count
FROM sensor_hourly
GROUP BY time_bucket('1 day', bucket), sensor_id, building_id, metric_type
WITH NO DATA;

SELECT add_continuous_aggregate_policy('sensor_daily',
    start_offset    => INTERVAL '3 days',
    end_offset      => INTERVAL '1 day',
    schedule_interval => INTERVAL '1 hour'
);

-- Заполняем историю
CALL refresh_continuous_aggregate('sensor_hourly', '2025-01-01', NOW());
CALL refresh_continuous_aggregate('sensor_daily', '2025-01-01', NOW());

Теперь запрос за месяц работает мгновенно — он читает 720 предагрегированных записей вместо 2.6 миллиардов сырых:

-- Средняя температура по зданию за месяц — 5 мс вместо 45 секунд!
SELECT
    bucket,
    AVG(avg_value) AS avg_temp
FROM sensor_hourly
WHERE building_id = 1
    AND metric_type = 'temperature'
    AND bucket >= NOW() - INTERVAL '30 days'
GROUP BY bucket
ORDER BY bucket;

-- Годовой отчёт — через дневные агрегаты, 8 мс
SELECT
    bucket,
    avg_value,
    min_value,
    max_value
FROM sensor_daily
WHERE building_id = 1
    AND metric_type = 'power'
    AND bucket >= NOW() - INTERVAL '1 year'
ORDER BY bucket;

Compression и retention: управление объёмом данных

890 GB за 14 месяцев — и это без сжатия. TimescaleDB поддерживает native columnar compression:

-- Включаем compression для hypertable
ALTER TABLE sensor_readings SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id, building_id, metric_type',
    timescaledb.compress_orderby = 'time DESC'
);

-- Автоматическая compression для данных старше 7 дней
SELECT add_compression_policy(
    'sensor_readings',
    compress_after => INTERVAL '7 days'
);

-- Сжимаем историю вручную (для первоначального сжатия)
SELECT compress_chunk(c)
FROM show_chunks('sensor_readings', older_than => INTERVAL '7 days') c;

-- Проверяем результат
SELECT
    chunk_name,
    pg_size_pretty(before_compression_total_bytes) AS before,
    pg_size_pretty(after_compression_total_bytes) AS after,
    ROUND((1 - after_compression_total_bytes::numeric /
           before_compression_total_bytes) * 100, 1) AS compression_pct
FROM chunk_compression_stats('sensor_readings')
ORDER BY chunk_name DESC
LIMIT 5;

-- Результат:
-- +---------------------------+--------+--------+------------------+
-- | chunk_name                | before | after  | compression_pct  |
-- +---------------------------+--------+--------+------------------+
-- | _hyper_1_420_chunk        | 2.1 GB | 142 MB | 93.2%            |
-- | _hyper_1_419_chunk        | 2.1 GB | 138 MB | 93.4%            |
-- | _hyper_1_418_chunk        | 2.1 GB | 145 MB | 93.1%            |
-- +---------------------------+--------+--------+------------------+

93% compression! 890 GB превратились в 62 GB. Это типично для time-series данных с высокой повторяемостью sensor_id и metric_type.

Retention policy — автоматическое удаление старых данных:

-- Сырые данные храним 6 месяцев
SELECT add_retention_policy(
    'sensor_readings',
    drop_after => INTERVAL '6 months'
);

-- Часовые агрегаты — 2 года
SELECT add_retention_policy(
    'sensor_hourly',
    drop_after => INTERVAL '2 years'
);

-- Дневные агрегаты — 10 лет (практически бесконечно, мало данных)
SELECT add_retention_policy(
    'sensor_daily',
    drop_after => INTERVAL '10 years'
);

-- Проверяем расписание всех политик
SELECT * FROM timescaledb_information.jobs
WHERE proc_name IN ('policy_retention', 'policy_compression', 'policy_refresh_continuous_aggregate');

-- Tiered storage: 3 уровня
-- Последние 7 дней: несжатые, быстрый доступ для real-time дашбордов
-- 7 дней - 6 месяцев: сжатые, доступ через SQL, медленнее
-- Старше 6 месяцев: удалены, данные только в continuous aggregates

Grafana integration и capacity planning

TimescaleDB работает с Grafana через стандартный PostgreSQL data source — не нужен отдельный плагин:

-- Grafana query: температура в реальном времени (для дашборда)
SELECT
    $__timeGroup(time, $__interval) AS time,
    sensor_id::text AS metric,
    AVG(value) AS value
FROM sensor_readings
WHERE $__timeFilter(time)
    AND building_id = $building_id
    AND metric_type = 'temperature'
GROUP BY 1, 2
ORDER BY 1;

-- Grafana query: сравнение зданий за период
SELECT
    bucket AS time,
    b.name AS metric,
    sh.avg_value AS value
FROM sensor_hourly sh
JOIN buildings b ON sh.building_id = b.building_id
WHERE sh.metric_type = 'power'
    AND sh.bucket BETWEEN $__timeFrom() AND $__timeTo()
ORDER BY 1;

-- Grafana variable: список зданий
SELECT building_id AS __value, name AS __text
FROM buildings ORDER BY name;

Downsampling — показываем разный уровень детализации в зависимости от временного диапазона:

-- В Grafana используем переменные для автоматического downsampling
-- Последние 24 часа → сырые данные (10-секундное разрешение)
-- 1-7 дней → sensor_hourly (часовое разрешение)
-- 7+ дней → sensor_daily (дневное разрешение)

-- Реализация через Grafana query с условием:
SELECT
    time_bucket(
        CASE
            WHEN $__timeTo() - $__timeFrom() < INTERVAL '2 days' THEN INTERVAL '1 minute'
            WHEN $__timeTo() - $__timeFrom() < INTERVAL '7 days' THEN INTERVAL '1 hour'
            ELSE INTERVAL '1 day'
        END,
        time
    ) AS time,
    AVG(value) AS value
FROM sensor_readings
WHERE $__timeFilter(time)
    AND sensor_id = $sensor_id
GROUP BY 1
ORDER BY 1;

Capacity planning:

-- Текущий размер данных
SELECT
    hypertable_name,
    pg_size_pretty(hypertable_size(format('%I.%I', hypertable_schema, hypertable_name))) AS total_size,
    pg_size_pretty(total_bytes - index_bytes - toast_bytes) AS table_size,
    pg_size_pretty(index_bytes) AS index_size,
    total_bytes
FROM timescaledb_information.hypertables h,
     hypertable_detailed_size(format('%I.%I', h.hypertable_schema, h.hypertable_name))
ORDER BY total_bytes DESC;

-- Прогноз: сколько места нужно на год
-- Сырые данные: 86.4M записей/день × ~50 байт = 4.3 GB/день (несжатые)
-- Со сжатием 93%: 300 MB/день
-- Retention 6 месяцев: 300 MB × 180 = 54 GB максимум
-- Continuous aggregates: ~2 GB
-- Итого: ~60 GB для всех данных — вместо 890 GB

-- Мониторинг chunk statistics
SELECT
    chunk_name,
    range_start,
    range_end,
    is_compressed,
    pg_size_pretty(total_bytes) AS size
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_readings'
ORDER BY range_start DESC
LIMIT 10;

Результаты проекта «УмныйДом»:

МетрикаДо (PostgreSQL)После (TimescaleDB)
Размер данных890 GB62 GB (compression + retention)
Запрос за месяц (одно здание)45 секунд5 мс (continuous aggregates)
Запрос за годTimeout8 мс (daily aggregates)
Insert throughput800 rows/sec (построчно)120 000 rows/sec (COPY batch)
Удаление старых данных1 час с блокировкамиМгновенно (drop chunk)
Отчёты для менеджеровExcel-файлыGrafana real-time дашборды

TimescaleDB — идеальный выбор для IoT, если вы уже используете PostgreSQL. Никакой миграции стека, обычный SQL, JOIN с реляционными данными. Если у вас проект с time-series данными — обращайтесь к нам в itfresh.ru.

Часто задаваемые вопросы

TimescaleDB Community Edition — полностью бесплатный и open-source (Timescale License). Включает hypertables, continuous aggregates, compression, retention policies — всё, что нужно для 95% проектов. Платная версия (Timescale Cloud) добавляет multi-node clustering, Tiered Storage в S3 и managed service. Для on-premise проекта «УмныйДом» Community Edition хватило полностью.
Да, через команду create_hypertable с параметром migrate_data => true. TimescaleDB создаст chunks для существующих данных. Для больших таблиц (100+ GB) миграция может занять несколько часов. Рекомендуем делать на реплике: создаёте hypertable, мигрируете данные через INSERT ... SELECT, затем переключаете приложение.
Для простых вставок InfluxDB быстрее (нативный time-series движок). Для сложных аналитических запросов TimescaleDB быстрее (полноценный SQL, JOINы, window functions). По сжатию — примерно одинаково. Главное преимущество TimescaleDB — это PostgreSQL, поэтому вы получаете транзакции, индексы, расширения (PostGIS для геоданных), все инструменты экосистемы.
Правило: один chunk должен умещаться в 25% доступной памяти. Для 10 000 датчиков с 10-секундным интервалом и 64 GB RAM оптимально — 1 день (chunk ~4 GB несжатый). Для менее интенсивной записи — 1 неделя или 1 месяц. Слишком маленькие chunks создают overhead на планирование, слишком большие — медленные compression и retention.
С версии TimescaleDB 2.7 continuous aggregates поддерживают real-time режим: при запросе к aggregate TimescaleDB автоматически добавляет свежие данные из исходной таблицы, которые ещё не были материализованы. Задержка — 0 секунд для запросов, но refresh policy по-прежнему нужен для материализации (иначе каждый запрос будет агрегировать свежие данные на лету).

Нужна помощь с проектом?

Специалисты АйТи Фреш помогут с архитектурой, DevOps, безопасностью и разработкой — 15+ лет опыта

📞 Связаться с нами
#TimescaleDB#IoT#time-series#hypertable#continuous aggregates#compression#retention policy#Grafana
Комментарии 0

Оставить комментарий

загрузка...