Welcome to LavinMQ for beginners! This article is the second in a series on LavinMQ. It walks through code examples in Node.js, the steps to set up your own LavinMQ instance, and a relatable example of a working LavinMQ system that you can follow yourself. It then explains how to set up a connection and the basics of publishing messages to, and consuming messages from, a queue.
This article assumes that you have a running LavinMQ server. If not, get started with a free LavinMQ plan at CloudAMQP.
This tutorial follows the scenario used in the previous article, Part 1: LavinMQ for beginners - What is LavinMQ?
The example follows a web application that allows users to upload a profile picture. Once the image is uploaded, the user can typically decide what part of the image they want to show, scale it up or down, and move it around.
The web application takes these instructions and the image and sends a request to the part of the system that is responsible for "Image Processing", which usually includes downsizing and web optimization.
The image processor handles the information, scales the image, and saves it in the new format. In the example, the entire scaling process will take several seconds. Let’s get started.
Getting started with LavinMQ and Node.js
Start by downloading the client library for Node.js. Node developers have a number of options for AMQP client libraries; this example uses amqplib. Add amqplib as a dependency in your package.json file.
Full code can be downloaded from GitHub.
Queues and exchanges will be declared and created if they do not already exist and, finally, a message is published. The publish method queues messages internally if the connection is down and resends them later. Once the consumer subscribes to the queue, the messages are handled one by one and sent to the image processor.
The default exchange, identified by the empty string (""), will be used. With the default exchange, a message is routed to the queue whose name matches the routing key, if such a queue exists. (The default exchange is a direct exchange with no name.)
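To make the routing rule concrete, here is a toy in-memory model of it. It is only an illustration of the rule described above, not how LavinMQ is implemented; the queues map and the publishToDefaultExchange helper are hypothetical names.

```javascript
// Toy model of default-exchange routing (illustration only).
// One queue named "jobs" exists; no other queues are declared.
const queues = new Map([["jobs", []]]);

function publishToDefaultExchange(routingKey, content) {
  // The default exchange delivers to the queue whose name equals the routing key.
  const queue = queues.get(routingKey);
  if (!queue) return false; // no queue with that name: the message is not routed
  queue.push(content);
  return true;
}

console.log(publishToDefaultExchange("jobs", "resize avatar.png")); // true
console.log(publishToDefaultExchange("emails", "welcome mail"));    // false
console.log(queues.get("jobs").length);                             // 1
```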
Tutorial source code
Load amqplib
// Access the callback-based API
var amqp = require('amqplib/callback_api');
var amqpConn = null;
Set up a connection
function start() {
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 1000);
    }
    conn.on("error", function(err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });
    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });
    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}
The start function establishes a connection to LavinMQ. If the connection is closed or fails to be established, it tries to reconnect after one second. amqpConn holds the connection, and channels are set up on that connection. whenConnected is called once a connection is established.
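The connect call above appends "?heartbeat=60" by string concatenation, which assumes that CLOUDAMQP_URL has no query string of its own. A minimal sketch of a more defensive alternative, using Node's built-in URL class, could look like this. The withHeartbeat helper and the example URLs are hypothetical placeholders.

```javascript
// Hypothetical helper: set the heartbeat parameter without breaking
// a query string that may already be present in the connection URL.
function withHeartbeat(amqpUrl, seconds) {
  const url = new URL(amqpUrl);
  url.searchParams.set("heartbeat", String(seconds));
  return url.toString();
}

// Placeholder URLs, not real credentials.
const plain = withHeartbeat("amqps://user:secret@host.example/vhost", 60);
const withQuery = withHeartbeat("amqps://user:secret@host.example/vhost?frameMax=4096", 60);
console.log(plain);
console.log(withQuery);
```

The result can be passed to amqp.connect in place of the concatenated string.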
function whenConnected() {
  startPublisher();
  startWorker();
}
The whenConnected function calls two functions: one that starts the publisher and one that starts the worker (the consumer).
Start the publisher
var pubChannel = null;
var offlinePubQueue = [];
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    pubChannel = ch;
    // Resend messages that were buffered while the channel was unavailable
    while (true) {
      var m = offlinePubQueue.shift();
      if (!m) break; // the queue is empty
      publish(m[0], m[1], m[2]);
    }
  });
}
createConfirmChannel opens a channel that uses "confirmation mode". A channel in confirmation mode requires each published message to be 'acked' or 'nacked' by the server, thereby indicating that it has been handled. offlinePubQueue is an internal queue for messages that could not be sent while the application was offline. Each time the publisher channel is created, the application sends any messages waiting in this queue.
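The drain step can be sketched in isolation, without a broker. In the sketch below, drainOfflineQueue is a hypothetical helper and sendFn stands in for the tutorial's publish function. It takes a snapshot of the queue first, which is one possible design choice: messages that fail and are buffered again during the pass are left for the next reconnect instead of being retried in a tight loop.

```javascript
// Sketch: send buffered messages in FIFO order once a channel is available.
function drainOfflineQueue(offlineQueue, sendFn) {
  // splice(0) empties the queue and returns its previous contents.
  const pending = offlineQueue.splice(0);
  for (const [exchange, routingKey, content] of pending) {
    sendFn(exchange, routingKey, content);
  }
  return pending.length;
}

const buffered = [
  ["", "jobs", Buffer.from("image-1")],
  ["", "jobs", Buffer.from("image-2")],
];
const delivered = [];
const count = drainOfflineQueue(buffered, function (exchange, routingKey, content) {
  delivered.push(content.toString());
});
console.log(count, delivered, buffered.length);
```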
Publish
function publish(exchange, routingKey, content) {
  try {
    pubChannel.publish(exchange, routingKey, content, { persistent: true },
      function(err, ok) {
        if (err) {
          console.error("[AMQP] publish", err);
          offlinePubQueue.push([exchange, routingKey, content]);
          pubChannel.connection.close();
        }
      });
  } catch (e) {
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}
The publish function publishes a message to an exchange with a given routing key. If an error occurs, the message is added to the internal queue, offlinePubQueue.
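The "publish, and buffer on failure" behaviour can be tried without a broker by passing in a fake channel. This is a sketch under assumptions: publishOrBuffer and the two fake channels are hypothetical, and the fakes only mimic the publish(exchange, routingKey, content, options, callback) shape used in the tutorial code.

```javascript
// Sketch: buffer the message whenever it cannot be confirmed.
function publishOrBuffer(channel, offlineQueue, exchange, routingKey, content) {
  const remember = function () {
    offlineQueue.push([exchange, routingKey, content]);
  };
  if (!channel) return remember(); // not connected yet
  try {
    channel.publish(exchange, routingKey, content, { persistent: true }, function (err) {
      if (err) remember(); // the message was not confirmed
    });
  } catch (e) {
    remember(); // publish threw, for example because the channel is closed
  }
}

// Fake channels for demonstration only.
const ackingChannel = { publish: (ex, key, body, opts, cb) => cb(null) };
const nackingChannel = { publish: (ex, key, body, opts, cb) => cb(new Error("nack")) };

const offline = [];
publishOrBuffer(ackingChannel, offline, "", "jobs", Buffer.from("a"));
publishOrBuffer(nackingChannel, offline, "", "jobs", Buffer.from("b"));
publishOrBuffer(null, offline, "", "jobs", Buffer.from("c"));
console.log(offline.map((m) => m[2].toString())); // [ 'b', 'c' ]
```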
Consumer
// A worker that acks messages only if processed successfully
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });
  });
}
amqpConn.createChannel creates a channel on the connection. ch.assertQueue asserts a queue into existence. ch.consume sets up a consumer with a callback that is invoked for each message it receives. The function called for each message is processMsg.
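// Must be defined inside the createChannel callback of startWorker so that ch is in scope
function processMsg(msg) {
  work(msg, function(ok) {
    try {
      if (ok)
        ch.ack(msg);
      else
        ch.reject(msg, true); // true = requeue the message
    } catch (e) {
      closeOnErr(e);
    }
  });
}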
processMsg processes the message. It calls the work function and waits for it to finish, then acks the message on success or rejects and requeues it on failure.
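Because processMsg needs access to the channel that delivered the message, one way to structure it is as a closure over that channel. The sketch below is an illustration, not the tutorial's own code: makeProcessMsg and the fake channel are hypothetical, and the fake only records which of ack or reject was called.

```javascript
// Sketch: build the message handler as a closure over its channel.
function makeProcessMsg(ch, work) {
  return function processMsg(msg) {
    if (msg === null) return; // amqplib passes null when the consumer is cancelled
    work(msg, function (ok) {
      if (ok) ch.ack(msg);
      else ch.reject(msg, true); // true = requeue the message
    });
  };
}

// Fake channel that records calls instead of talking to a broker.
const calls = [];
const fakeChannel = {
  ack: (msg) => calls.push("ack " + msg.content.toString()),
  reject: (msg, requeue) => calls.push("reject " + msg.content.toString() + " requeue=" + requeue),
};

// The work function here succeeds for every payload except "bad".
const handler = makeProcessMsg(fakeChannel, (msg, cb) => cb(msg.content.toString() !== "bad"));
handler({ content: Buffer.from("good") });
handler({ content: Buffer.from("bad") });
console.log(calls); // [ 'ack good', 'reject bad requeue=true' ]
```

With a real channel, the returned function would be passed to ch.consume in place of processMsg.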
function work(msg, cb) {
  console.log("Image processing of ", msg.content.toString());
  cb(true);
}
The work function is where the message information is handled and the image is scaled. In this example it is only a placeholder.
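As a rough idea of what a fuller work function might do, the sketch below parses a JSON job description before reporting success or failure through the callback. The payload shape (image, width, height) is an assumption made for illustration; the tutorial itself publishes a plain string, and no actual image scaling is performed here.

```javascript
// Sketch of a work function for a hypothetical JSON job format:
// {"image": "avatar.png", "width": 200, "height": 200}
function work(msg, cb) {
  let job;
  try {
    job = JSON.parse(msg.content.toString());
  } catch (e) {
    console.error("Invalid job payload:", e.message);
    return cb(false); // report failure to the caller
  }
  // A real implementation would scale the image here.
  console.log("Scaling", job.image, "to", job.width + "x" + job.height);
  cb(true);
}

const results = [];
work({ content: Buffer.from('{"image":"avatar.png","width":200,"height":200}') }, (ok) => results.push(ok));
work({ content: Buffer.from("not json") }, (ok) => results.push(ok));
console.log(results); // [ true, false ]
```

Note that with the processMsg handler shown earlier, a failed message is rejected with requeue set to true, so a payload that can never be parsed would be redelivered repeatedly. Rejecting such messages without requeueing is one way to avoid that.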
Close the connection on error
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  amqpConn.close();
  return true;
}
Start publishing
setInterval(function() {
  // Buffer.from replaces the deprecated new Buffer() constructor
  publish("", "jobs", Buffer.from("work work work"));
}, 1000);
start();
A new message is published every second to the default exchange (""), which routes it to the jobs queue named by the routing key.
More information about Node.js and CloudAMQP can be found here.
Please email us at contact@cloudamqp.com if you have any suggestions or feedback.