Some scenarios don’t require immediate message delivery. For instance, a setup process that needs to be completed before messages are delivered, or an order that needs a specific amount of time before processing.
RabbitMQ’s delayed exchange plugin addresses this requirement without needing extra code. Messages are delivered after a predetermined amount of time. This tutorial explores scenarios that require delayed messaging, the benefits of implementing a delay, and how to create a delayed exchange.
What is a Delayed Message Exchange?
When processing messages, instantaneous delivery isn’t always the desired or best option. Providing a delay in processing a message is ideal for scenarios with hard waiting periods such as setup processes, or when trying to ensure that customers have a chance to read text messages.
The RabbitMQ delayed exchange plugin is used to implement a wait time between when a message reaches the exchange and when it is delivered to a queue. Every time a message is published, an offset in milliseconds can be specified.
Types of Delayed Exchanges
While the delayed exchange plugin provides its own, it also mimics standard exchange types. This means that messages are routed based on a designated type after the delay period specified. Exchange types including direct, fanout, or custom can be used to create complex topologies without needing multiple layers of extra code.
No messages are lost, as acknowledgment and other features are triggered once they reach the destination queue.
Creating a Delayed Message Exchange
CloudAMQP customers can simply enable the delayed message exchange plugin via the CloudAMQP control panel plugin tab.
A third-party plugin is required and can be downloaded here. Once downloaded, extract the plugin and add it to the RabbitMQ plugins directory.
Enable the plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Declare the exchange by using a special parameter known as
x-delay-type
and the
x-delayed-message
type:
channel.exchange_declare(exchange='test-exchange',
exchange_type='x-delayed-message',
arguments={"x-delayed-type": "fanout"})
In the example, the exchange used is a fanout type, meaning that messages reach each attached destination.
Use
x-delay
to add a delay to each message:
channel.basic_publish(exchange='test-exchange',
routing_key='test_route',
properties=pika.BasicProperties(
headers={'x-delay': 2500}
),
body='Hello World!')
Checking if a message was delayed
Ensuring that messages process after a given delay is possible. The plugin
returns the value in
x-delay.
The value of
x-delay becomes
-4000
if the delayed exchange is instructed to wait for 4000 milliseconds, and the
header does not drop.
Limitations
If there is only a single copy of a scheduled message, it could be lost if a node goes down or if the plugin attached to it is disabled. When a node goes down, all scheduled messages are lost.
All messages are stored both in RAM and on disk. Persisting every message on disk may create performance issues if used extensively. On the other hand storing all messages in RAM (unlike lazy queues) may exhaust memory resources.
Load balancing for the delayed message exchange should be used to address high availability and scalability. Attach the exchange to others using an exchange to exchange binding or consider delaying messages using the dead letter exchange.
Avoid the Pitfalls of the Delayed Message Exchange
The use of delayed message exchange turned out to be quite useful, but not without its quirks. The article How to Avoid the Pitfalls of the Delayed Message Exchange will help you understand how delayed message exchange works, so you can avoid potential pitfalls.