This article assumes that you have a running RabbitMQ server installed, if not - get started with a free RabbitMQ plan at CloudAMQP.
There are multiple NodeJS clients for RabbitMQ. We focus on the actively developed amqplib and Rascal libraries. Whereas amqplib, as a barebones implementation of AMQP, gives the developer flexibility, Rascal handles common tasks to reduce the potential for programming errors.
What is amqplib?
The amqplib library is among the few remaining unabandoned NodeJS RabbitMQ libraries. No assumptions are made regarding your use of AMQP. Connections are not retried, the topology is not automatically created, and you must handle message failures.
Amqplib exposes synchronous and asynchronous methods over the same API. You may choose between implementing an application using promises or callbacks.
Publishing messages in amqplib
The amqplib library uses the AMQP 0.9.1 protocol. Channels created on connections push messages to RabbitMQ.
Use the following code to create your publisher:
const amqplib = require('amqplib');
var amqp_url = process.env.CLOUDAMQP_URL || 'amqp://localhost:5672';
async function produce(){
console.log("Publishing");
var conn = await amqplib.connect(amqp_url, "heartbeat=60");
var ch = await conn.createChannel()
var exch = 'test_exchange';
var q = 'test_queue';
var rkey = 'test_route';
var msg = 'Hello World!';
await ch.assertExchange(exch, 'direct', {durable: true}).catch(console.error);
await ch.assertQueue(q, {durable: true});
await ch.bindQueue(q, exch, rkey);
await ch.publish(exch, rkey, Buffer.from(msg));
setTimeout( function() {
ch.close();
conn.close();}, 500 );
}
produce();
This producer connects to the broker, creates our exchange and queue, binds the queue to the exchange, and then publishes our test message. We close the channel and connection when finished.
Consuming messages in amqplib
Programs using amqplib use the consume function to receive messages from RabbitMQ:
const amqplib = require('amqplib');
var amqp_url = process.env.CLOUDAMQP_URL || 'amqp://localhost:5672';
async function do_consume() {
var conn = await amqplib.connect(amqp_url, "heartbeat=60");
var ch = await conn.createChannel()
var q = 'test_queue';
await conn.createChannel();
await ch.assertQueue(q, {durable: true});
await ch.consume(q, function (msg) {
console.log(msg.content.toString());
ch.ack(msg);
ch.cancel('myconsumer');
}, {consumerTag: 'myconsumer'});
setTimeout( function() {
ch.close();
conn.close();}, 500 );
}
do_consume();
We supply a message handler in the call to consume, closing the connection
and channel when the consumer terminates. There is no need to use await
with the call to
do_consume.
A rascal broker
As a wrapper around amqplib, Rascal attempts to simplify the use of RabbitMQ and other AMQP based frameworks in nodejs. The inability of amqplib to deal with connection issues and unparseable content can pose a serious problem for microservices.
Rascal attempts to resolve these issues and add common features such as:
- automated queue and exchange creation
- automated reconnection and resubscription
- flow control
- redelivery limits
- handles for specific failures
- safe defaults
- encryption
The library stores publication and subscription profiles which manage connections to the underlying RabbitMQ broker.
Configuring rascal
Rascal creates a specialized broker for each connection which manages publication and consumption from RabbitMQ. Set up the broker using a json configuration file:
{
"vhosts": {
"test": {
"connection": {
"url": "amqp://user:pwd@127.0.0.1:5672/test"
},
"exchanges": {
"test_exchange": {
"assert": true,
"type": "direct"
}
},
"queues": [
"test_queue"
],
"bindings": {
"b1": {
"source": "test_exchange",
"destination": "test_queue",
"destinationType": "queue",
"bindingKey": "test_route"
}
},
"publications": {
"demo_publication": {
"vhost": "test",
"exchange": "test_exchange",
"routingKey": "test_route"
}
},
"subscriptions": {
"demo_subscription": {
"queue": "test_queue",
"prefetch": 1
}
}
}
}
}
This file tells Rascal to create the test_exchange and binds the
test_queue
to the exchange. The tool attempts to set reasonable default values for
missing variables.
A Rascal publisher
Rascal brokers perform most of the work we needed to publish to RabbitMQ using amqplib. Creating the broker gives us access to an underlying publisher:
const Broker = require('rascal').BrokerAsPromised;
const config = require('./config.json');
async function rascal_produce(){
console.log("Publishing");
var msg = 'Hello World!';
const broker = await Broker.create(config);
broker.on('error', console.error);
const publication = await broker.publish('demo_publication', msg);
publication.on('error', console.error);
console.log("Published")
}
rascal_produce().catch(console.error);
The Rascal broker created and bound
test_queue
to
test_exchange
in the create function. We use the publish method to send a message to
RabbitMQ using the specified publisher from the
config.json
file.
Rascal Subscriber
Rascal uses subscription profiles when receiving messages.These profiles set up consumers:
const Broker = require('rascal').BrokerAsPromised;
const config = require('./config.json');
async function rascal_consume(){
console.log("Consuming");
const broker = await Broker.create(config);
broker.on('error', console.error);
const subscription = await broker.subscribe('demo_subscription', 'b1');
subscription.on('message', (message, content, ackOrNack)=>{
console.log(content);
ackOrNack();
subscription.cancel();
});
subscription.on('error', console.error);
subscription.on('invalid_content', (err, message, ackOrNack) =>{
console.log('Failed to parse message');
});
}
rascal_consume().catch(console.error);
Our asynchronous consumer sends messages to the appropriately registered functions based on the configuration above. Error handlers allow us to log failures.
Does Rascal nack my messages?
It is important to be aware that Rascal nacks messages. This occurs when there is no invalid content listener or the application exceeds the subscriber’s redelivery limit. Invalid content may include unparseable characters or corrupted messages.
Negative acknowledgment allows for bulk rejection. To avoid constantly requeuing failed messages, specify a special invalid content listener as follows:
try{
const subscription = await broker.subscribe("my_subscriber")
subscription.on("message", (message, content, ackOrNack) => {
// handle message
}).on("invalid_content", (err, message, ackOrNack) => {
// handle message with invalid content
})
} catch(err){
console.error("Subscription does not exist")
}
It is likely that a message with bad content will fail in other consumers as well. Handling this up front keeps your applications and microservices healthy. Use logstash or another third party tool to receive notifications and diagnose the underlying issue.
Which nodejs amqp library should I use?
The library you choose depends on the features you need to implement. If you are building complex software using enterprise integration patterns, Rascal offers additional benefits over the barebones amqplib.
Smaller applications may opt to use only amqplib. The features you need should dictate which library you choose.
RabbitMQ clients in Nodejs
The npm registry lists multiple RabbitMQ nodejs clients. While most are abandoned, Rascal and amqplib client are in active development. The amqplib client gives you flexibility through a barebones AMQP 0.9.1 API. You can use the Rascal wrapper to stabilize more complicated applications.
Any client that connects to RabbitMQ works with CloudAMQP.