Echo Kanak

Airflow DAGs 101

Mar 1, 2025


Apache Airflow is one of the most popular tools for orchestrating data workflows. At the heart of Airflow lies the DAG (Directed Acyclic Graph).

DAG

In Airflow, a DAG (Directed Acyclic Graph) is a Python-based definition of a workflow. It represents a collection of tasks with clearly defined relationships.

  • Directed: Each task has a defined order of execution (upstream/downstream).
  • Acyclic: Tasks cannot loop back to themselves — preventing infinite cycles.
  • Graph: Tasks are nodes connected by dependency edges, which makes the pipeline easy to visualize and monitor.

A DAG can be as simple as one task or as complex as thousands of interconnected tasks.

A simple DAG with three hypothetical tasks, chained one after another, might look like this:
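
# task_a runs first, then task_b, then task_c
task_a >> task_b >> task_c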

Defining DAGs: Multiple Approaches

We can define a DAG in several ways.

1. Decorator Approach

The most modern and clean approach uses the @dag decorator:

from airflow.sdk import dag, task
from pendulum import datetime

@dag(
    schedule='@daily',
    start_date=datetime(2025, 2, 12),
    description='This dag does amazing things',
    tags=['team_a', 'source_a'],
    max_consecutive_failed_dag_runs=3
)
def my_dag():
    @task
    def task_a():
        print("hello from task a")

    task_a()

my_dag()
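
Note that the closing my_dag() call is required: the decorated function must be invoked at the top level of the file so that Airflow registers the DAG.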

2. Context Manager Approach

Using the with statement provides clear scoping for your DAG definition:

from airflow.sdk import DAG, task
from pendulum import datetime

with DAG(
    dag_id='my_dag',
    schedule='@daily',
    start_date=datetime(2025, 2, 12),
    description='This dag does amazing things',
    tags=['team_a', 'source_a'],
    max_consecutive_failed_dag_runs=3
):
    @task
    def task_a():
        print("hello from task a")

    task_a()
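
Because the tasks are created inside the with block, they are attached to the DAG automatically, so no final registration call is needed.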

3. Traditional Operators

For more complex tasks, we can use specific operators:

from airflow.sdk import dag
from airflow.providers.standard.operators.python import PythonOperator
from pendulum import datetime

def _task_a():
    print("hello from task a")

@dag(
    schedule='@daily',
    start_date=datetime(2025, 2, 12),
    description='This dag does amazing things',
    tags=['team_a', 'source_a'],
    max_consecutive_failed_dag_runs=3
)
def my_dag():
    task_a = PythonOperator(
        task_id="task_a",
        python_callable=_task_a
    )

my_dag()
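
With a traditional operator the task_id is set explicitly, whereas the @task decorator derives it from the decorated function's name.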

Managing Task Dependencies

Dependencies define the order in which tasks execute.

Simple Linear Dependencies

For straightforward workflows, use the bitshift operator (>>):

from airflow.sdk import dag, task
from pendulum import datetime

@dag(
    schedule='@daily',
    start_date=datetime(2025, 2, 12),
    description='Linear workflow example',
    tags=['team_a', 'source_a'],
    max_consecutive_failed_dag_runs=3
)
def my_dag():
    @task
    def task_a():
        print("hello from task a")

    @task
    def task_b():
        print("hello from task b")

    @task
    def task_c():
        print("hello from task c")

    @task
    def task_d():
        print("hello from task d")

    task_a() >> task_b() >> task_c() >> task_d()

my_dag()

Parallel Task Execution

To run multiple tasks in parallel at the same level, use lists:

# This creates parallel execution for task_c and task_d
task_a() >> task_b() >> [task_c(), task_d()]
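Lists also work on the upstream side, so fan-in is just as easy. A minimal sketch, assuming a hypothetical task_e defined like the others:

# store the instances, then fan in: task_e runs only after both complete
c = task_c()
d = task_d()
[c, d] >> task_e()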

Complex Dependencies

For a branching workflow, if we write

task_a() >> [task_b(), task_c()]
task_a() >> [task_d(), task_e()]

we end up with two separate copies of task_a in the graph: each time we explicitly call a task, Airflow creates a new instance of that task.

Avoid creating duplicate task instances by using variables:

from airflow.sdk import dag, task
from pendulum import datetime

@dag(
    schedule='@daily',
    start_date=datetime(2025, 2, 12),
    description='Complex workflow example',
    tags=['team_a', 'source_a'],
    max_consecutive_failed_dag_runs=3
)
def my_dag():
    @task
    def task_a():
        print("hello from task a")

    @task
    def task_b():
        print("hello from task b")

    @task
    def task_c():
        print("hello from task c")

    @task
    def task_d():
        print("hello from task d")

    @task
    def task_e():
        print("hello from task e")

    # Store the task instance in a variable
    a = task_a()

    # Create branching dependencies
    a >> task_b() >> task_c()
    a >> task_d() >> task_e()

my_dag()

Using Chain for Complex Dependencies

The chain function provides a cleaner syntax for the same pattern:

from airflow.sdk import chain

# Equivalent to the branching example above:
#   a >> task_b() >> task_c()
#   a >> task_d() >> task_e()
chain(task_a(), [task_b(), task_d()], [task_c(), task_e()])
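Note that chain pairs adjacent lists element-wise, so neighboring lists must have the same length: here [task_b(), task_d()] followed by [task_c(), task_e()] wires task_b to task_c and task_d to task_e.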

Key Takeaways and Best Practices

  1. Unique Identifiers: Every DAG must have a unique identifier across your Airflow instance
  2. Start Date: While optional (defaults to None), setting a start date is crucial for scheduling
  3. Schedule Intervals: Define how frequently your DAG should run (@daily, @hourly, cron expressions, etc.)
  4. Documentation: Always include descriptions and tags to make your DAGs discoverable and maintainable
  5. Operator Selection: Before writing custom code, check the Astronomer Registry for existing operators
  6. Task Naming: Each task must have a unique identifier within its DAG
  7. Default Arguments: Use the default_args dictionary to set common parameters across all tasks (see the sketch after this list)
  8. Dependency Patterns: Use bitshift operators (>>, <<) and lists for simple dependencies, and chain for complex patterns
  9. Avoid Task Duplication: When a task has multiple downstream dependencies, store it in a variable to prevent creating duplicate instances
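
A minimal sketch of the default_args pattern from point 7, assuming illustrative values for retries and retry_delay:

from datetime import timedelta

from airflow.sdk import dag, task
from pendulum import datetime

# Common parameters applied to every task in this DAG
default_args = {
    'retries': 2,                         # retry each failed task twice
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}

@dag(
    schedule='@daily',
    start_date=datetime(2025, 2, 12),
    default_args=default_args,
)
def my_dag_with_defaults():
    @task
    def extract():
        print("extracting")

    @task
    def load():
        print("loading")

    extract() >> load()

my_dag_with_defaults()

Individual tasks can still override these defaults, for example with @task(retries=0).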