Speed up your Airflow development with Docker and tests
If you develop in the cloud or on a shared server, you might have experienced the beauty of working with your teammates on the same codebase and the same database! When you work in the same cloud environment as other people, some resources (files, tables…) end up shared between several teammates. This means you can sometimes find yourself blocked because someone else is working on the same resource, and it becomes frustrating when a file or table you needed is overwritten, moved or deleted…
TL;DR
If you want to do most of your Airflow work without relying on a shared workspace and without its latency, use an Airflow Docker container and run syntax and unit tests in it.

The dream
To have the cloud on your own computer!
Or at least a local environment in which you can check that everything is OK with the Python/Airflow syntax, so the only thing left to do on the shared environment is to test your code’s behavior.
When you have this local environment, you avoid a lot of handling to deploy or copy your code to your environment, then waiting 30s+ for Airflow to notice you updated a file and refresh it. With your local environment, you’ll also see all errors faster than through the web interface or Airflow’s logs (which require switching to another application, like Stackdriver, to read them).
Achieve your dream
To run Airflow on your computer and in your CI, we are going to use… Docker. It’s a bit of configuration at the beginning, but in the end you’ll be able to reproduce your cloud configuration and share it with your teammates. You can save your Docker image in a private Docker registry to avoid rebuilding it. From my point of view, it’s easier to run an already configured Airflow with Docker than to install it in a virtual environment.
1. Add Python packages to your Docker image
You can use an Airflow Docker image from Docker Hub (puckel/docker-airflow:1.10.9) or add Python packages to an existing image by creating a Dockerfile:
FROM puckel/docker-airflow:1.10.9
USER root
# requirements file with packages you need
COPY requirements.txt .
RUN pip install -r requirements.txt
USER airflow
If you customize your image, you need to save the Dockerfile and run this command in the Dockerfile’s directory (you might need to add sudo before docker):
$ docker build -t my-airflow-img:1.0 .
You will then see it in your local images:
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
my-airflow-img 1.0 96696eea2641 5 minutes ago 1.89GB
2. Allow Airflow’s variable loading
If you use Airflow variables in your code, you’ll need to configure a Fernet key (the cryptography key Airflow uses to encrypt variables and connections):
# replace <img> with my-airflow-img:1.0 or puckel/docker-airflow:1.10.9
$ docker run <img> python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)"
R9mdx6wCwIb0h_GChn1-Fbcth3H_gTBAjgvf87JLgSU=
Then pass this key through an environment variable:
# replace <fkey> (with the key you just generated) and <img>
# your Airflow variables file is expected to be in the dags directory (on your local/host machine) with the name vars.json
docker run -d -e FERNET_KEY=<fkey> -e DAGS_DIR='dags' \
--name airflow-web \
-v `pwd`/dags/:/usr/local/airflow/dags \
-p 8080:8080 <img>
Now, you can load your variables:
docker exec -it airflow-web airflow variables -i ./dags/vars.json
So now you have a workspace to launch tests locally and save a lot of time. It’s not exactly a cloud: no scaling up, no links with other applications; but it’s good enough to work with and to know whether your code will work. The remaining tests within your cloud environment and with your data will be quicker.
You can make your life much easier with a pinch of Bash, especially to run your tests in a CI/CD pipeline.
Fulfill your dream
You have Airflow running locally, and you can see your dags loading in your browser at localhost:8080. If there are syntax errors, they will be displayed on the dags page, and you can easily access the logs with docker logs -f airflow-web.
Now that you have this environment, if you add a few more lines you will be able to test the syntax of your code (think of it as a compilation step). We will run Python tests to check Python and Airflow syntax (e.g. does every task have a task_id?) and you will know within seconds whether it’s going to work.
First, we must create a file dags/test_syntax.py and import Python and Airflow packages:
import os
import unittest  # useful to launch tests and imports
from importlib import util  # useful to import a file
from airflow import models as af_models

# directory containing your dag files (adjust it to your layout)
DAG_DIR = os.path.dirname(os.path.abspath(__file__))
We create a Python unittest class (DagsTest) with a test method (test_load_dags_integrity) that tries to load the listed dag files with the import_dags method:
class DagsTest(unittest.TestCase):
    """Validate dags syntax."""

    def test_load_dags_integrity(self):
        """Import ml dag files and check for DAG syntax."""
        root_dag_dir = os.path.join(DAG_DIR, 'load')
        files = ['load_messages.py', 'load_users.py']
        self.import_dags(root_dag_dir, files)
You could easily replace the files list with a function that takes a directory as input and decides which files to import from it based on a naming convention, so you don’t have to update the test file each time your team adds a new dag (see the sketch below)!
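For example, here is a minimal sketch of such a discovery function, assuming your dag files follow a load_*.py naming convention (the prefix and the function name are only illustrative):
import glob
import os  # already imported at the top of test_syntax.py


def discover_dag_files(dag_dir, prefix='load_'):
    """Return the dag file names in dag_dir that match the naming convention."""
    pattern = os.path.join(dag_dir, prefix + '*.py')
    return sorted(os.path.basename(path) for path in glob.glob(pattern))
The test body then becomes self.import_dags(root_dag_dir, discover_dag_files(root_dag_dir)), and new dags are picked up automatically.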
The import_dags function tries to import each file listed in list_dag_files, thanks to _load, and checks whether it can find a dag within the loaded module with _check_if_there_is_a_dag_in (this function’s content comes from the first circle of Data Testing Hell with Airflow):
    def import_dags(self, dag_dir, list_dag_files):
        """
        For each file in list_dag_files, we:
        - try to load it to check syntax and
        - check if there is a dag defined in the file.
        """
        for filename in list_dag_files:
            module, module_path = self._load(dag_dir, filename)
            self._check_if_there_is_a_dag_in(module, module_path)

    @staticmethod
    def _load(dag_dir, filename):
        module_path = os.path.join(dag_dir, filename)
        module_name, _ = os.path.splitext(filename)
        mod_spec = util.spec_from_file_location(module_name, module_path)
        module = util.module_from_spec(mod_spec)
        mod_spec.loader.exec_module(module)
        return module, module_path

    @staticmethod
    def _check_if_there_is_a_dag_in(module, module_path):
        """Look if there is at least one dag defined in the module."""
        assert any(
            isinstance(var, af_models.DAG)
            for var in vars(module).values())
You can now launch your syntax tests with:
docker exec -it airflow-web python -m unittest dags/test_syntax.py
Beyond your dream
Now that we have integrity tests, we can easily add unit tests.
First, we must create a file dags/test_unit.py and import some packages (no need for Airflow here, thanks to mock/patch!):
import unittest
from unittest.mock import MagicMock, patch

from Training import Training
Next, we create a new unittest class for a Training class (in dags/Training.py) with a setUp method (triggered before each test) to define a fake dag instance and to instantiate Training (so we won’t have to instantiate it in each test):
class TrainingUnitTest(unittest.TestCase):
    """Unit test for training functions."""

    def setUp(self):
        self.dag = MagicMock(return_value="it would be a dag obj")
        self.tr = Training(self.dag)
We add a new test in TrainingUnitTest to check that the launch method of the Training class behaves as defined and keeps working this way:
    def test_launch(self):
        with patch('airflow.operators.python_operator.PythonOperator') \
                as mock_op:
            self.tr.launch(usecase='topics_in_messages')
            mock_op.assert_called_with(
                task_id='training_for_topics_in_messages',
                python_callable=self.tr._train_model,
                op_kwargs={
                    'usecase': 'topics_in_messages',
                    'number_of_clusters': 7},
                dag=self.dag)
We use patch('...') to catch any call to PythonOperator, so we don’t need a running Airflow to test it. It’s quicker and, more importantly, we do not want to test Airflow here, we just want to test our own code. We could test our code in interaction with Airflow, but it would be less accurate and slower (around 2 times slower by my measurements); we’d rather have specific integration tests for that (planned in a future post).
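For context, here is a minimal sketch of what the Training class in dags/Training.py could look like. The real implementation is not shown in this post, so the task_id pattern, the _train_model method and the number_of_clusters value are assumptions inferred from the test above. Note that launch looks up PythonOperator through the python_operator module, which is what lets patch('airflow.operators.python_operator.PythonOperator') intercept the call:
from airflow.operators import python_operator


class Training:
    """Hypothetical sketch of the class under test."""

    def __init__(self, dag):
        self.dag = dag

    def launch(self, usecase):
        # build the operator the unit test above expects
        return python_operator.PythonOperator(
            task_id='training_for_{}'.format(usecase),
            python_callable=self._train_model,
            op_kwargs={
                'usecase': usecase,
                'number_of_clusters': 7},
            dag=self.dag)

    def _train_model(self, usecase, number_of_clusters):
        # the actual training logic would live here
        pass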
You can now launch your unit tests with:
docker exec -it airflow-web python -m unittest dags/test_unit.py
Above the dream
(Yes… we keep the dream going until the end….)
So you can work faster by checking for syntax errors in your code without depending on other people (if you share the same cloud environment), and you can manage your codebase more safely with unit tests.
Some interesting resources found along the way:
- Data’s Inferno: 7 Circles of Data Testing Hell with Airflow (a must-read)
- Airflow’s Best Practices on tests: it could be interesting to look at implementing the DagBag solution for the dag loader test (instead of the one from Data’s Inferno)
Thanks to Tiffany Souterre for reading this over!
Thank you for reading!