The recommended library for Crystal to access RabbitMQ servers is AMQP::Client
With only ~1000 lines of code, and no dependencies, this amqp-client is a slimmed library but without compromising on functionality.
Sent messages are published as persistent, which means that messages are stored on disk and will survive broker/server crash or restart (when the queue is set to durable).
The client also waits for message confirmation from the broker, ensuring that no messages are lost in transit. This feature can be disabled if performance is a priority.
The client is also thread-safe.
dependencies:
amqp-client:
github: cloudamqp/amqp-client.cr
shards install
require "amqp-client"
AMQP::Client.start("amqp://guest:guest@localhost") do |c|
c.channel do |ch|
q = ch.queue("my-queue")
q.subscribe(no_ack: false) do |msg|
puts "Received: #{msg.body_io.to_s}"
ch.basic_ack(msg.delivery_tag)
end
# publish directly to a queue without confirm
q.publish "msg"
# publish directly to a queue, blocking while waiting for confirm
q.publish_confirm "msg"
# publish to any exchange/routing-key
ch.basic_publish "msg", exchange: "amq.topic", routing_key: "a"
# publish to any exchange/routing-key and wait for confirm
ch.basic_publish_confirm "msg", exchange: "amq.topic", routing_key: "a"
# This statement will block until a message has arrived
# The only way to "escape" the block is to unsubscribe
q.subscribe(tag: "myconsumer", block: true) do |msg|
q.unsubscribe("myconsumer")
end
# Consume and ack, nack or reject msgs
ch.basic_consume("queue", tag: "consumer-tag", no_ack: false, exclusive: false, block: false) do |msg|
case msg.body_io.to_s
when "ack"
ch.basic_ack(msg.delivery_tag)
when "reject"
ch.basic_reject(msg.delivery_tag, requeue: true)
when "nack"
ch.basic_nack(msg.delivery_tag, requeue: true, multiple: true)
end
end
ch.prefetch(count: 1000) # alias for basic_qos
name, message_count, consumer_count =
ch.queue_declare(name: "myqueue", passive: false, durable: true,
exclusive: false, auto_delete: false,
args = Arguments.new)
q = ch.queue # temporary queue that is deleted when the channel is closed
ch.queue_purge("myqueue")
ch.queue_bind("myqueue", "amq.topic", "routing-key")
ch.queue_unbind("myqueue", "amq.topic", "routing-key")
msg = ch.basic_get("myqueue", no_ack: true)
ch.basic_ack(msg.delivery_tag)
ch.queue_delete("myqueue")
ch.exchange_declare("my-exchange", type: "topic")
ch.exchange_delete("my-exchange")
end
end
.