A Python RabbitMQ producer

Andres Plazas
9 min readAug 18, 2024

--

Photo by Elena Mozhvilo on Unsplash

In this article, you will learn how to publish data to RabbitMQ using a Python Producer, illustrated through a practical example. I’ve created a GitHub repository containing a CLI application with the code and files we’ll discuss.

The first step in almost every data project is data ingestion. Regardless of the data’s location or format, you need to find a way to transform it into an incoming data flow for your project. This allows you to clean, process, and ultimately generate insights or build machine-learning models from it.

In this project, our goal is to build and connect the necessary components for a data ingestion setup using RabbitMQ and Python. For simplicity, we will use a .csv file as the data source and simulate a continuous incoming data flow. Think of it as something similar to a temperature sensor or stock prices during open hours.

Configure and Start the RabbitMQ node

The first step is to start a Broker node (a Broker is a server with a RabbitMQ instance running on it). For this, we will use a docker-compose.yaml file to set up the Broker service:

version: '3'

services:
rabbitmq:
build:
context: ./rabbitmq
container_name: rabbitmq_broker
ports:
- 5672:5672
- 15672:15672
volumes:
- rabbitmq_data:/var/lib/rabbitmq

As you can see, the Docker context for building the RabbitMQ image points to a directory called rabbitmq. I chose to do this because RabbitMQ requires configuration files to set up Queues, Topics, and Exchanges. The directory I created looks like this:

Let’s start by reviewing the Dockerfile file.

FROM rabbitmq:3.12-management

COPY config/20-defaults.conf /etc/rabbitmq/conf.d
COPY config/definitions.json /etc/rabbitmq

This Dockerfile copies the configuration files to a specific location within the image. This action makes these files available for pre-configuring the Broker node. You can learn more about this here and here.

Now, let’s review the 20-defaults.conf file:

default_user = admin
default_pass = admin

# when set to true, definition import will only happen
# if definition file contents change
definitions.skip_if_unchanged = true
definitions.import_backend = local_filesystem
definitions.local.path = /etc/rabbitmq/definitions.json

The first thing to note is the filename. By naming the file this way, we ensure that the entire default configuration for a Broker node is loaded first, as stated in the documentation:

“They will be loaded in alphabetical order. A common naming practice uses numerical prefixes in filenames to make it easier to reason about the order, or make sure a “defaults file” is always loaded first, regardless of how many extra files are generated at deployment time”

ls -lh /path/to/a/custom/location/rabbitmq/conf.d
# => -r--r--r-- 1 rabbitmq rabbitmq 87B Mar 21 19:50 00-defaults.conf
# => -r--r--r-- 1 rabbitmq rabbitmq 4.6K Mar 21 19:52 10-main.conf
# => -r--r--r-- 1 rabbitmq rabbitmq 1.6K Mar 21 19:52 20-tls.conf
# => -r--r--r-- 1 rabbitmq rabbitmq 1.6K Mar 21 19:52 30-federation.conf

For more details, see here.

Now, let's review its content. The first two lines configure the credentials to be used when sending messages to the Broker. These credentials also allow you to access the RabbitMQ management plugin, which we’ll use to check that the broker received our messages (more information here). The final section tells the Broker node where to look for definitions.

Definitions

The definitions file allows us to configure the Broker node and can be imported at node startup or using RabbitMQ CLI tools:

“Nodes and clusters store information that can be thought of schema, metadata or topology. Users, vhosts, queues, exchanges, bindings, runtime parameters all fall into this category. This metadata is called definitions in RabbitMQ parlance.

Definitions can be exported to a file and then imported into another cluster or used for schema backup or data seeding.

Definitions are stored in an internal database and replicated across all cluster nodes. Every node in a cluster has its own replica of all definitions. When a part of definitions changes, the update is performed on all nodes in a single transaction. This means that in practice, definitions can be exported from any cluster node with the same result. VMware Tanzu RabbitMQ supports Warm Standby Replication to a remote cluster, which makes it easy to run a warm standby cluster for disaster recovery”. (for more information about this here: https://www.rabbitmq.com/docs/management)

Let’s review our definitions file:

{
"bindings": [
{
"source":"temperature",
"vhost":"/",
"destination":"room_1",
"destination_type":"queue",
"routing_key":"temperature.room_1",
"arguments":{}
}
],
"exchanges": [
{
"name": "temperature",
"vhost": "/",
"type": "topic",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
],
"global_parameters": [],
"parameters": [],
"permissions": [
{
"configure": ".*",
"read": ".*",
"user": "admin",
"vhost": "/",
"write": ".*"
}
],
"policies": [],
"queues": [
{
"name":"room_1",
"vhost":"/",
"durable":true,
"auto_delete":true,
"arguments":{}
}
],
"rabbit_version": "3.12.2",
"rabbitmq_version": "3.12.2",
"topic_permissions": [],
"users": [
{
"hashing_algorithm": "rabbit_password_hashing_sha256",
"limits": {},
"name": "admin",
"password_hash": "Xz1Cbqa1+QGyNGohlpT24SDCwxZVZoVIeq50De6IZRT/Ki5M",
"tags": [
"administrator"
]
}
],
"vhosts": [
{
"limits": [],
"metadata": {
"description": "Default virtual host",
"tags": []
},
"name": "/"
}
]
}

This Exchange will receive all the data we publish. Additionally, we are defining a queue named “room_1,” which we will bind to the “temperature” Exchange using a “routing key.” This means that all the messages published to the “temperature” Exchange with the routing key temperature.room_1” will be delivered to our “room_1queue. Finally, we grant the “admin” user full permissions.

Finally, to start the broker using our Docker compose using our docker-compose.yaml file just run:

docker compose up rabbitmq

Now, let’s check that everything is working with our node. To do this, we will use the RabbitMQ management plugin, which is already running in the background thanks to the Docker image we used. Open a new browser tab and go to http://localhost:15672/#/ then log in to the management plugin using the credentials you configured in 20-defaults.conf.

If everything is OK you should see something like this:

RabbitMQ Management Plugin — Overview
RabbitMQ Management Plugin — Queues
RabbitMQ Management Plugin — Exchanges

Now that we have finished configuring the broker, we can move on to publishing messages to it.

RabbitMQ Producer

Now let’s send some data to our Broker node, specifically to our Queueroom_1”. To do this, I have created a CLI Python application that reads data from a file and continuously sends it to our Queue by using a RabbitMQ Producer, let’s see how to do it.

python -m producers --help

Let’s inspect the RabbitMQ command:

python -m producers rabbit-mq --help

To start a RabbitMQ Producer that publishes messages directly to the “room_1Queue we created, open a new terminal in the project’s root directory and run:

python -m producers rabbit-mq --host=localhost --port=5672 --exchange=temperature --topic=temperature.room_1 --vhost=/ --user=admin --password=admin --file-path=./data/room_1/temperature.csv

Once the Producer is running, you will be able to see and consume these messages. Do you remember the Management plugin we used earlier to check our Queues? Let's go to that view, and you should see the number of messages in the "room_1" queue start to increase.

Finally, you can manually consume these messages to confirm that our Producer is working. To do this, inspect the “room_1queue by clicking on it and scrolling down to the ‘Get messages’ section. Once there, let’s get the first two messages we sent.

And that’s it, you got it!

But what’s happening under the hood? let’s dive a little bit deeper into our Python RabbitMQ Producer.

First, as I mentioned before, we are using a “.csv” file that contains fake data to simulate a temperature sensor instead of generating fake data on the fly. I opted for this approach to take advantage of the situation and talk about a very common problem. For example, Sometimes, you may have large “.csv” files, and attempting to load them entirely into memory for could lead to application crashes due to out-of-memory errors. To mitigate this issue, I am using Pandas to load small “chunks” of data at a time, rather than loading the entire file. Also, pay attention to the return type of this function, it is a generator (“In Python, a generator is a function that returns an iterator that produces a sequence of values when iterated over. Generators are useful when we want to produce a large sequence of values, but we don’t want to store all of them in memory at once.” — https://www.programiz.com/python-programming/generator)

class Reader(IReader[Dict[str, Any]]):
"""Reader class."""

def __init__(self, path: str) -> None:
"""Class constructor."""
self._path = pathlib.Path(path)

def read(self) -> Generator[Dict[str, Any], None, None]:
"""Read data from a .csv file and return it as generator.

Yields:
Generator[Dict[str, int], None, None]: data generator.
"""
# Suppose the file that we are going to read could be huge and load it
# completely in memory could crash your running process.
# To avoid this, we are going to fetch fixed size chunks of data.
for chunk in pd.read_csv(self._path, chunksize=10):
for measurement in chunk.itertuples(name="Measurement"):
time.sleep(5) # simulates a sample rate of 5 second for a sensor
yield {
"temperature": measurement.temperature,
"timestamp": measurement.timestamp
}

Secondly, the serializer is responsible for formatting the message before sending it to the Broker

class Serializer(ISerializer[Dict[str, Any], bytes]):
"""Serializer class."""

def serialize(self, data: Dict[str, Any]) -> bytes:
"""Serialize a json object to bytes."""
return json.dumps(data).encode('utf-8')

Finally, we are using our “__main__.py” file to build our CLI commands. You can think of them as little orchestrators that read the data prepare a message and it to the producer.

@app.command()
def rabbit_mq(
host: Annotated[
str, typer.Option(help="Broker server host.")
],
port: Annotated[
int, typer.Option(help="Broker server port.")
],
exchange: Annotated[
str, typer.Option(help="Exchange to use.")
],
topic: Annotated[
str, typer.Option(help="Topic to publish message to.")
],
vhost: Annotated[
str, typer.Option(help="Virtual host.")
],
user: Annotated[str, typer.Option(
help="User."
)],
password: Annotated[str, typer.Option(
help="Password."
)],
file_path: Annotated[
str,
typer.Option(
help="Path to the '.csv' file that contains the sample data."
),
] = ".",
) -> None:
"""Plusblish messages to RabbitMQ broker."""
reader = Reader(path=file_path)
bytes_seralizar = serializers.bytes.Serializer()

logger.info("Starting the RabbitMQ producer...")

producer = clients.rabbitmq.Producer(
host=host,
port=port,
user=user,
password=password,
vhost=vhost,
delivery_mode="Transient",
content_type="application/json",
content_encoding="utf-8",
)

# Start reading the data as a stream from a file.
with producer as publisher:
for data in reader.read():
message = clients.rabbitmq.RabbitMQMessage(
topic=topic,
exchange=exchange,
body=bytes_seralizar.serialize(data=data)
)
publisher.publish(message=message)

As you can see, I have created the Producer to be used as a context manager to control the connection with the broker. The following is the Producer implementation.

from __future__ import annotations
from dataclasses import dataclass

import logging

import pika
from pika.adapters.blocking_connection import BlockingChannel

from ...application.ports import IProducer, IMessage

logger = logging.getLogger(__name__)

@dataclass
class RabbitMQMessage(IMessage):
"""RabbitMQMessage class."""

exchange: str
topic: str
body: bytes


class Producer(IProducer[IMessage]):
"""RabbitMQ Producer class."""

_user: str
_password: str
_host: str
_port: str
_vhost: str
_delivery_mode: str
_content_type: str
_content_encoding: str
_channel: BlockingChannel

def __init__(
self,
user: str,
password: str,
host: str,
port: str,
vhost: str,
delivery_mode: str = "Persistant",
content_type: str = "application/json",
content_encoding: str = "utf-8"
) -> None:
"""Class constructor."""
self._user = user
self._password = password
self._host = host
self._port = port
self._vhost = vhost
self._connection = None
self._delivery_mode = delivery_mode
self._content_type = content_type
self._content_encoding = content_encoding

def _connect(self) -> None:
"""Connect to RabbitMQ."""
credentials = pika.PlainCredentials(self._user, self._password)
parameters = pika.ConnectionParameters(
host=self._host, port=self._port, virtual_host=self._vhost, credentials=credentials
)
self._connection = pika.BlockingConnection(parameters=parameters)

def __enter__(self) -> Producer:
"""Create and start a connection with rabbitmq using the context manager interface."""
self._connect()
self._channel = self._connection.channel()

return self

def __exit__(self, exc_type, exc_value, exc_tb):
"""Stop the connection when the context manager is exited."""
# Gracefully close the connection
self._connection.close()

def publish(self, message: RabbitMQMessage) -> None:
"""
Publish a message to RabbitMQ.

Args:
message (RabbitMQMessage): message to publish

Returns
None
"""
properties = pika.BasicProperties(
delivery_mode=pika.DeliveryMode[self._delivery_mode],
content_type=self._content_type,
content_encoding=self._content_encoding
)

self._channel.basic_publish(
exchange=message.exchange,
routing_key=message.topic,
body=message.body,
properties=properties
)
logger.info("Message sent: '%s'", message.body)

We are creating a connection and a dedicated channel to send these messages. When we go out of the context manager the connection with the Broker is automatically closed (methods “__enter__” and “__exit__”).

--

--