Echo Kanak

Airflow XComs 101

Mar 31, 2025


When building pipelines in Apache Airflow, tasks often need to share data with each other. That's where XComs (cross-communication) come in.

What are XComs?

XComs enable tasks in your Airflow DAGs to exchange small amounts of data. The mechanism works like this:

  1. Push: A task stores data in XCom using a unique identifier
  2. Pull: Another task retrieves that data using the same identifier
  3. Storage: XCom data is stored in Airflow's metadata database by default
  4. Identification: Each XCom is uniquely identified by multiple fields: key, run ID, task ID, and DAG ID

Every task instance in Airflow gets its own context dictionary that contains metadata about the current execution.

  • context["ti"] → refers to the TaskInstance object of the currently running task
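
As a quick illustration, the context exposes the same fields that identify an XCom entry. This is a minimal sketch; the DAG and task names are made up, and the exact set of context keys depends on your Airflow version:

from airflow.sdk import dag, task

@dag
def context_demo_dag():
    @task
    def show_identity(**context):
        ti = context["ti"]  # TaskInstance of the currently running task
        # The same fields that uniquely identify an XCom entry
        print(f"dag_id:  {ti.dag_id}")
        print(f"task_id: {ti.task_id}")
        print(f"run_id:  {context['run_id']}")

    show_identity()

context_demo_dag()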

XCom Implementation Patterns

Method 1: Explicit Context Usage

The most verbose way using the full context dictionary:

from airflow.sdk import dag, task, Context

@dag
def xcom_dag():
    @task
    def task_a(**context: Context):
        val = 27
        context["ti"].xcom_push(key="my_key", value=val)

    @task
    def task_b(**context: Context):
        val = context["ti"].xcom_pull(task_ids="task_a", key="my_key")
        print(f"Received value: {val}")

    task_a() >> task_b()

xcom_dag()

Method 2: Direct TaskInstance Access

Accessing the TaskInstance directly:

from airflow.sdk import dag, task

@dag
def xcom_dag():
    @task
    def task_a(ti):
        val = 27
        ti.xcom_push(key="my_key", value=val)

    @task
    def task_b(ti):
        val = ti.xcom_pull(task_ids="task_a", key="my_key")
        print(f"Received value: {val}")

    task_a() >> task_b()

xcom_dag()

Method 3: Return Values and Parameters

The most Pythonic approach uses return values and function parameters:

from airflow.sdk import dag, task

@dag
def xcom_dag():
    @task
    def task_a():
        val = 27
        return val  # Automatically pushed to XCom

    @task
    def task_b(val):  # Automatically pulled from XCom
        print(f"Received value: {val}")

    val = task_a()
    task_b(val)

xcom_dag()

Advantages of this approach:

  • Less boilerplate code
  • More readable and intuitive
  • Follows Python conventions
  • Automatic XCom handling behind the scenes
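
Under the hood, a returned value is pushed with the default key "return_value", so it can still be pulled explicitly when you need to mix styles. A minimal sketch:

from airflow.sdk import dag, task

@dag
def xcom_dag():
    @task
    def task_a():
        return 27  # stored in XCom under the default key "return_value"

    @task
    def task_b(ti):
        # Explicit pull of the value that task_a pushed implicitly by returning it
        val = ti.xcom_pull(task_ids="task_a", key="return_value")
        print(f"Received value: {val}")

    task_a() >> task_b()

xcom_dag()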

Advanced XCom Patterns

Pulling from Multiple Tasks

When a task needs data from several upstream tasks, pass a list of task IDs to xcom_pull, and set the dependencies so those tasks run first:

@dag
def xcom_dag():
    @task
    def task_a(ti):
        val = 27
        ti.xcom_push(key="my_key", value=val)

    @task
    def task_c(ti):
        val = 28
        ti.xcom_push(key="my_key", value=val)

    @task
    def task_b(ti):
        # Pull from multiple tasks
        vals = ti.xcom_pull(task_ids=["task_a", "task_c"], key="my_key")
        print(f"Received values: {vals}")  # Returns a list

    task_a() >> task_c() >> task_b()

xcom_dag()

Pushing Multiple Values

Use dictionaries to organize and share multiple related values:

@dag
def xcom_dag():
    @task
    def task_a(ti):
        vals = {
            "val1": 27,
            "val2": 8,
            "val3": 1997,
            "status": "success"
        }
        ti.xcom_push(key="my_key", value=vals)

    @task
    def task_b(ti):
        vals = ti.xcom_pull(task_ids="task_a", key="my_key")
        print(f"val1: {vals['val1']}")
        print(f"val2: {vals['val2']}")
        print(f"Status: {vals['status']}")

    task_a() >> task_b()

xcom_dag()
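
A TaskFlow variant worth knowing: decorating with @task(multiple_outputs=True) splits a returned dictionary into one XCom per key, so downstream tasks can consume individual values. A minimal sketch:

from airflow.sdk import dag, task

@dag
def xcom_dag():
    @task(multiple_outputs=True)
    def task_a():
        # Each key is pushed as its own XCom entry
        return {"val1": 27, "val2": 8, "status": "success"}

    @task
    def task_b(val1, status):
        print(f"val1: {val1}, status: {status}")

    vals = task_a()
    task_b(vals["val1"], vals["status"])

xcom_dag()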

XCom Limitations and Best Practices

  1. Keep XComs small - they're for metadata, not bulk data

    Use for: file paths, URLs, run IDs, execution metadata, row counts, processing stats, small config dicts, status flags, control signals, DB connection strings.

    Avoid for: raw CSVs, large JSON dumps, entire DataFrames, full datasets, binary files, images, large API responses.

  2. Size constraints: XCom storage limits vary by metadata database:
    • SQLite: up to 2 GB
    • Postgres: up to 1 GB
    • MySQL: up to 64 KB
  3. Use external storage for large datasets and pass references via XCom
  4. XComs are not a data-processing channel: for heavy transformations, trigger an external framework such as a Spark job and exchange only references
  5. JSON serialization: data must be JSON serializable (strings, numbers, lists, dictionaries, booleans, null) - see the sketch below
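
For point 5, a conservative sketch that sticks to the plain JSON types listed above, converting anything else before it is returned (the DAG and task names here are illustrative):

from datetime import datetime, timezone

from airflow.sdk import dag, task

@dag
def serializable_dag():
    @task
    def produce():
        seen_ids = {101, 102, 103}             # a Python set
        finished = datetime.now(timezone.utc)  # a datetime object
        # Convert to plain JSON types (list, string) before the value hits XCom
        return {
            "seen_ids": sorted(seen_ids),
            "finished_at": finished.isoformat(),
        }

    @task
    def consume(payload):
        print(payload["seen_ids"], payload["finished_at"])

    consume(produce())

serializable_dag()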

Here's a DAG example that applies points 3 and 4, passing only a file reference through XCom:

import pandas as pd

from airflow.sdk import dag, task

@dag
def large_data_dag():
    @task
    def extract_data():
        # Process a large dataset (process_large_dataset is a placeholder for your own logic)
        df = process_large_dataset()

        # Store the data externally (writing to s3:// paths requires an
        # S3-aware filesystem package such as s3fs to be installed)
        s3_path = "s3://my-bucket/processed-data/run-123.parquet"
        df.to_parquet(s3_path)

        # Return only the reference
        return {
            "data_path": s3_path,
            "row_count": len(df),
            "processing_time": "45 seconds"
        }

    @task
    def transform_data(metadata):
        # Load data using the reference
        df = pd.read_parquet(metadata["data_path"])

        # Process and store the result (transform is a placeholder for your own logic)
        transformed_df = transform(df)
        new_path = "s3://my-bucket/transformed-data/run-123.parquet"
        transformed_df.to_parquet(new_path)

        return {"transformed_path": new_path}

    metadata = extract_data()
    transform_data(metadata)

large_data_dag()