Accelerate testing in Apache Airflow through DAG versioning

In this blog post we present a way to version your Airflow DAGs on a single server through isolated pipeline and data environments to enable more convenient simulation and testing.

Hilmi Yildirim

Senior Software Engineer

Stefan Haase

Head of Engineering

Posted on Jun 10, 2022

Introduction

In the Performance Marketing department, we run paid advertisement campaigns for Zalando. To do so, we build services that allow us to manage campaigns, optimize and distribute content, and measure the performance of the campaigns at scale.

Speaking of measurement, one of the core systems we’ve built and continuously extended over the years is our marketing ROI (return on investment) pipeline. The ROI pipeline is a batch-based data and machine learning pipeline powered by Databricks Spark and orchestrated by Apache Airflow. It consists of various sub-pipelines (components), some of which are built using our Python SDK zFlow. Examples of these components are our input data preparation, our marketing attribution model and an incremental profit forecast for our campaigns. The components are owned and developed by different cross-functional teams (applied science, engineering, product) within Performance Marketing. You can read more about the way we measure campaign effectiveness from a functional perspective in our previous blog post.

Problem Statement

A recurring problem we faced during development relates to the nature of the marketing ROI, which lacks a ground truth (see footnote 1). This means that while we often have assumptions about the impact that a change in input data or in our components has on the ROI, we need to run the new version of the ROI pipeline end-to-end to confirm them. Since different teams work on different components of the ROI pipeline in parallel, they need to evaluate the impact of a change on the final ROI in isolation to work effectively (i.e. without blocking each other). The following section explains the problem in more depth.

As mentioned earlier, we use Airflow to orchestrate the overall pipeline. The Airflow code is stored in a GitHub repository. We have two servers, production and test. When a pull request is opened, the Airflow pipeline is deployed to the test server. On merge to the main branch, we deploy to the production server. In this setup, we have two so-called pipeline environments, a production (live) environment and a test environment. The live pipeline uses the live data environment while the test pipeline uses the test data environment. As our data layer, we mainly use AWS S3 with data organized as Spark tables. A set of Spark tables represents a data environment.

Only one version of an Airflow DAG, such as our marketing ROI pipeline, can exist in each environment. When multiple features are developed at the same time, they have to share the test environment, which often leads to conflicts since testing in isolation is not possible. Alternatively, the features can be tested sequentially, which leads to delays.

To solve this problem, we implemented a mechanism that allows a flexible number of Airflow environments. We also developed a script to spin up new data environments.

Figure 1 depicts the relationship between pipeline environments and data environments.


Figure 1: Environments

Pipeline Environment

A pipeline environment is a version of a pipeline (set of Airflow DAGs) deployed to an Airflow server on which it can run end-to-end. Each environment contains all DAGs necessary to produce the required output (e.g. marketing ROI in our case), so multiple environments can co-exist on one server and can be used independently.

Data Environment

A data environment is a set of Spark/Hive databases, tables and views. A pipeline environment uses a single data environment for reading and writing data.

Airflow Environments

Our main objective was to create a new Airflow environment whenever a pull request is opened, so that the version of the pipeline under development can be tested in isolation. The most straightforward way would be to create a new Airflow server for every pull request, but this is time-consuming and costly. For example, Amazon Managed Workflows for Apache Airflow (MWAA) needs up to 30 minutes to create a new Airflow server, and you have to pay for the additional resources. With our solution, a new environment is created on the existing test server once a pull request is opened, resulting in multiple environments on the same Airflow server. Creating a new environment takes less than one minute.

Figure 2 shows what this could look like on the test server. We have two Airflow DAGs, qu.test_dag and qu.test_dag_2, in three different environments: feature1, feature2 and feature3. This means that the same DAGs are adapted and deployed through three separate pull requests. "qu" is the name of an internal team at Zalando; DAG ids always have the team name as a prefix.


Figure 2: Airflow Environments


When the corresponding pull request is closed, the environment is deleted automatically. How did we implement this, given that the concept of environments does not exist in Airflow? We adjusted the source code of the Airflow library and developed a cron job which deletes environments that are no longer needed. The following sections explain the modifications we made.

Deploying Airflow code as a zip file

The Airflow code is deployed as a single zip archive using the Packaging DAGs feature. This feature prevents dependency conflicts because every deployment only uses the dependencies which are defined in the same zip file. The zip file has the name of the branch from which we are deploying. For example, when we deploy the Airflow code from branch feature1, the zip file is called feature1.zip.
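As a rough illustration of the packaging step, the following sketch zips a DAG source folder into an archive named after the branch. The function name package_dags and the directory arguments are hypothetical and not part of our deployment tooling; the sketch also assumes the branch name is a valid file name.

```python
import zipfile
from pathlib import Path


def package_dags(source_dir: str, output_dir: str, branch: str) -> Path:
    """Zip all files under source_dir into <output_dir>/<branch>.zip (hypothetical helper)."""
    source = Path(source_dir)
    archive_path = Path(output_dir) / f"{branch}.zip"
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(source.rglob("*")):
            if path.is_file():
                # Store each file under its path relative to the source root.
                archive.write(path, path.relative_to(source).as_posix())
    return archive_path
```

Calling package_dags("src", "/usr/local/airflow/dags", "feature1") would produce feature1.zip in the DAGs folder, assuming that folder is where the server picks up packaged DAGs.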

Use correct Jinja Paths

A problem that arises from using a zip file is that Jinja templates stored in files no longer work. Jinja detects the absolute path of the file correctly, but the file cannot be read because it is inside a zip archive. For this reason, we also deploy the unpacked contents of the zip archive to a different location. Inside the dag.py file (see Figure 3, lines 13-19) we add the location of the unpacked files to the template search path. As a result, Jinja now searches for templates inside the unpacked folder.
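The path mapping can be sketched as follows. This is a simplified illustration, not the code we run: the function name template_search_paths is hypothetical, and the features root directory is taken from the example paths in Figure 3.

```python
from pathlib import PurePosixPath
from typing import List

# Assumed location of the unpacked archives (taken from the example paths in Figure 3).
FEATURES_ROOT = PurePosixPath("/usr/local/airflow/features")


def template_search_paths(file_path: str) -> List[str]:
    """Map the path of a DAG file inside a zip archive to its unpacked directories."""
    parts = PurePosixPath(file_path).parts
    zip_indices = [i for i, part in enumerate(parts) if part.endswith(".zip")]
    if not zip_indices:
        raise ValueError(f"{file_path} is not located inside a zip archive")
    zip_index = zip_indices[0]
    environment = parts[zip_index][: -len(".zip")]
    feature_dir = FEATURES_ROOT / environment
    # Directory of the DAG file, relative to the archive root.
    relative_dir = PurePosixPath(*parts[zip_index + 1 : -1])
    return [str(feature_dir), str(feature_dir / relative_dir)]
```

The returned directories can then be added to the DAG's template_searchpath so that Jinja resolves template files from the unpacked copy.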

Renaming DAG Ids

On a single Airflow server, it is not possible to create multiple DAGs with the same id. Therefore, we have to rename the DAG ids for every deployment. To do so, we adapted the dag.py file of the Airflow library (see Figure 3), which contains the DAG class. Inside the __init__ method, we check the file path of the Python file that initializes the DAG. The path contains the name of the zip file, e.g. feature1.zip, which lets us tell the environments apart. We modify the original DAG id and inject the environment name (see Figure 3, lines 3-11). Furthermore, we add the environment name as a tag to enable filtering by environment.

class DAG:
…
    def __init__(...):
        # /usr/local/airflow/dags/feature1.zip/qu/main/file.py
        file_path = get_path_of_file_which_initialized_dag()

        # feature1
        feature_name = get_zip_file_name(file_path)

        dag_id = f"{team_name}.{feature_name}.{dag_id.split(f'{team_name}.')[1]}"  # e.g. qu.test_dag -> qu.feature1.test_dag
        tags.append(feature_name)

        # /usr/local/airflow/features/feature1/
        feature_dir_path = get_feature_dir_path(file_path)
        template_searchpath.add(feature_dir_path)

        # /usr/local/airflow/features/feature1/qu/main/
        feature_file_path = get_feature_file_dir_path(file_path)
        template_searchpath.add(feature_file_path)
…
Figure 3: Pseudo Code of adapted dag.py
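To make the renaming logic easier to try out, here is a small standalone sketch of the two steps: extracting the environment name from the file path and injecting it into the DAG id. The function names environment_from_path and environment_dag_id are hypothetical, and the sketch takes the file path as an argument instead of detecting the calling file as our patched dag.py does.

```python
import re
from typing import Optional


def environment_from_path(file_path: str) -> Optional[str]:
    """Return the zip file name without extension, or None if the path is not inside a zip."""
    match = re.search(r"/([^/]+)\.zip(?:/|$)", file_path)
    return match.group(1) if match else None


def environment_dag_id(dag_id: str, team_name: str, environment: str) -> str:
    """Inject the environment name between the team prefix and the rest of the DAG id."""
    prefix = f"{team_name}."
    if not dag_id.startswith(prefix):
        raise ValueError(f"DAG id {dag_id!r} does not start with {prefix!r}")
    return f"{prefix}{environment}.{dag_id[len(prefix):]}"
```

With the example from Figure 3, the path /usr/local/airflow/dags/feature1.zip/qu/main/file.py yields the environment feature1, and the DAG id qu.test_dag becomes qu.feature1.test_dag.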

Environment Cleanup

We developed a cron job that checks the status of pull requests. Once a pull request is closed, the corresponding environment is deleted from the Airflow server. The job deletes the zip file and the folder which contains the unpacked files. Then, it queries the Airflow metastore for all associated DAGs and deletes them via the Airflow CLI.
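The cleanup of a single environment could look roughly like the sketch below. It is an illustration under several assumptions: the function name cleanup_environment is hypothetical, the list of DAG ids is passed in rather than queried from the metastore, DAG ids follow the team.environment.name scheme described above, and the delete command uses the Airflow 2 CLI syntax (airflow dags delete). Checking the pull request status is left to the caller.

```python
import shutil
import subprocess
from pathlib import Path
from typing import Callable, Iterable, List


def cleanup_environment(
    environment: str,
    dags_dir: str,
    features_dir: str,
    dag_ids: Iterable[str],
    run: Callable[[List[str]], object] = lambda cmd: subprocess.run(cmd, check=True),
) -> List[str]:
    """Remove the files of one environment and delete its DAGs; return the deleted DAG ids."""
    # Delete the packaged archive and the unpacked copy; missing files are ignored.
    (Path(dags_dir) / f"{environment}.zip").unlink(missing_ok=True)  # Python 3.8+
    shutil.rmtree(Path(features_dir) / environment, ignore_errors=True)

    deleted = []
    for dag_id in dag_ids:
        parts = dag_id.split(".")
        # Assumed naming scheme: <team>.<environment>.<original name>
        if len(parts) >= 3 and parts[1] == environment:
            run(["airflow", "dags", "delete", "--yes", dag_id])
            deleted.append(dag_id)
    return deleted
```

Passing the command runner as a parameter keeps the function easy to test without an Airflow installation.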

Data Environments

Every Airflow environment also requires a data environment; otherwise, conflicts on the data layer could occur during parallel feature development. Our data is mainly organized as Spark databases stored on S3. A data environment is a set of Spark databases with a common suffix, e.g. all databases of the live environment have the suffix _live. The DDLs of our databases and tables are stored in a Git repository. We developed a script which uses these DDLs to create a new data environment (see Figure 4). The databases have the environment name as a suffix, e.g. db_attribution_feature1.


Figure 4: Create new Data Environment
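One way such a script could derive the statements for a new environment is sketched below. This is only an assumption about the mechanics: the ${env} placeholder in the DDL files, the function name render_ddls and the example table columns are all hypothetical and not taken from our repository.

```python
import re
from string import Template
from typing import List


def render_ddls(ddl_templates: List[str], environment: str) -> List[str]:
    """Fill the ${env} placeholder of every DDL template with the environment name."""
    if not re.fullmatch(r"\w+", environment):
        raise ValueError(f"{environment!r} is not usable as a database suffix")
    return [Template(ddl).substitute(env=environment) for ddl in ddl_templates]


# Hypothetical DDL templates; the real column definitions are not shown in this post.
EXAMPLE_TEMPLATES = [
    "CREATE DATABASE IF NOT EXISTS db_attribution_${env}",
    "CREATE TABLE IF NOT EXISTS db_attribution_${env}.m_events "
    "(event_id STRING, date DATE) USING PARQUET PARTITIONED BY (date)",
]
```

Each rendered statement could then be executed against the cluster, for example with spark.sql(statement) in a PySpark session.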


A new data environment is initially empty, i.e. the databases do not contain any data. We could copy the data, but this costs time and money. A more elegant way is the table environment feature, which we implemented as part of the data environment script. Instead of copying data, the script creates a view pointing to the respective test data (see Figure 5). Table environments are defined in a configuration file which is created automatically by the table environment script. The script uses information about the input and output tables of all tasks, which are predefined in YAML files. An example table environment configuration is db_attribution.m_events:TEST, which results in the creation of the following example view.

CREATE VIEW db_attribution_feature1.m_events AS
SELECT * FROM db_attribution_test.m_events
Figure 5: Creating a view instead of copying data
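The translation from a configuration entry to the view statement can be sketched as follows. The function name view_statement is hypothetical, and the sketch assumes that the part after the colon names the source environment and maps to a lowercase database suffix.

```python
def view_statement(entry: str, environment: str) -> str:
    """Turn an entry such as 'db_attribution.m_events:TEST' into a CREATE VIEW statement."""
    table, source_environment = entry.rsplit(":", 1)
    database, table_name = table.strip().split(".", 1)
    # Assumption: 'TEST' refers to the databases carrying the suffix '_test'.
    source_suffix = source_environment.strip().lower()
    return (
        f"CREATE VIEW {database}_{environment}.{table_name} AS\n"
        f"SELECT * FROM {database}_{source_suffix}.{table_name}"
    )
```

For the entry db_attribution.m_events:TEST and the environment feature1, this produces the statement shown in Figure 5.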


A view is only created if the table is not used as output by one of the respective tasks. In some cases, you need initial data for tables which are used as output. Therefore, the table environment script creates a configuration stub for these tables like this:

db_attribution.m_events:
    partitions:
        - date BETWEEN "x" AND "y"

If you define the partition ranges and execute the data environment script, it creates the table and copies the data for you.
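The copy step could be expressed as a statement generated from the filled-in stub, as in the sketch below. The function name copy_statement, the default source environment test and the dates in the usage example are assumptions for illustration; the sketch also assumes that the target table has already been created from its DDL.

```python
from typing import List


def copy_statement(
    table: str,
    partitions: List[str],
    environment: str,
    source_environment: str = "test",
) -> str:
    """Build an INSERT statement that copies the configured partition ranges."""
    database, table_name = table.split(".", 1)
    # Several partition ranges are combined with OR.
    predicate = " OR ".join(f"({condition})" for condition in partitions)
    return (
        f"INSERT INTO {database}_{environment}.{table_name}\n"
        f"SELECT * FROM {database}_{source_environment}.{table_name}\n"
        f"WHERE {predicate}"
    )
```

For example, copy_statement("db_attribution.m_events", ['date BETWEEN "2022-01-01" AND "2022-01-31"'], "feature1") yields a statement that copies one month of events from db_attribution_test into db_attribution_feature1.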

Summary

In this blog post we presented how we enabled versioning of our performance marketing pipeline, which is based on Apache Airflow. Versioning is necessary to enable more convenient simulation and testing. We modified the Airflow DAG class and used the Packaging DAGs feature of Apache Airflow to make it possible to have multiple versions of the same DAGs on a single server. This allows us to deploy a Git branch containing Airflow DAGs directly to a single Airflow server, where they run isolated from other versions. The deployment takes less than one minute, compared to up to 30 minutes when a new Airflow server is created for the deployment. To enable isolation on the data level, we implemented a script which spins up a new data environment consisting of Spark/Hive tables on S3. As a result, every pipeline version can use a dedicated data environment.




  1. This is a simplification: ultimately, we consider the results of our A/B tests as ground truth. However, A/B tests are only run in certain periods of the year, and their results are also used to correct our marketing attribution results in between A/B test periods. In those periods, due to internal and external factors such as changes in spend or campaign efficiency, the ground truth could in fact have changed as well.


