Sometimes you may want to delay the delivery of messages for a certain time so that subscribers don't see them immediately. The AMQP protocol doesn't have a native delayed queue feature. People have instead combined the message TTL function and the dead-lettering function to achieve this - a guide for that can be found here.
There is also a plugin available for delayed messages, RabbitMQ delayed messages exchange plugin (targets RabbitMQ 3.5.8 and later versions and Erlang 18.0+). This plugin adds a new exchange type to RabbitMQ and messages routed to that exchange can be delayed for a specified amount of time. CloudAMQP customers on dedicated plans can enable the plugin from the plugins tab.
More information, including how to install the plugin for non-customers, can be found here.
x-delayed-message, x-delayed-type
We will start by declaring an exchange with the 'x-delayed-message' exchange type and providing an 'x-delayed-type' argument. 'x-delayed-type' tells the plugin how the exchange should behave after the given delay time has passed - in this case we want it to behave as a direct exchange.
// Declare the delayed exchange. No `passive` flag is passed: it is not an assertExchange option, and the exchange should be created when it is missing.
pubChannel.assertExchange(exchange, "x-delayed-message", {autoDelete: false, durable: true, arguments: {'x-delayed-type': "direct"}})
x-delay
When we have the exchange declared, we can publish messages with a header telling the plugin how long (in milliseconds) it should delay our message. Don't forget to bind a queue to the exchange.
// Publish with an "x-delay" header. If the publish callback reports an error, log it and close the connection.
var publishOptions = {headers: {"x-delay": delay}};
pubChannel.publish(exchange, routingKey, content, publishOptions, function onPublished(err, ok) {
  if (!err) return;
  console.error("[AMQP] publish", err);
  pubChannel.connection.close();
});
The plugin will proceed to route the message without delay if no x-delay header is provided.
Code example
When running the full code given below, a connection will be established between the RabbitMQ instance and your application. Queues and exchanges will be declared and created if they do not already exist and finally a message will be published with the 'x-delay' header - one message will be sent every 10 seconds. The publish method will queue messages internally if the connection is down and resend them later. The consumer subscribes to the jobs-queue. The messages are handled one by one and sent to the work process.
[AMQP] connected
Worker is started
work sent: 18:19:35 --- received: 18:19:45
work sent: 18:19:45 --- received: 18:19:55
work sent: 18:19:55 --- received: 18:20:05
work sent: 18:20:05 --- received: 18:20:15
var amqp = require('amqplib/callback_api');
// The current AMQP connection; assigned in start() once connecting succeeds.
var amqpConn = null;
/**
 * Opens the AMQP connection and wires up its lifecycle handlers.
 *
 * The broker URL is read from the CLOUDAMQP_URL environment variable, with
 * "?heartbeat=60" appended. A failed connect attempt, or a "close" event on
 * an established connection, schedules another call to start() one second
 * later. On success the connection is stored in amqpConn and
 * whenConnected() is invoked.
 */
function start() {
  var brokerUrl = process.env.CLOUDAMQP_URL + "?heartbeat=60";

  // Schedules the next connection attempt.
  function retryLater() {
    return setTimeout(start, 1000);
  }

  // Logs connection errors, except the "Connection closing" one.
  function onConnError(connErr) {
    if (connErr.message === "Connection closing") return;
    console.error("[AMQP] conn error", connErr.message);
  }

  function onConnClose() {
    console.error("[AMQP] reconnecting");
    return retryLater();
  }

  amqp.connect(brokerUrl, function onConnect(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return retryLater();
    }

    conn.on("error", onConnError);
    conn.on("close", onConnClose);

    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}
/**
 * Runs once a connection is available: starts the publisher first,
 * then the worker (consumer).
 */
function whenConnected() {
  var starters = [startPublisher, startWorker];
  for (var i = 0; i < starters.length; i++) {
    starters[i]();
  }
}
// Confirm channel used for publishing; assigned in startPublisher().
var pubChannel = null;
// Messages that could not be published; startPublisher() replays each entry m as publish(m[0], m[1], m[2]).
var offlinePubQueue = [];
// Name of the exchange that is asserted with type "x-delayed-message" and published to.
var exchange = 'my-delay-exchange';
/**
 * Creates the confirm channel used for publishing and sets up its topology.
 *
 * Steps, each one started only after the previous one succeeded:
 *   1. assert the exchange 'my-delay-exchange' as an "x-delayed-message"
 *      exchange that behaves like a "direct" exchange after the delay,
 *   2. assert the "jobs" queue (same options as the worker uses), so the
 *      bind below cannot fail because the queue does not exist yet,
 *   3. bind the queue "jobs" to the exchange with the binding key "jobs",
 *   4. expose the channel as pubChannel and replay the messages that were
 *      parked in offlinePubQueue.
 *
 * Any error closes the connection via closeOnErr().
 */
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    // No `passive` flag: it is not an assertExchange option, and the exchange should be created when missing.
    var exchangeOptions = {autoDelete: false, durable: true, arguments: {'x-delayed-type': "direct"}};

    ch.assertExchange(exchange, "x-delayed-message", exchangeOptions, function(err) {
      if (closeOnErr(err)) return;
      // The queue must exist before it can be bound.
      ch.assertQueue("jobs", { durable: true }, function(err) {
        if (closeOnErr(err)) return;
        ch.bindQueue("jobs", exchange, "jobs", {}, function(err) {
          if (closeOnErr(err)) return;

          // Only hand the channel to publish() once exchange, queue and binding exist.
          pubChannel = ch;

          // Replay a snapshot: publish() may push failed messages back onto
          // offlinePubQueue, which must not make this loop run forever.
          var pending = offlinePubQueue.splice(0, offlinePubQueue.length);
          pending.forEach(function(m) {
            publish(m[0], m[1], m[2]);
          });
        });
      });
    });
  });
}
/**
 * Publishes a message to the delayed exchange.
 *
 * The delay is passed to the plugin in the "x-delay" header. If publishing
 * throws, or the publish callback reports an error, the message is parked
 * in offlinePubQueue as [routingKey, content, delay] - the shape that
 * startPublisher() replays through publish(m[0], m[1], m[2]). A callback
 * error additionally closes the connection.
 *
 * @param {string} routingKey - Routing key for the message.
 * @param {Buffer} content - Message body.
 * @param {number} delay - Value for the "x-delay" header.
 */
function publish(routingKey, content, delay) {
  try {
    pubChannel.publish(exchange, routingKey, content, {headers: {"x-delay": delay}},
      function(err, ok) {
        if (err) {
          console.error("[AMQP] publish", err);
          // Keep the same tuple shape as the catch branch below so the
          // message is replayed with its original routing key and delay.
          offlinePubQueue.push([routingKey, content, delay]);
          pubChannel.connection.close();
        }
      });
  } catch (e) {
    console.error("[AMQP] failed", e.message);
    offlinePubQueue.push([routingKey, content, delay]);
  }
}
/**
 * Starts the consumer on its own channel.
 *
 * Sets a prefetch of 10, asserts the durable "jobs" queue and consumes
 * from it with manual acknowledgements. A message is acked when the work
 * callback reports success; otherwise it is rejected with requeue.
 * Errors are routed through closeOnErr().
 */
function startWorker() {
  amqpConn.createChannel(function onChannel(err, ch) {
    if (closeOnErr(err)) return;

    // The "job" itself: logs the payload with the receive time and reports success.
    var work = function(msg, cb) {
      console.log(msg.content.toString() + " --- received: " + current_time());
      cb(true);
    };

    // Consumer callback: run the job, then ack or reject depending on its outcome.
    var processMsg = function(msg) {
      work(msg, function(ok) {
        try {
          if (!ok) {
            ch.reject(msg, true);
          } else {
            ch.ack(msg);
          }
        } catch (e) {
          closeOnErr(e);
        }
      });
    };

    ch.on("error", function onChannelError(chErr) {
      console.error("[AMQP] channel error", chErr.message);
    });
    ch.on("close", function onChannelClose() {
      console.log("[AMQP] channel closed");
    });

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function onQueue(queueErr, _ok) {
      if (closeOnErr(queueErr)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });
  });
}
/**
 * Error guard shared by the channel setup code.
 *
 * @param {*} err - Error value from a callback (falsy when there was none).
 * @returns {boolean} false when err is falsy; otherwise logs the error,
 *   closes the connection held in amqpConn and returns true.
 */
function closeOnErr(err) {
  var failed = Boolean(err);
  if (failed) {
    console.error("[AMQP] error", err);
    amqpConn.close();
  }
  return failed;
}
/**
 * Formats a time of day as zero-padded "HH:MM:SS" using local time.
 *
 * @param {Date} [date] - Time to format; defaults to the current time.
 * @returns {string} The formatted time, e.g. "18:19:35".
 */
function current_time(date) {
  // All values are locals: assigning undeclared names would create globals
  // and throws a ReferenceError in strict mode / ES modules.
  var now = date || new Date();

  // Left-pads a number below 10 with a single "0".
  function pad2(n) {
    return n < 10 ? "0" + n : "" + n;
  }

  return pad2(now.getHours()) + ":" + pad2(now.getMinutes()) + ":" + pad2(now.getSeconds());
}
// Publish a message every 10 seconds. Each message is delayed by 10 seconds (10000 ms).
setInterval(function() {
  // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
  publish("jobs", Buffer.from("work sent: " + current_time()), 10000);
}, 10000);
// Open the connection; start() retries by itself when connecting fails.
start();
As always, please email us if you have any suggestions or feedback.