Skip to content

Airflow dynamic DAGs

Learn how to create Apache Airflow dynamic DAGs (with and without TaskFlow API).

Christophe Blefari
Christophe Blefari
4 min read
Airflow dynamic DAGs

Table of Contents

Airflow is a wonderful tool I have been using for the last 4 years. Sometimes I like it. Sometimes I don't. This post is dedicated to Airflow dynamic DAGs. I want to show you how to do it properly. In this case we can see Airflow as a Python framework, so writing dynamic DAG is just writing another Python code.

Why should I use dynamic DAGs?

Dynamic DAGs are useful when you have multiple tables for instance and you want a DAG per table ingested. In order to avoid create multiple Python files and doing copy-paste you can factorize your code and create a dynamic structure.

If we illustrates further with an example. Let's imagine you have to copy your production Postgres database. In order to do it you create a list of the tables you want to get every morning. The factory will take this table list as an input and will dynamically produce a list of DAGs.

If for instance you want to have do different stuff depending on the table type — incremental/full e.g. — you can go deeper by creating configuration files per table and then looping over all the configuration files to create a DAG per table.

When you're doing an extract and load process I recommend you to create a DAG per table rather than having a DAG per schema or database for instance. This way you'll have smaller scope in each DAG and backfilling table will be easier. The main disadvantage of this solution is that you have to use more sensors in downstream dependencies.

In summary you can use dynamic DAGs for:

  • Ingestion multiple tables from a database → a DAG per table
  • Running a list of SQL queries per domain → a DAG per domain
  • Scraping a list of website → a DAG per website
  • Every-time you are copy pasting DAG code

Dynamic DAGs with TaskFlow API

We will use last Airflow version — 2.3.4 — here, but it'll work for every version with the TaskFlow API. Let's say we have 3 sources and we want to create a DAG per source to do stuff on each source. These sources are user, product and order. For each source we want to apply a prepare and a load function.

import pendulum

from airflow.decorators import dag, task


@task
def prepare(source):
    print(f"Prepare {source}")
    pass


@task
def load(source):
    print(f"Load {source}")
    pass


def create_dag(source):
    @dag(
        schedule_interval="0 1 * * *",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        catchup=False,
        dag_id=f"prepare_and_load_{source}"
    )
    def template():
        """
        ### Prepare and load data
        This is the DAG that loads all the raw data
        """
        prepare_task = prepare(source)
        load_task = load(source)

        prepare_task >> load_task

    return template()


for source in ["user", "product", "order"]:
    globals()[source] = create_dag(source)
dags/prepare_and_load.py

The important part of this code is the last line. It creates a global variable that contains the DAG object that the Airflow DagBag will parse and add for every scheduler loop.

globals()[source] = create_dag(source)

If you want to go further you can also create a configuration per source. I recommend you to create Python configuration rather than JSON. The main reason is because Python configuration can be linted, can be statically checked and you can comment Python dicts.

Airflow UI with dynamic DAGs

Dynamic DAGs with configurations

So you have a configuration folder called config in which you have the 3 sources configuration.

config = {
    "name": "user",
    "type": "A",
}
config/user.py (as an example)
import os
from dataclasses import dataclass

import pendulum
from importlib.machinery import SourceFileLoader

from airflow.decorators import dag, task

CONFIG_FOLDER = "dags/config"


@dataclass
class Config:
    name: str
    type: str


@task
def prepare(source):
    print(f"Prepare {source}")
    pass


@task
def load(source):
    print(f"Load {source}")
    pass


def create_dag(source):
    @dag(
        schedule_interval="0 1 * * *",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        catchup=False,
        dag_id=f"prepare_and_load_{source.name}"
    )
    def template():
        """
        ### Load monthly data to the warehouse
        This is the DAG that loads all the raw data to the warehouse
        """
        prepare_task = prepare(source)
        load_task = load(source)

        prepare_task >> load_task

    return template()


for file in os.listdir(CONFIG_FOLDER):
    if file.endswith(".py"):
        filename = os.path.join(CONFIG_FOLDER, file)
        module = SourceFileLoader("module", filename).load_module()
        config = Config(**module.config)
        globals()[config.name] = create_dag(config)

dags/prepare_and_load_advanced.py

I decided to use a dataclass to parse every configurations and a module loader to load the configuration. This way every configuration will be statically checked and if an error has been added in your configuration Python code will be invalid. You can then catch it in you CI/CD process for instance.

Dynamic DAGs without TaskFlow

You can also do it without TaskFlow API, for that you just need to also have a create_dag function that returns a DAG and you're set. Below a small example.

import os
from dataclasses import dataclass

import pendulum
from importlib.machinery import SourceFileLoader

from airflow import DAG
from airflow.operators.python import PythonOperator

CONFIG_FOLDER = "dags/config"


@dataclass
class Config:
    name: str
    type: str


def create_dag(source):
    dag = DAG(
        dag_id=f"prepare_and_load_{source.name}",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        catchup=False,
        schedule_interval="0 1 * * *",
    )

    prepare_task = PythonOperator(
        task_id="prepare",
        python_callable=lambda x: print(x),
        dag=dag
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=lambda x: print(x),
        dag=dag
    )

    prepare_task >> load_task

    return dag


for file in os.listdir(CONFIG_FOLDER):
    if file.endswith(".py"):
        filename = os.path.join(CONFIG_FOLDER, file)
        module = SourceFileLoader("module", filename).load_module()
        config = Config(**module.config)
        globals()[config.name] = create_dag(config)

dags/main_with_taskflow.py

Conclusion

Creating dynamic in Airflow is super easy. You can create DAG factories for all repetitive tasks you may have, thanks to this you'll be able to unit test your ETL code.

Airflow

Data Explorer

The hub to explore Data News links

Search and bookmark more than 1200 links

Explore

Christophe Blefari

Senior Data Engineer. I like 🚲, 🪴 and 🎮. I can do everything with data, just ask.

Comments


Related Posts

Members Public

How to deploy an OSM tile server

How to deploy an OpenStreetMap compatible tile server by yourself with a custom style.

Members Public

Introduction to Airflow concepts

Getting started with Airflow, understand what are the basic concepts of Airflow and where to get started.