Before we get started in this lesson, we assume you have a running RabbitMQ server installed, but if not, you can get started with a free RabbitMQ plan at CloudAMQP. Additionally, for your convenience, all code shown in this lesson is available on GitHub.
There are a growing number of ways to connect to RabbitMQ using Python including the use of ‘weak typing’ and even the asyncio framework.
The following Python-focused client libraries are in active development, as of this writing (2020-09-21):
- Pika: A pure python AMQP 0.9.1 library for connecting to the broker
- aio-pika: A pure python AMQP 0.9.1 library making use of the asyncio framework
- Qpid proton: A python library that allows you to use AMQP 1.0
These tools allow Python clients and AMQP brokers to work together in your application. This article, as well as the official RabbitMQ tutorial, use Pika, which continues to accommodate changes to the language, recently incorporating the Python asyncio framework.
Using a requirements file
Python allows your application to track dependencies through a special file named requirements.txt. You must place the file in the root folder for your project.
Install RabbitMQ Python library Pika, by adding the following to requirements.txt:
pika >= 1.1.0
Here we have specified the use of the latest version greater than or equal to 1.1.0. You can replace >= with == for an explicit release.
Run the following command from your root directory to complete the installation:
pip install -r requirements.txt
Note that you must install pip on your system prior to execution.
Types of connections in Pika
Pika allows you to choose the type of connection you create. Available options are:
-
The asynchronous connection adapter
select connection (SelectConnection).
SelectConnection is using callbacks. An example of a callback is
add_on_open_callback
which adds a callback notification once a connection has been opened. TheSelectConnection
can be useful if your RabbitMQ broker or your connection is slow or overloaded. - The blocking connection (BlockingConnection) waits for all requests to complete. This will block the execution thread until e.g channel_open or exchange_declared has returned. Its often simpler to use this sort of serialized logic.
- An asyncio connection connects through a special adapter. Python asyncio allow functions to avoid blocking while waiting on I/O. A RabbitMQ client sends a request and then allows tasks to run on the same thread while waiting for a response. This is perfect for consumers since they communicate with and receive messages from the broker.
Connecting with Pika using a Blocking Connection
The blocking connection works well when sending messages at irregular intervals. Requests block the connection until the server sends a response to the request, i.e. when the connection has been established.
Establish a connection as follows:
import pika, os
url = os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@localhost:5672/")
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel.exchange_declare('test_exchange')
channel.queue_declare(queue="test_queue")
channel.queue_bind("test_queue", "test_exchange", "tests")
channel.basic_publish(exchange="test_exchange",
routing_key="tests",
body="Hello CloudAMQP!")
channel.close()
connection.close()
In the code above was a connection established to your server and the test_queue was declared. This queue is bound to the default exchange before publishing to it.
Full code for the blocking connection can be found here: https://github.com/cloudamqp/python-amqp-example
Using a non-blocking Pika Connection to send messages
The Python Pika library allows you to avoid waiting for the server to send responses using threading or multiprocessing. Use the SelectConnection as follows:
class RabbitConnectionExample:
"""
RabbitMQ operations
"""
def __init__(self):
"""
Initializes the class
"""
self.url = os.environ[‘RABBITMQ_URL’]
self._barrier = Barrier(2, timeout=120)
def connection_callback(self, conn):
"""
Run on connecting to the server
:param conn: The connection created in the previous step
"""
self._connection.channel(on_open_callback=self.channel_callback)
def channel_callback(self, ch):
"""
Publish to the channel. You can use other methods with callbacks but only the channel
creation method provides a channel. Other methods provide a frame you can choose to
discard.
:param ch: The channel established
"""
properties = pika.BasicProperties(content_type='application/json')
ch.basic_publish(exchange='test_exchange',
routing_key='tests',
properties=properties,
body="Hello CloudAMQP!")
self._barrier.wait(timeout=1)
ch.close()
self._connection.close()
def run(self):
"""
Runs the example
"""
def run_io_loop(conn):
conn.ioloop.start()
params = pika.URLParameters(self._url)
self._connection = pika.SelectConnection(
params, on_open_callback=self.connection_callback)
if self._connection:
t = threading.Thread(target=run_io_loop, args=(self._connection, ))
t.start()
self._barrier.wait(timeout=30)
self._connection.ioloop.stop()
else:
raise ValueError
RabbitConnectionExample().run()
The library chooses a polling mechanism based on your operating system to avoid blocking requests. You will need to create a workflow with a callback for each step required to send or receive messages, and then stop the loop when finished.
Creating a consumer in Pika
Pika is an AMQP 0.9.1 based library. Use the
basic_consume
method to
receive responses:
url = os.environ.get(“CLOUDAMQP_URL”, “'amqp://guest:guest@localhost:5672/”)
params = 'pika.URLParameters(URL)
connection = pika.BlockingConnection(params)
channel = connection.channel()
for method_frame, properties, body in channel.consume('test_queue'):
print(str(body))
channel.close()
connection.close()
This method works with the
BlockingConnection
and
SelectConnection.
We can
create a handle for each message or use the return value like an iterator in
the
BlockingConnection.
Using Python Asyncio to create a consumer
You can use the
asyncio framework
to create a consumer in Pika using AMQP 0.9.1. The process is similar to
that used in the
SelectConnection.
First, convert the run function from our
SelectConnection
example to the following:
def run(self):
"""
Run the example.
"""
print(sys.platform)
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
params = pika.URLParameters(self._url)
self._connection = AsyncioConnection(parameters=params,
on_open_callback=self.open_connection_callback,
on_open_error_callback=self.open_connection_error_callback,
on_close_callback=self.close_connection_callback)
if self._connection:
self._connection.ioloop.run_forever()
return self._closing
We switched to the
AsyncioConnection
which avoids blocking a thread until data is available.
To create a consumer in either the
SelectConnection
or
AsyncioConnection
set the
prefetch count
and move from using
basic_publish
to
basic_consume
after binding to the your queue in
queue_bind_callback
:
def on_message(self, _unused_channel, basic_deliver, _properties, body):
"""
Called when your consumer receives a message.
:param _ch: Unused channel receiving the message
:param basic_deliver: AMQP.BasicDelivery object
:param properties: incoming message properties
:param body: incoming message body
"""
print("Received {}\n{}".format(basic_deliver.delivery_tag, str(body)))
def qos_callback(self, _unused_frame):
"""
Called after the QOS request succeeds
:param _unused_frame:
"""
print("Consuming from Queue")
self._channel.add_on_cancel_callback(self.consumer_cancelled)
self._consumer_tag = self._channel.basic_consume("test_queue", self.on_message,
auto_ack=True)
self._consuming = True
self._was_consuming = True
def queue_bind_callback(self, _unused_frame):
"""
Queue bind callback
:param _unused_frame: Unused frame created by the queue bind call
"""
print("Setting Prefetch")
self._channel.basic_qos(prefetch_count=1, callback=self.qos_callback)
A broader Python asyncio tutorial can help you to understand the underpinnings of the framework.
CloudAMQP works with any library that can connect to RabbitMQ. So, let's start publishing and consuming messages today!
Further Reading
Python and AMQP 1.0 using Qpid Proton
Happy coding!