В асинхронной системе сообщение может не обработаться: внешний API недоступен, данные некорректны, consumer упал. Без правильной стратегии повторов сообщения теряются или зацикливаются.
Мы реализовали трёхуровневую стратегию:
# RabbitMQ: настройка очередей с DLQ
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq')
)
channel = connection.channel()
# Dead Letter Exchange
channel.exchange_declare(
exchange='dlx.orders', exchange_type='direct'
)
# Основная очередь с DLQ
channel.queue_declare(
queue='orders.process',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx.orders',
'x-dead-letter-routing-key': 'orders.failed',
'x-message-ttl': 300000, # 5 минут максимум в очереди
'x-max-length': 100000,
}
)
# Очередь для повторных попыток с задержкой
for delay in [10, 60, 300]: # 10с, 1м, 5м
channel.queue_declare(
queue=f'orders.retry.{delay}s',
durable=True,
arguments={
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'orders.process',
'x-message-ttl': delay * 1000,
}
)
# Финальная DLQ для ручного разбора
channel.queue_declare(
queue='orders.dead', durable=True
)
Логика повторов в consumer:
def on_message(channel, method, properties, body):
retry_count = (properties.headers or {}).get('x-retry-count', 0)
try:
process_order(json.loads(body))
channel.basic_ack(delivery_tag=method.delivery_tag)
except TemporaryError as e:
# Повторяемая ошибка — отправляем в retry queue
if retry_count < 3:
delays = [10, 60, 300]
channel.basic_publish(
exchange='',
routing_key=f'orders.retry.{delays[retry_count]}s',
body=body,
properties=pika.BasicProperties(
headers={'x-retry-count': retry_count + 1},
delivery_mode=2, # persistent
)
)
else:
# Исчерпали попытки — в DLQ
channel.basic_publish(
exchange='dlx.orders',
routing_key='orders.failed',
body=body
)
channel.basic_ack(delivery_tag=method.delivery_tag)
except PermanentError:
# Невосстановимая ошибка — сразу в DLQ
channel.basic_nack(
delivery_tag=method.delivery_tag, requeue=False
)
Оставить комментарий