Sometimes you want to delay the delivery of messages for a certain time so that subscribers doesn't see them immediately. The AMQP protocol doesn't have a native delayed queue feature, but with RabbitMQ's AMQP protocol extensions we can easily emulate one by combining the message TTL function and the dead-lettering function.
Note: There is now a plugin available for delayed messages (RabbitMQ 3.5.3 and later versions), RabbitMQ delayed messages exchange plugin. The RabbitMQ Delayed Message Plugin adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the user adds a delay header to a message. Read more about the plugin: RabbitMQ delayed message exchange plugin
If a queue is declared with the x-dead-letter-exchange property messages which is either rejected, nacked or the TTL for a message expires will be sent to the specified dead-letter-exchange, and if you specify x-dead-letter-routing-key the routing key of the message will be changed when dead lettered.
By declaring a queue with the x-message-ttl property, messages will be discarded from the queue if they haven't been consumed within the time specified.
By combining these to functions we publish a message to a queue which will expire its message after the TTL and then reroute it to the exchange and with the dead-letter routing key so that they end up in a queue which we consume from.
Step by step:
The following Ruby snippet, which relays on the excellent Bunny library, demonstrates how delayed message can be implemented.
require 'bunny'
B = Bunny.new ENV['CLOUDAMQP_URL']
B.start
DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'
def publish
ch = B.create_channel
# declare a queue with the DELAYED_QUEUE name
ch.queue(DELAYED_QUEUE, arguments: {
# set the dead-letter exchange to the default queue
'x-dead-letter-exchange' => '',
# when the message expires, set change the routing key into the destination queue name
'x-dead-letter-routing-key' => DESTINATION_QUEUE,
# the time in milliseconds to keep the message in the queue
'x-message-ttl' => 3000
})
# publish to the default exchange with the the delayed queue name as routing key,
# so that the message ends up in the newly declared delayed queue
ch.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
puts "#{Time.now}: Published the message"
ch.close
end
def subscribe
ch = B.create_channel
# declare the destination queue
q = ch.queue DESTINATION_QUEUE, durable: true
q.subscribe do |delivery, headers, body|
puts "#{Time.now}: Got the message"
end
end
subscribe()
publish()
sleep
If you have any questions, please feel free to contact support@cloudamqp.com for further assistance.
The delayed message exchange has the same purpose, and inherently works the same for LavinMQ and RabbitMQ. You can set up a delayed message exchange in RabbitMQ, and then export+import those definitions to LavinMQ and expect that it works exactly the same for your application.
With that said, the delayed-exchange is implemented slightly different.
For example, the UI is different. Additionally, in LavinMQ, you can let the
type be the regular type (topic, direct, fanout..) and then you add an
args: {x-delayed-exchange: true}
to indicate that the exchange is a delayed exchange
LavinMQ also handles the “rabbitmq way” where you send in
type: x-delayed-message
so clients can work with both brokers in the same way, but the user interface is a
bit different. If you don’t specify a type it will become a direct exchange by default.
In the management interface for LavinMQ, you choose a type and then check the “delayed” box if you want the exchange to be delayed, but in the rabbitMQ interface, delayed is displayed as one of the types.