A Python Kafka Producer
In this article, you will see how to publish data to Apache Kafka using a Python Producer, with a practical example. I have created a GitHub repository (https://github.com/Plazas87/broker_message_producers) that contains a CLI application with the code and files we will be discussing.
The first step in almost every data project is data ingestion. Regardless of the data’s location or format, you should find a way to transform it into an incoming data flow for your project. This allows you to clean, process, and ultimately generate insights or build machine-learning models from it.
In this project, we aim to build and connect the necessary components for a data ingestion setup using Kafka and Python. For the sake of simplicity, we will use a '.csv' file as the data source and simulate a continuous incoming data flow from its contents. Think of it as something similar to a temperature sensor, or stock prices during trading hours.
Start the Kafka broker
The first thing you need to do is start a Broker (a Broker is a server that has Kafka running on it). For this, we are going to use a docker-compose.yaml file to set up the Broker service.
version: '3'
services:
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: kafka_broker
    ports:
      - '9094:9094'
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # For safety reasons, do not use this flag in a production environment.
      - ALLOW_PLAINTEXT_LISTENER=yes # the listener will be without authentication and non-encrypted
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://127.0.0.1:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    volumes:
      - /kafka/data:/bitnami/kafka
NOTE: you can read more about the Bitnami Kafka image configuration here: https://hub.docker.com/r/bitnami/kafka/
To start the Broker, run:
docker-compose up -d kafka
Now that we have the Kafka Broker up and running, it’s time to let it know how to handle all the incoming data. Basically, the broker organizes and stores those incoming messages (which we’ll call events) into topics. Think of topics like folders in a file system, and the events are like the files inside those folders. So, let’s tell the broker where to put all this incoming data:
1. Open an interactive terminal in the Broker container:
docker exec -it kafka_broker bash
2. Navigate to the directory /opt/bitnami/kafka/bin:
cd /opt/bitnami/kafka/bin
3. Now, let's create our first topic (if you prefer to do this from Python, see the sketch right after these steps). Execute the following command:
kafka-topics.sh --create --bootstrap-server 127.0.0.1:9094 --replication-factor 1 --partitions 1 --topic room_1
As a result, you should see a confirmation that the topic was created, along with a warning message.
NOTE: Pay attention to the warning, as it points out a limitation regarding topic names: because of how metric names are built, topics containing a period ('.') or an underscore ('_') can collide, so it is best to use either one or the other, but not both.
Now, let’s test if a producer can send messages to the topic we just created. Run the following command:
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9094 --producer.config /opt/bitnami/kafka/config/producer.properties --topic room_1
This command opens a prompt where you can write as many messages as you want; the topic will receive and store them (press CTRL-C to stop sending messages). You can keep this producer active to continue sending messages, and in the next step you will see a consumer receiving them.
4. Consume messages from a topic:
Once the message(s) have been sent, let's consume the whole stream of messages, starting from the first one (pay attention to the flag '--from-beginning'):
In a new terminal, execute steps 1 and 2, and then run the following command:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9094 --consumer.config /opt/bitnami/kafka/config/consumer.properties --topic room_1 --from-beginning
NOTE: consuming a message does not remove it from the topic. Kafka retains messages until the configured retention period expires; a consumer only advances its own offset (its position in the stream), which is why the '--from-beginning' flag is needed to re-read messages from the start.
You should see the messages you sent earlier printed by the consumer.
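As promised in step 3, here is how you could create a topic from Python instead of using the bundled shell scripts. This is only a minimal sketch, assuming the kafka-python package (the same client library the Producer below is built on) is installed on the host and the Broker is reachable through the EXTERNAL listener on localhost:9094; the topic name 'room_2' is just an example:

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the broker through the EXTERNAL listener exposed by docker-compose
admin = KafkaAdminClient(bootstrap_servers="localhost:9094")

# Equivalent to the kafka-topics.sh --create command above
admin.create_topics(
    new_topics=[NewTopic(name="room_2", num_partitions=1, replication_factor=1)]
)
admin.close()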
Data ingestion using Python
Now that we have a running Broker, we can connect a data source (a Producer) to it. For this purpose, I have created a CLI Python application that launches Producers which read data from a file and continuously send it to a Kafka topic. Let's see how to do it:
python -m producers --help
Let's inspect the kafka command:
python -m producers kafka --help
To start a Kafka Producer that publishes messages directly to the 'room_1' topic we created, open a new terminal in the root of the project and run:
python -m producers kafka --host localhost --port 9094 --topic room_1 --partition 0 --file-path ./data/room_1/temperature.csv
Once the Producer is running, you will be able to consume its messages. Do you remember step 4 from the previous section, which we used to consume messages from the topic? Repeat those steps and you should see the temperature readings arriving in the console consumer.
And that’s it, you got it!
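If you would rather consume these messages from Python as well, here is a minimal sketch using kafka-python's KafkaConsumer. This consumer is not part of the repository; it assumes the JSON payload produced by the serializer shown below:

import json

from kafka import KafkaConsumer

# Read the 'room_1' topic from the earliest available offset
consumer = KafkaConsumer(
    "room_1",
    bootstrap_servers="localhost:9094",
    auto_offset_reset="earliest",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

for record in consumer:
    print(record.value)  # e.g. {'temperature': ..., 'timestamp': ...}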
But what's happening under the hood? Let's dive a little deeper into our Python Kafka Producer.
First, as I mentioned before, we are using a '.csv' file that contains fake data to simulate a temperature sensor, instead of generating fake data on the fly. I opted for this approach to take the opportunity to discuss a very common problem: sometimes you have large '.csv' files, and attempting to load them entirely into memory can crash your application with an out-of-memory error. To mitigate this issue, I am using pandas to load small 'chunks' of data at a time, rather than loading the entire file. Also, pay attention to the return type of this function: it is a generator ('In Python, a generator is a function that returns an iterator that produces a sequence of values when iterated over. Generators are useful when we want to produce a large sequence of values, but we don't want to store all of them in memory at once.' - https://www.programiz.com/python-programming/generator).
# IReader is the project's reader interface; the rest are standard library and third-party imports.
import pathlib
import time
from typing import Any, Dict, Generator

import pandas as pd


class Reader(IReader[Dict[str, Any]]):
    """Reader class."""

    def __init__(self, path: str) -> None:
        """Class constructor."""
        self._path = pathlib.Path(path)

    def read(self) -> Generator[Dict[str, Any], None, None]:
        """Read data from a .csv file and return it as a generator.

        Yields:
            Generator[Dict[str, Any], None, None]: data generator.
        """
        # Read the file in chunks of 10 rows so the whole file never sits in memory
        for chunk in pd.read_csv(self._path, chunksize=10):
            for measurement in chunk.itertuples(name="Measurement"):
                # simulates a sample rate of 1 second for a sensor
                time.sleep(1)
                yield {
                    "temperature": measurement.temperature,
                    "timestamp": measurement.timestamp
                }
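To illustrate, iterating over the reader looks like this (a minimal sketch; the file path matches the sample data shipped in the repository):

reader = Reader(path="./data/room_1/temperature.csv")

# Each iteration yields one measurement, roughly one per second
for measurement in reader.read():
    print(measurement)  # e.g. {'temperature': ..., 'timestamp': ...}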
Secondly, the serializer is responsible for formatting the message before sending it to the Broker:
import json
from typing import Dict

# ISerializer is the project's serializer interface
class Serializer(ISerializer[Dict[str, int], bytes]):
    """Serializer class."""

    def serialize(self, data: Dict[str, int]) -> bytes:
        """Serialize a JSON object to bytes."""
        return json.dumps(data).encode('utf-8')
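For example, a reading from the Reader is turned into a plain UTF-8 encoded JSON byte string (the values here are illustrative):

serializer = Serializer()
payload = serializer.serialize({"temperature": 21, "timestamp": 1})
print(payload)  # b'{"temperature": 21, "timestamp": 1}'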
Finally, we are using our "__main__.py" file to build our CLI commands. You can think of them as little orchestrators that read the data, prepare a message, and hand it to the producer.
# Excerpt from '__main__.py'. 'app' is a typer.Typer() instance; Reader, KafkaMessage,
# clients and serializers come from the project's own packages.
@app.command()
def kafka(
    host: Annotated[
        str, typer.Option(help="Bootstrap server host.", rich_help_panel="Customization and Utils")
    ],
    port: Annotated[
        int, typer.Option(help="Bootstrap server port.", rich_help_panel="Customization and Utils")
    ],
    topic: Annotated[
        str, typer.Option(help="Topic to publish message to.", rich_help_panel="Customization and Utils")
    ],
    partition: Annotated[
        int, typer.Option(help="Topic's partition.", rich_help_panel="Customization and Utils")
    ],
    file_path: Annotated[
        str,
        typer.Option(
            help="Path to the '.csv' file.",
            rich_help_panel="Customization and Utils",
        ),
    ] = ".",
) -> None:
    """Publish messages to a kafka broker."""
    reader = Reader(path=file_path)
    producer = clients.kafka.Producer(
        bootstrap_server_host=host, bootstrap_server_port=port, serializer=serializers.bytes.Serializer()
    )

    # start reading the data as a stream
    for data in reader.read():
        message = KafkaMessage(topic=topic, partition=partition, body=data)
        producer.publish(message=message)
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class KafkaMessage(IMessage):  # IMessage is the project's message interface
    """KafkaMessage DTO."""

    topic: str
    partition: int  # Kafka addresses partitions by an integer index
    body: Dict[str, Any]  # raw payload; serialized to bytes by the producer's value serializer
import logging
from typing import Dict

from kafka import KafkaProducer  # provided by the kafka-python package

logger = logging.getLogger(__name__)


# IProducer and ISerializer come from the project's interfaces
class Producer(IProducer):
    """Producer class."""

    _serializer: ISerializer[Dict[str, int], bytes]
    _bootstrap_server_host: str
    _bootstrap_server_port: int

    def __init__(
        self,
        serializer: ISerializer[Dict[str, int], bytes],
        bootstrap_server_host: str,
        bootstrap_server_port: int,
    ) -> None:
        self._serializer = serializer
        # The serializer is applied to every value right before it is sent to the broker
        self._broker_client = KafkaProducer(
            bootstrap_servers=f"{bootstrap_server_host}:{bootstrap_server_port}",
            value_serializer=self._serializer.serialize,
        )

    def publish(self, message: KafkaMessage) -> None:
        """Publish data to the broker."""
        self._broker_client.send(topic=message.topic, partition=message.partition, value=message.body)
        logger.info("Message sent: '%s'", message.body)
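Putting the pieces together outside the CLI, publishing a single reading would look roughly like this (a minimal sketch with illustrative values):

producer = Producer(
    serializer=Serializer(),
    bootstrap_server_host="localhost",
    bootstrap_server_port=9094,
)

message = KafkaMessage(
    topic="room_1",
    partition=0,
    body={"temperature": 21, "timestamp": 1},
)
producer.publish(message=message)

Keep in mind that kafka-python's send() is asynchronous: records are batched in the background, so a short-lived script would typically call flush() on the underlying KafkaProducer before exiting. The CLI's long-running read loop keeps the client alive, so this is not a concern there.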
With this Python CLI application, you can start different Producers to send data to various topics. For instance, to start sending data to the topic 'room_2' (create the topic first, just as we did for 'room_1'), run:
python -m producers kafka --host localhost --port 9094 --topic room_2 --partition 0 --file-path ./data/room_1/temperature.csv