Airflow is a wonderful tool I have been using for the last 4 years. Sometimes I like it. Sometimes I don't. This post is dedicated to Airflow dynamic DAGs, and I want to show you how to write them properly. Here we can treat Airflow as a Python framework, so writing a dynamic DAG is just writing more Python code.
Why should I use dynamic DAGs?
Airflow dynamic DAGs are useful when you have, for instance, multiple tables and you want one DAG per ingested table. To avoid creating multiple Python files and copy-pasting code, you can factor your code and create a dynamic structure.
Let's illustrate this with an example. Imagine you have to copy your production Postgres database. To do it, you create a list of the tables you want to get every morning. The factory takes this table list as input and dynamically produces a list of DAGs.
If you want to do different things depending on the table type (incremental or full, for example), you can go deeper by creating a configuration file per table and then looping over all the configuration files to create a DAG per table.
When you're doing an extract and load process, I recommend creating a DAG per table rather than a DAG per schema or database. This way each DAG has a smaller scope and backfilling a table is easier. The main disadvantage of this solution is that you have to use more sensors in downstream dependencies.
In summary you can use dynamic DAGs for:
- Ingesting multiple tables from a database → a DAG per table
- Running a list of SQL queries per domain → a DAG per domain
- Scraping a list of websites → a DAG per website
- Every time you find yourself copy-pasting DAG code
Dynamic DAGs with TaskFlow API
We will use the latest Airflow version as of this post (2.3.4), but this works with every version that has the TaskFlow API. Let's say we have 3 sources, one of them being order, and we want to create a DAG per source to do stuff on each source. For each source we want to apply a prepare and a load function.
The important part of this code is the last line. It creates a global variable holding the DAG object, which the Airflow DagBag parses and registers on every scheduler loop.
globals()[source] = create_dag(source)
If you want to go further, you can also create a configuration per source. I recommend writing the configuration in Python rather than JSON. The main reason is that Python configuration can be linted and statically checked, and you can comment Python dicts.
Dynamic DAGs with configurations
So you have a configuration folder called config, which holds the configuration for the 3 sources.
I decided to use a dataclass to parse each configuration and a module loader to load it. This way every configuration is statically checked, and if an error slips into a configuration the Python code becomes invalid. You can then catch it in your CI/CD process, for instance.
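As a rough sketch of that idea, the loader below imports every Python file in a folder and parses its dict into a dataclass. The names SourceConfig, CONFIG, load_config and load_all_configs, as well as the name and load_type fields, are assumptions of mine and not part of Airflow; adapt them to whatever your sources need.

```python
from dataclasses import dataclass
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from typing import List


@dataclass(frozen=True)
class SourceConfig:
    # Hypothetical fields, chosen only for illustration.
    name: str
    load_type: str = "full"  # either "full" or "incremental"

    def __post_init__(self) -> None:
        if self.load_type not in ("full", "incremental"):
            raise ValueError(f"Unknown load_type: {self.load_type!r}")


def load_config(path: Path) -> SourceConfig:
    """Import one configuration module and parse its CONFIG dict."""
    spec = spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load configuration from {path}")
    module = module_from_spec(spec)
    # Executing the module raises immediately if the file is not valid Python.
    spec.loader.exec_module(module)
    # Unknown or missing keys raise a TypeError in the dataclass constructor.
    return SourceConfig(**module.CONFIG)


def load_all_configs(folder: Path) -> List[SourceConfig]:
    """Load every configuration file of the folder, in alphabetical order."""
    return [
        load_config(path)
        for path in sorted(folder.glob("*.py"))
        if path.name != "__init__.py"
    ]
```

A configuration file such as config/order.py would then contain a single commented dict, for example CONFIG = {"name": "order", "load_type": "incremental"}, and the DAG file would loop over load_all_configs(Path("config")) and call create_dag for each entry. Running the same loader in a CI job is one way to catch a broken configuration before it reaches the scheduler.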
Dynamic DAGs without TaskFlow
You can also do it without the TaskFlow API. You just need a create_dag function that returns a DAG and you're set. Below is a small example.
Creating dynamic DAGs in Airflow is super easy. You can create DAG factories for all the repetitive tasks you may have, and thanks to this you'll be able to unit test your ETL code.