The CloudAMQP team continues to enhance the RabbitMQ community by introducing new, useful tools. Recently, we launched an AMQP Ruby Client with great response and now we are introducing a new Crystal AMQP client into the mix. The client is written in pure Crystal with zero dependencies, which makes it easy and safe to integrate with Crystal applications and RabbitMQ.
As a relatively new (2014) open-source programming language, Crystal has a massive fanbase of developers from all over the world. The new AMQP Crystal client will allow for better communication between applications written in Crystal and RabbitMQ.
Crystal AMQP::Client - Fast and safe by default
In 2021, CloudAMQP launched version 1.0.0 of the open-source amqp-client that seamlessly bridges Crystal applications with RabbitMQ over AMQP.
With only ~1000 lines of code, and no dependencies, this new AMQP::Client is slimmed down without compromising on functionality. Sent messages are published as persistent, which means that messages are stored on disk and will survive broker/server crash or restart (when the queue is set to durable). The client also waits for message confirmation from the broker, ensuring that no messages are lost in transit. This feature can be disabled if performance is a priority. The client is also thread-safe.
Get started with Crystal and AMQP::Client
Installation
-
Add the dependency to your
dependencies: amqp-client: github: cloudamqp/amqp-client.cr
-
Then run
shards install
Usage
require "amqp-client"
AMQP::Client.start("amqp://guest:guest@localhost") do |c|
c.channel do |ch|
q = ch.queue("my-queue")
q.subscribe(no_ack: false) do |msg|
puts "Received: #{msg.body_io.to_s}"
ch.basic_ack(msg.delivery_tag)
end
# publish directly to a queue without confirm
q.publish "msg"
# publish directly to a queue, blocking while waiting for confirm
q.publish_confirm "msg"
# publish to any exchange/routing-key
ch.basic_publish "msg", exchange: "amq.topic", routing_key: "a"
# publish to any exchange/routing-key and wait for confirm
ch.basic_publish_confirm "msg", exchange: "amq.topic", routing_key: "a"
# This statement will block until a message has arrived
# The only way to "escape" the block is to unsubscribe
q.subscribe(tag: "myconsumer", block: true) do |msg|
q.unsubscribe("myconsumer")
end
# Consume and ack, nack or reject msgs
ch.basic_consume("queue", tag: "consumer-tag", no_ack: false, exclusive: false, block: false) do |msg|
case msg.body_io.to_s
when "ack"
ch.basic_ack(msg.delivery_tag)
when "reject"
ch.basic_reject(msg.delivery_tag, requeue: true)
when "nack"
ch.basic_nack(msg.delivery_tag, requeue: true, multiple: true)
end
end
ch.prefetch(count: 1000) # alias for basic_qos
name, message_count, consumer_count =
ch.queue_declare(name: "myqueue", passive: false, durable: true,
exclusive: false, auto_delete: false,
args = Arguments.new)
q = ch.queue # temporary queue that is deleted when the channel is closed
ch.queue_purge("myqueue")
ch.queue_bind("myqueue", "amq.topic", "routing-key")
ch.queue_unbind("myqueue", "amq.topic", "routing-key")
msg = ch.basic_get("myqueue", no_ack: true)
ch.basic_ack(msg.delivery_tag)
ch.queue_delete("myqueue")
ch.exchange_declare("my-exchange", type: "topic")
ch.exchange_delete("my-exchange")
end
end
We at CloudAMQP always strive for meeting customer and community demands and we hope that you like our contribution. Further reading through Github links is available here:
GitHub repository: https://github.com/cloudamqp/amqp-client.cr
We hope you like this improvement of our service. If you have any questions or comments on the blog above, send us an email at contact@cloudamqp.com
Until next time, CloudAMQP team