Самая сложная часть. 5 ТБ PostgreSQL нельзя мигрировать через pg_dump/pg_restore — это займёт часы, а значит, будет даунтайм. Мы использовали Change Data Capture через Debezium.
Схема работы:
- Начальный снэпшот: pg_basebackup на реплику, восстановление в RDS через рестор
- Запуск Debezium Connector для захвата изменений из WAL
- Потоковая репликация изменений в RDS через Kafka
- Верификация консистентности данных
- Переключение приложения на RDS (DNS cutover)
// debezium-connector-config.json
{
"name": "autodealerpro-pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg-master.dc1.internal",
"database.port": "5432",
"database.user": "debezium_replicator",
"database.dbname": "autodealerpro",
"database.server.name": "adp_prod",
"plugin.name": "pgoutput",
"slot.name": "debezium_migration",
"publication.name": "dbz_publication",
"table.include.list": "public.*",
"snapshot.mode": "initial",
"tombstones.on.delete": false,
"decimal.handling.mode": "string",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "adp_cdc_$3"
}
}
На стороне RDS мы написали consumer, который применял изменения:
# cdc_consumer.py — применение CDC-событий к целевой БД
import json
from kafka import KafkaConsumer
import psycopg2
consumer = KafkaConsumer(
bootstrap_servers=['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
group_id='cdc-migration-consumer',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
# Подписка на все CDC-топики
consumer.subscribe(pattern='adp_cdc_.*')
conn = psycopg2.connect(
host='adp-prod.cluster-xyz.eu-central-1.rds.amazonaws.com',
dbname='autodealerpro',
user='migration_user',
)
applied = 0
for message in consumer:
payload = message.value.get('payload', {})
op = payload.get('op') # c=create, u=update, d=delete
table = message.topic.replace('adp_cdc_', '')
with conn.cursor() as cur:
if op == 'c':
after = payload['after']
cols = ', '.join(after.keys())
vals = ', '.join(['%s'] * len(after))
cur.execute(
f"INSERT INTO {table} ({cols}) VALUES ({vals}) ON CONFLICT DO NOTHING",
list(after.values())
)
elif op == 'u':
after = payload['after']
pk = list(payload['source'].get('pk', {}).keys())[0] if 'pk' in payload.get('source', {}) else 'id'
sets = ', '.join([f"{k} = %s" for k in after.keys() if k != pk])
cur.execute(
f"UPDATE {table} SET {sets} WHERE {pk} = %s",
[v for k, v in after.items() if k != pk] + [after[pk]]
)
elif op == 'd':
before = payload['before']
pk = 'id'
cur.execute(f"DELETE FROM {table} WHERE {pk} = %s", [before[pk]])
conn.commit()
applied += 1
if applied % 10000 == 0:
print(f"Applied {applied} changes, lag: {message.timestamp}")
Мы гнали CDC-поток 3 недели, пока lag не стабилизировался на уровне менее 100 мс. После этого можно было переключаться.
Оставить комментарий