The recommended C client for RabbitMQ is the rabbitmq-c client library. The library is maintained by alanxz.
The sample code below shows how to set up a connection
to CloudAMQP and how to set up a listener for messages.
The code below builds upon
the example given in the example folder of the library, amqp_listen.c;
additional information is added to show how to set up the CloudAMQP credentials.
An important point for users on shared plans is to set
channel_max to something other than 0.
die_on_amqp_error(amqp_login(conn, vhost, channel_max, frame_max, heartbeat, AMQP_SASL_METHOD_PLAIN, user, password), "Logging in");
(A request to change the
library has been sent to the creator of the library. 0 means unlimited,
and that will result in errors with CloudAMQP's channel_max limit
on the shared plans.)
More information about the library can be found here: alanxz.github.io/rabbitmq-c/docs/0.2/annotated.html
//amqp_listen.c
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <assert.h>
#include "utils.h"
/*
 * Example AMQP listener.
 *
 * Opens a TCP connection to the broker, logs in, opens channel 1, declares a
 * queue with a broker-chosen name, binds it to `exchange` with `bindingkey`,
 * and then prints every delivered message until amqp_consume_message()
 * reports anything other than AMQP_RESPONSE_NORMAL.
 *
 * The hostname, vhost, username and password below are placeholders and must
 * be replaced with real credentials. The command-line arguments are not used.
 *
 * Returns 0 after the channel and connection have been closed, or 1 if the
 * queue name could not be copied. Other failures are reported through the
 * die()/die_on_error()/die_on_amqp_error() helpers from utils.h (presumably
 * these terminate the process -- confirm against utils.h).
 */
int main(int argc, char const *const *argv)
{
    char const *hostname, *vhost, *password, *username;
    int port, status;
    char const *exchange;
    char const *bindingkey;
    amqp_socket_t *socket = NULL;
    amqp_connection_state_t conn;
    amqp_bytes_t queuename;

    /* The command line is intentionally ignored; settings are hard-coded. */
    (void)argc;
    (void)argv;

    hostname = "hostname"; //bunny.cloudamqp.com
    port = 5672;
    exchange = "amq.direct";
    bindingkey = "test";
    vhost = "vhost";
    username = "username";
    password = "password";

    conn = amqp_new_connection();

    socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        die("creating TCP socket");
    }

    status = amqp_socket_open(socket, hostname, port);
    if (status) {
        die("opening TCP socket");
    }

    /*
     * Login arguments after vhost: channel_max = 200, frame_max = 131072,
     * heartbeat = 0. channel_max is deliberately non-zero: 0 means unlimited,
     * which conflicts with the channel_max limit on shared plans.
     */
    die_on_amqp_error(amqp_login(conn, vhost, 200, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password), "Logging in");

    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

    {
        /* An empty queue name is passed in; the name actually used is read
         * back from the declare reply. */
        amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
        die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

        /* Take a private copy of the name; it is released with
         * amqp_bytes_free() once consuming has finished. */
        queuename = amqp_bytes_malloc_dup(r->queue);
        if (queuename.bytes == NULL) {
            fprintf(stderr, "Out of memory while copying queue name");
            return 1;
        }
    }

    amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

    amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

    for (;;) {
        amqp_rpc_reply_t res;
        amqp_envelope_t envelope;

        amqp_maybe_release_buffers(conn);

        res = amqp_consume_message(conn, &envelope, NULL, 0);
        if (AMQP_RESPONSE_NORMAL != res.reply_type) {
            break;
        }

        printf("Delivery %u, exchange %.*s routingkey %.*s\n",
               (unsigned)envelope.delivery_tag,
               (int)envelope.exchange.len, (char *)envelope.exchange.bytes,
               (int)envelope.routing_key.len, (char *)envelope.routing_key.bytes);

        /* The content type is printed only when its property flag is set. */
        if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
            printf("Content-type: %.*s\n",
                   (int)envelope.message.properties.content_type.len,
                   (char *)envelope.message.properties.content_type.bytes);
        }
        printf("----\n");

        amqp_dump(envelope.message.body.bytes, envelope.message.body.len);

        amqp_destroy_envelope(&envelope);
    }

    /* Release the copy made by amqp_bytes_malloc_dup() above. */
    amqp_bytes_free(queuename);

    die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
    die_on_error(amqp_destroy_connection(conn), "Ending connection");

    return 0;
}