Speed up your Airflow development with Docker and tests

If you develop in the cloud or on a shared server, you might have experienced the joys of working with your teammates on the same codebase and the same database! When several people work in the same cloud environment, some resources (files, tables…) end up shared between teammates. This means you can sometimes find yourself blocked because someone else is working on the same resource, and it becomes frustrating when a file or table you needed is overwritten, moved or deleted…

TL;DR


The dream

To have the cloud in your computer!

Achieve your dream

To run Airflow on your computer and in your CI, we are going to use… Docker. It takes a bit of configuration at the beginning, but in the end you’ll be able to reproduce your cloud configuration and share it with your teammates. You can save your Docker image in a private Docker registry to avoid rebuilding it. From my point of view, it’s easier to run an already configured Airflow with Docker than to install it in a virtual environment.

FROM puckel/docker-airflow:1.10.9

USER root
# requirements file with packages you need
COPY requirements.txt .
RUN pip install -r requirements.txt

USER airflow
$ docker build -t my-airflow-img:1.0 .
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
my-airflow-img 1.0 96696eea2641 5 minutes ago 1.89GB
# replace <img> with my-airflow-img or puckel/docker-airflow:1.10.9
$ docker run <img> python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)"
R9mdx6wCwIb0h_GChn1-Fbcth3H_gTBAjgvf87JLgSU=
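Airflow uses this Fernet key to encrypt secrets (connection passwords, variables) before writing them to its metadata database. A minimal sketch of what the key does, using the same `cryptography` package as the command above:

```python
from cryptography.fernet import Fernet

# Generate a key exactly as the docker run command above does
fernet_key = Fernet.generate_key()

# Airflow encrypts secrets with this key before storing them;
# the same key is needed to decrypt them later
f = Fernet(fernet_key)
token = f.encrypt(b"my-db-password")
assert f.decrypt(token) == b"my-db-password"
```

This is why the key has to be passed to the container every time it starts: without it, Airflow cannot decrypt what it previously stored.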
# replace <fkey> (by the one you just produced) and <img>
# your Airflow's variables file is expected to be in dags directory (in your local/host machine) with name vars.json
docker run -d -e FERNET_KEY=<fkey> -e DAGS_DIR='dags' \
--name airflow-web \
-v `pwd`/dags/:/usr/local/airflow/dags \
-p 8080:8080 <img>
docker exec -it airflow-web airflow variables -i ./dags/vars.json
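The `vars.json` file imported above is just a flat JSON mapping of variable names to values. A sketch of producing one programmatically (the variable names here are hypothetical; adapt them to your own DAGs):

```python
import json

# Hypothetical Airflow variables; replace with your own
airflow_vars = {
    "env": "local",
    "number_of_clusters": 7,
}

# Write the file that `airflow variables -i` will import
with open("vars.json", "w") as out:
    json.dump(airflow_vars, out, indent=2)
```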

Fulfill your dream

You now have Airflow running locally and can watch your DAGs load in your browser at localhost:8080. If there are syntax errors, they will be displayed on the DAGs page, and you can easily access the logs with docker logs -f airflow-web.

import os
import unittest  # useful to launch tests
from importlib import util  # useful to import a file

from airflow import models as af_models

# dags directory: this test file is expected to live in it
DAG_DIR = os.path.dirname(os.path.abspath(__file__))


class DagsTest(unittest.TestCase):
    """Validate dags syntax."""

    def test_load_dags_integrity(self):
        """Import dag files and check for DAG syntax."""
        root_dag_dir = os.path.join(DAG_DIR, 'load')
        files = ['load_messages.py', 'load_users.py']
        self.import_dags(root_dag_dir, files)

    def import_dags(self, dag_dir, list_dag_files):
        """
        For each file in list_dag_files, we:
        - try to load it to check syntax and
        - check if there is a dag defined in the file.
        """
        for filename in list_dag_files:
            module, module_path = self._load(dag_dir, filename)
            self._check_if_there_is_a_dag_in(module, module_path)

    @staticmethod
    def _load(dag_dir, filename):
        module_path = os.path.join(dag_dir, filename)
        module_name, _ = os.path.splitext(filename)
        mod_spec = util.spec_from_file_location(
            module_name, module_path)
        module = util.module_from_spec(mod_spec)
        mod_spec.loader.exec_module(module)
        return module, module_path

    @staticmethod
    def _check_if_there_is_a_dag_in(module, module_path):
        """Look if there is at least one dag defined in the module."""
        assert any(
            isinstance(var, af_models.DAG)
            for var in vars(module).values()
        ), 'No DAG found in {}'.format(module_path)

docker exec -it airflow-web python -m unittest dags/test_syntax.py
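The `_load` helper above relies on `importlib.util` to import a Python file by path. A self-contained sketch of that mechanism, outside Airflow, using a throwaway file to stand in for a DAG file:

```python
import os
import tempfile
from importlib import util

# Write a tiny module to disk to stand in for a DAG file
with tempfile.TemporaryDirectory() as tmp:
    module_path = os.path.join(tmp, "my_dag.py")
    with open(module_path, "w") as f:
        f.write("ANSWER = 42\n")

    # Same pattern as _load above: spec -> module -> exec.
    # exec_module raises if the file has syntax errors,
    # which is exactly what the integrity test relies on.
    spec = util.spec_from_file_location("my_dag", module_path)
    module = util.module_from_spec(spec)
    spec.loader.exec_module(module)

    assert module.ANSWER == 42
```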

Beyond your dream

Now that we have integrity tests, we can easily add unit tests.

import unittest
from unittest.mock import MagicMock, patch

from Training import Training


class TrainingUnitTest(unittest.TestCase):
    """Unit tests for training functions."""

    def setUp(self):
        self.dag = MagicMock(return_value="it would be a dag obj")
        self.tr = Training(self.dag)

    def test_launch(self):
        with patch('airflow.operators.python_operator.PythonOperator') \
                as mock_op:
            self.tr.launch(usecase='topics_in_messages')
            mock_op.assert_called_with(
                task_id='training_for_topics_in_messages',
                python_callable=self.tr._train_model,
                op_kwargs={
                    'usecase': 'topics_in_messages',
                    'number_of_clusters': 7},
                dag=self.dag)

docker exec -it airflow-web python -m unittest dags/test_unit.py
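The `patch`/`assert_called_with` pattern works independently of Airflow. A minimal sketch with hypothetical names (`launch` here is a plain function mirroring what `Training.launch` does: it builds a task from an operator class, which the test replaces with a mock):

```python
import unittest
from unittest.mock import MagicMock

# Hypothetical factory: builds a task from an operator class
def launch(operator_cls, dag, usecase):
    return operator_cls(
        task_id="training_for_%s" % usecase,
        dag=dag)

class LaunchTest(unittest.TestCase):
    def test_launch(self):
        # Replace the operator class with a mock and check the call
        mock_op = MagicMock()
        dag = MagicMock()
        launch(mock_op, dag, usecase="topics_in_messages")
        mock_op.assert_called_with(
            task_id="training_for_topics_in_messages",
            dag=dag)
```

Because the operator class is mocked, the test only checks that it was called with the expected arguments; nothing from Airflow actually runs.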

Above the dream

(Yes… we stay with the dream until the end….)

  • Airflow’s best practices on testing: it could be interesting to implement the DagBag solution for the DAG-loading test (instead of the one from Data’s Inferno)
