Automatiza flujos de trabajo con apache-airflow

Andres Plazas
7 min readJul 16, 2020

--

Tree view

Recientemente participé en un proyecto que proveía datos con una frecuencia horaria. Cada fichero requería siempre las mismas transformaciones, lo que se convirtió en algo repetitivo y aburrido. Comencé a buscar alternativas para automatizar este proceso lo que me llevó a utilizar apache-airflow. Esta herramienta de software libre permite crear, monitorear y visualizar flujos de trabajo (workflows). El componente central de esta herramienta es el DAG (Direct Acyclic Graph). En él básicamente se define un conjunto de tareas junto con sus dependencias y orden de ejecución; describe cómo y cuándo se deben ejecutar dichas tareas. En este tutorial, veremos cómo automatizar una ingesta y transformación de datos con apache-airflow.

1. Creación de un entorno virtual

La creación de un entorno virtual de Python tiene como objetivo gestionar y mantener separadas las dependencias requeridas por diferentes proyectos. En esta ocasión, vamos a crear y activar un entorno virtual aislado de nombre “venv” dentro del directorio creado para el proyecto, utilizando la herramienta virtualenv.

# Creation of a virtual enviroment
python -m virtualenv venv
# Activate virtual enviroment
source venv/bin/activate

2. Instalación de apache-airflow

Con el entorno virtual activado se procede con la instalación de apache-airflow. Por defecto, esta herramienta crea el directorio “~/airflow” y, en su interior, el fichero “airflow.cfg” (fichero de configuración). Esta ubicación se puede modificar asignando a la variable AIRFLOW_HOME la dirección del directorio creado para nuestro proyecto con el siguiente comando.

export AIRFLOW_HOME=~/path/to/my_project

Así, se consigue que el fichero de configuración, creado automáticamente por apache-airflow, se genere dentro del directorio del proyecto. Seguido, basta con ejecutar el siguiente comando para comenzar la instalación.

python -m pip install apache-airflow

Si todo ha ido bien, al final del proceso, deberías ser capaz de comprobar la versión utilizando el siguiente comando.

airflow version
# 1.10.10

3. Configuración de PostgreSQL usando Docker

Airflow, por defecto, utiliza sqlite como base de datos para su funcionamiento. En este ejemplo, vamos a cambiar la base de datos y utilizaremos PostgreSQL en su lugar . Para ello, utilizando Docker, se crea un contenedor al que llamaremos “postgres_db” con el siguiente comando:

docker run --name postgres_db -e POSTGRES_PASSWORD=postgres -e POSTGRES_USER=airflow -p 5432:5432 -d postgres

Tras su ejecución se tiene una instancia de PostgreSQL lista para trabajar.

Para comprobar que el contenedor se ha creado correctamente y puede trabajar, listamos los contenedores activos.

docker container ls

Con la base de datos lista, se debe modificar el string de conexión a la base de datos en el fichero “airflow.cfg”. Como se mencionó antes, este archivo reside en el directorio$AIRFLOW_HOME . El siguiente es el string de conexión a la base de datos por defecto:

Para utilizar la instancia de PostgreSQL se debe modificar por:

Como se observa, Airflow utiliza el package psycopg2 para comunicarse con PostgreSQL. La instalación se puede realizar con el siguiente comando: python -m install psycopg2 .

Adicional, se modifica el executor con el objetivo de habilitar el procesamiento de tareas en paralelo de forma local. Para ello, cambiamos la opción por defecto: SequentialExecutor a LocalExecutor.

Por último, se ejecuta el comando airflow initdb para inicializar la base de datos. Este comando crea en la base de datos las tablas necesarias para el funcionamiento de Airflow.

4. Definición del DAG

Como se mencionó al comienzo, un DAG no es más que un scrip de python en el que se configura y se define todo lo relacionado con las tareas que hacen parte de un flujo de trabajo; por defecto, Airflow busca estos scripts en el directorio$AIRFLOW_HOME/dags .

A continuación, se define un DAG con tres sencillas tareas que se ejecutan una vez cada hora en orden secuencial (t1, t2 , t3). Para este ejemplo se requiere que las tareas puedan intercambiar información entre ellas, por lo que se utiliza la característica “XComs” de Airflow.

# simple_etl_dag.pyfrom datetime import timedelta
from datetime import datetime
import pandas as pd
# The DAG object
from airflow import DAG
# Operators
from airflow.operators.python_operator import PythonOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'Airflow',
'depends_on_past': True,
'start_date': datetime(2020, 7, 10, 13),
# 'end_date': datetime(2020, 7, 20),
'email': ['your_email@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'Extract_Transform_Load',
default_args=default_args,
description='Read and transform a csv file',
schedule_interval='30 * * * *',
)
def read_csv_file(file_name, *args, **kwargs):
data = pd.read_csv(file_name, sep=';',
encoding='iso-8859-1',
header='infer')
return data
def filter_columns(**context):
data = context['task_instance'].\
xcom_pull(task_ids='Read_csv_file')

data = data[['ESTACION', 'LONGITUD', 'LATITUD']]
return data
def load_data(output_path, **context):
data = context['task_instance'].\
xcom_pull(task_ids='Filter_columns')

data.to_csv(output_path, sep=',',
header=True,
encoding='utf-8',
index=False)
t1 = PythonOperator(
task_id='Read_csv_file',
provide_context=True,
python_callable=read_csv_file,
op_kwargs={'file_name': './data/ingest/informacion_estaciones_red_calidad_aire.csv'},
dag=dag
)
t2 = PythonOperator(
task_id='Filter_columns',
provide_context=True,
python_callable=filter_columns,
dag=dag
)
t3 = PythonOperator(
task_id='Save_data',
provide_context=True,
python_callable=load_data,
op_kwargs={'output_path': './data/out/data_for_map.csv'},
dag=dag
)
t1 >> t2 >> t3

Como se observa, al momento de instanciar el DAG es posible configurar, entre otras cosas, la fecha de inicio, fecha final (opcional) y el intervalo de ejecución (frecuencia). En esta ocasión vamos a definir como fecha de inicio las 13 horas de la fecha actual, no se define una fecha final y, en cuanto al intervalo, se configura una ejecución horaria (una vez cada hora) en el minuto 30 usando una expresión cron: 30 * * * *.

Seguido, debido a que se utiliza unPythonOperator , definimos las tres sencillas funciones que componen el flujo de trabajo. Cada tarea es una instancia del operador PythonOperator al que se pasan como parámetros: un identificador de tarea task_id , la función de python asociada a la tarea python_callable y el objeto dag ya creado. La tarea t1lee un conjunto de datos, tomado del Portal de Datos Abiertos del Ayuntamiento de Madrid, que se encuentra en la ruta ./data/ingest y retorna un Dataframe . La tarea t2toma el valor devuelto por la tarea t1y filtra la información quedándose únicamente con las columnas de interés. Por último, la tarea t3toma el resultado de la tarea t2y lo guarda como un fichero “.csv”, con un nombre nuevo en una ubicación diferente: ./data/out . Por último, se indica el orden de ejecución de estas tareas, en este caso secuencial, de la siguiente forma

t1 >> t2 >> t3

Airflow dispone de varios operadores adicionales útiles dependiendo de la necesidad y complejidad de las tareas a realizar, puedes encontrar mas información sobre los diferentes operadores directamente en la documentación oficial.

4. Ejecuta el DAG

Con el DAG completamente definido y ubicado en el directorio $AIRFLOW_HOME/dags, basta con ejecutar los siguientes comandos para iniciar el servidor web (interfaz gráfica) y el scheduler .

# Start the web server
airflow webserver -p 8080

# Start the scheduler
airflow scheduler

En el navegador, visita localhost:8080 y habilita el DAG con el nombre Extract_Transform_Load .

Puedes observar el DAG definido anteriormente y un conjunto de links a páginas útiles. Se puede ver exactamente cuántas tareas tuvieron éxito, fallaron o se están ejecutando actualmente.

Tree View

Representación en árbol del DAG a través del tiempo. Si una alguna de las tareas falla, se puede identificar rápidamente donde está el error.

Graph View

Permite ver las tareas del DAG y su estado actual para una ejecución específica.

Gantt View

Permite visualizar la duración de las tareas y la superposición. También, permite identificar dónde se dedica la mayor parte del tiempo durante ejecuciones específicas del DAG.

Los cambios que realices en los DAG no se actualizan automáticamente en Airflow. Usa el botón actualizar del navegador para visualizar los cambios recientes.

Conclusión

Puedes automatizar un proceso simple como el que se planteó en el inicio. Puedes definer varios DAG con diferentes tareas y niveles de complejidad utilizando la misma estructura de este proyecto. Ten en cuenta que con esta herramienta puedes realizar flujos de trabajo mucho mas completos así como integraciones con otras herramientas.

Es un buena practica dividir los flujos de trabajo complejos en tareas pequeñas, de esta forma tendrás un DAG mucho más limpio y fácil de mantener. El código completo de este proyecto puedes encontrarlo en mi repositorio.

Recuerda que este es solo un ejemplo básico y que puedes encontrar mayor información directamente en la documentación oficial.

--

--