Leer y escribir datos en Kafka con Python
En este artículo veras cómo publicar y suscribirte a flujos de datos utilizando Apache-Kafka y Python con un ejemplo práctico.
Hace algunos días quise simular el comportamiento de precios de una acción basándome en datos históricos y, así, practicar la compra y venta de acciones. Para ello, decidí crear un proyecto con la siguiente estructura.
El proyecto tiene tres componentes: un publicador (Proveedor de datos), un consumidor (Plataforma de negociación básica) y, en el medio, Apache-Kafka. Este último, permite el envío de mensajes (en este caso registros de precios) entre aplicaciones utilizando el patrón de mensajería publish/subscribe. Los mensajes del publicador son enviados directamente al servidor Kafka (también llamado Broker) quien los clasifica y ordena de forma persistente en topics, utilizando una o varias partitions en su interior. Luego, uno o más consumidores pueden suscribirse a uno, o varios, topics para recibir los mensajes; los datos son almacenados durante un tiempo determinado que puedes configurar, lo que previene la pérdida de mensajes en caso de fallas en los consumidores. Si te interesa conocer los detalles y componentes de Apache-Kafka puedes visitar “https://kafka.apache.org/intro”.
1- Proveedor de datos (publicador)
El publicador de datos se encarga de enviar uno a uno y con una frecuencia determinada todos los registros contenidos en un archivo “.csv”, con el objetivo de simular el movimiento de precios de una acción. En esta ocasión, utilicé información histórica de la variación de precios diarios de Netflix correspondientes al año 2019 ( Yahoo Finance).
Para ello, he creado la siguiente aplicación :
producer/
├── __init__.py
├── __main__.py
├── producer.py
└── reader
├── data_reader.py
└── __init__.py
El fichero data_reader.py se encarga de todo lo relacionado con la lectura y preparación de datos antes de ser enviados.
Por otro lado, el fichero producer.py contiene los métodos necesarios para enviar los registros (mensajes) del fichero “.csv” a una determinada partition dentro de un topic.
Por último, el archivo __main__.py, es el punto de entrada de la aplicación.
Para ejecutar esta aplicación, primero necesitas instalar la librería Kafka-python. Así mismo, necesitas instalar pandas para el procesamiento de datos. Recuerda preparar antes un entorno virtual para el proyecto, así podrás gestionar y mantener las dependencias de forma aislada.
#install kafka-python
python -m pip install kafka-python#install pandas
python -m pip install pandas
Asimismo, dado que vas a comenzar a escribir datos directamente sobre una partition dentro de un topic, necesitas iniciar Zookeeper server y un broker de Kafka. En el siguiente apartado se describen los pasos necesarios para esto.
2- Apache-Kafka
Apache-kafka es una plataforma de transmisión de datos distribuida que permite recibir, almacenar, publicar y procesar flujos de registros (datos). Está diseñada para manejar flujos de datos de varias fuentes y distribuirlos a diversos suscriptores.
Como se mencionó antes, Kafka utiliza topics y partitions para organizar los mensajes de uno o mas publicadores. Cada broker puede contener uno o mas topics y, a su vez, un topic puede contener una o mas partitions. En esta ocasión, vamos a utilizar el caso más simple: 1 broker, 1 topic y 1 partition.
Antes de comenzar, debes instalar Apache-Kafka en tu ordenador, en este link puedes encontrar toda la información. Una vez instalado, lo primero es iniciar Zookeeper server. Para ello, abre una terminal y ejecuta:
#Start zookeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Ahora, vamos a iniciar el broker de Kafka. En otra terminal ejecuta:
# Start Kafka broker
$ bin/kafka-server-start.sh config/server.properties
Una vez hecho esto, procedemos a crear un topic con un nombre alusivo a la información que vamos a escribir en él; por defecto, esta acción, crea una partition dentro del topic. Para ello, en una tercera terminal ejecuta:
# create a topic
$ bin/kafka-topics.sh --create --topic netflix --bootstrap-server localhost:9092
Por último, inspeccionamos el topic creado para comprobar que, efectivamente, contiene una partición (partition 0).
# describe topic
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Solo hasta este momento, el publicador puede escribir mensajes en la partition del topic . Vamos a ejecutar la aplicación (descrita en el apartado 1) pasando como parámetros: el nombre del fichero “.csv” , el nombre del topic y la frecuencia de envío de mensajes (en segundos).
# Frequency (seconds): 5
# Topic name: netflix
# Filename: NFLX_2019.csv# Start the producer
$ python -m producer NFLX_2019.csv netflix 5
Para comprobar que los mensajes se están escribiendo en la partition del topic, en otra terminal ejecuta:
# Star console consumer client
$ bin/kafka-console-consumer.sh --topic netflix --from-beginning --bootstrap-server localhost:9092
3- Plataforma de negociación básica
El ultimo componente de este proyecto es una aplicación que permite simular la compra y venta de acciones de una compañía. Está compuesta por tres componentes: un consumidor, un graficador y un ejecutor de transacciones.
El consumidor se encarga de recibir los datos publicados por el publicador, para ello, he creado la siguiente aplicación.
consumer
├── consumer.py
├── __init__.py
└── __main__.py
El archivo consumer.py, al igual que el publicador, utiliza la librería Kafka-python para conectarse al broker de kafka y consumir los datos de uno o varios topics. En este caso, me suscribo al topic que he creado en el apartado anterior.
Para iniciar esta aplicación, en una terminal ejecuta:
# Topic name: netflix# Start consumer
$ python -m consumer netflix
Si tienes Kafka (apartado 2), el publicador y el consumidor iniciados deberías ver la trasmisión de mensajes.
Ahora que hemos simulado el cambio en el precio de la acción de Netflix, podemos utilizar esta información para elaborar un gráfico de velas japonesas (candlesticks) y simular la compra y venta de acciones teniendo en cuenta los movimientos de la gráfica. Para ello, he creado una tercera aplicación, que como verás, integra el consumidor:
trade_app
├── app_CLI.py
├── buildconfigurations
│ ├── build_configuration.py
│ └── __init__.py
├── config
│ ├── config_consumer.py
│ ├── config_database.py
│ ├── config_file_structure.py
│ ├── config_portfolio.py
│ ├── configpostgres.ini
│ ├── config_trade.py
│ └── __init__.py
├── consumer
│ ├── consumer.py
│ └── __init__.py
├── custom_exceptions
│ ├── exceptions.py
│ └── __init__.py
├── database_controller
│ ├── database_controller.py
│ └── __init__.py
├── __init__.py
├── __main__.py
├── orders
│ ├── buy_order.py
│ ├── __init__.py
│ ├── order_components.py
│ ├── order.py
│ └── sell_order.py
├── plotter
│ ├── candlestick_plotter.py
│ └── __init__.py
├── portfolio
│ ├── __init__.py
│ └── portfolio.py
├── trade
│ ├── __init__.py
│ ├── trade_components.py
│ └── trade.py
└── trader
├── __init__.py
└── trader.py
Básicamente permite simular la compra y venta de acciones teniendo en cuenta un presupuesto inicial configurable. También, almacena las órdenes ejecutadas en una base de datos, así, puedes analizar tus patrones compra y venta con simples consultas.
Específicamente el package plotter de esta aplicación es el encargado de generar y actualizar el gráfico de velas japonesas a medida que el consumidor recibe los datos.
Por último, dependiendo del comportamiento de la gráfica, puedes ejecutar órdenes de compra y venta, basta con presionar la tecla “b” (buy) y digitar el número de acciones para ejecutar una compra y, la tecla “s” (sell) seguido del “ID” de la orden para ejecutar una venta; todo quedará almacenado en la base de datos.
A continuación, se muestran algunos ejemplos de compra y venta de la acción de netflix.
Compra de 5 acciones:
Cierre de la orden con ID: 202010422407223
Puedes realizar tantas compras como el capital disponible te lo permita, después, puedes seleccionar cuál de ellas cerrar utilizando su ID.
Por ultimo, consulta el historial de órdenes en la base de datos.
Conclusión
Utilizar Apache-Kafka como herramienta de trasmisión de mensajes tiene grandes ventajas, entre las principales: permite gestionar una gran cantidad de mensajes por segundo con baja latencia (del orden de milisegundos), almacena eficazmente flujos de mensajes en el orden en que se publicaron, permite construir pipelines de datos para aplicaciones que requieren análisis de datos en tiempo real, escalable horizontalmente debido a su naturaleza distribuida y tolerante a fallos.
En esta ocasión, utilizar Kafka me permitió evitar la pérdida de información entre la fuente de datos y el consumidor cuando:
- El consumidor falla o simplemente no esta disponible: en este escenario, cuando el consumidor esta disponible (o se recupera de un fallo), recibe los mensajes que se emitieron durante su ausencia en el orden en que fueron emitidos.
- El consumidor no se inicia al tiempo el con publicador: cuando el consumidor finalmente se conecta es posible recibir los datos desde el comienzo o, en caso de que no fuese necesario, comenzar a recibir los datos en tiempo real dejando de lado los datos publicados con anterioridad; sea cual sea tu necesidad, puedes configurar esto tipo de comportamiento.
Este proyecto se encuentra en construcción y, por ende, ira cambiando. Todo el código se encuentra disponible en mi repositorio por si quieres dar un vistazo.