When building pipelines in Apache Airflow, tasks often need to share data with each other. That's where XComs (cross-communication) come in.
What are XComs?
XComs enable tasks in your Airflow DAGs to exchange small amounts of data. The mechanism works like this:
- Push: A task stores data in XCom using a unique identifier
- Pull: Another task retrieves that data using the same identifier
- Storage: XCom data is stored in Airflow's metadata database by default
- Identification: Each XCom is uniquely identified by multiple fields: key, run ID, task ID, and DAG ID
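Conceptually, each XCom record stored in the metadata database carries all of these identifying fields alongside the payload. A rough sketch of one record (the field names here are illustrative, not the exact table schema):

    # Illustrative sketch of a single XCom record -- not the exact column
    # names used by the metadata database.
    xcom_record = {
        "dag_id": "xcom_dag",                     # DAG that pushed the value
        "task_id": "task_a",                      # task that pushed it
        "run_id": "manual__2025-01-01T00:00:00",  # DAG run it belongs to
        "key": "my_key",                          # identifier chosen at push time
        "value": 27,                              # the JSON-serialized payload
    }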
Every task instance in Airflow gets its own context dictionary that contains metadata about the current execution.
context["ti"]
→ refers to the TaskInstance object of the currently running task
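Besides ti, the context exposes other runtime metadata. A minimal sketch of a task that reads a few common entries (the exact set of keys depends on your Airflow version, so treat these names as assumptions and check the context reference for your release):

    @task
    def show_context(**context):
        print(context["ti"])            # TaskInstance of the running task
        print(context["run_id"])        # identifier of the current DAG run
        print(context["logical_date"])  # logical date of this run
        print(context["dag_run"])       # the DagRun object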
XCom Implementation Patterns
Method 1: Explicit Context Usage
The most verbose approach, using the full context dictionary:
from airflow.sdk import dag, task, Context

@dag
def xcom_dag():

    @task
    def task_a(**context: Context):
        val = 27
        context["ti"].xcom_push(key="my_key", value=val)

    @task
    def task_b(**context: Context):
        val = context["ti"].xcom_pull(task_ids="task_a", key="my_key")
        print(f"Received value: {val}")

    task_a() >> task_b()

xcom_dag()
Method 2: Direct TaskInstance Access
A shorter variant accesses the TaskInstance directly through the ti argument, which Airflow injects automatically:
from airflow.sdk import dag, task

@dag
def xcom_dag():

    @task
    def task_a(ti):
        val = 27
        ti.xcom_push(key="my_key", value=val)

    @task
    def task_b(ti):
        val = ti.xcom_pull(task_ids="task_a", key="my_key")
        print(f"Received value: {val}")

    task_a() >> task_b()

xcom_dag()
Method 3: Implicit XCom with return values (Recommended)
The most Pythonic approach uses return values and function parameters:
from airflow.sdk import dag, task

@dag
def xcom_dag():

    @task
    def task_a():
        val = 27
        return val  # Automatically pushed to XCom

    @task
    def task_b(val):  # Automatically pulled from XCom
        print(f"Received value: {val}")

    val = task_a()
    task_b(val)

xcom_dag()
Why this method is recommended:
- Less boilerplate code
- More readable and intuitive
- Follows Python conventions
- Automatic XCom handling behind the scenes
Advanced XCom Patterns
Pulling from Multiple Tasks
When you need data from several upstream tasks, pass a list of task IDs to xcom_pull and set the dependencies so the upstream tasks run first:
from airflow.sdk import dag, task

@dag
def xcom_dag():

    @task
    def task_a(ti):
        val = 27
        ti.xcom_push(key="my_key", value=val)

    @task
    def task_c(ti):
        val = 28
        ti.xcom_push(key="my_key", value=val)

    @task
    def task_b(ti):
        # Pull from multiple tasks: one value per task id
        vals = ti.xcom_pull(task_ids=["task_a", "task_c"], key="my_key")
        print(f"Received values: {vals}")  # Returns a list

    task_a() >> task_c() >> task_b()

xcom_dag()
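The chained wiring above runs the two producers one after the other. If task_a and task_c are independent, a fan-in dependency lets them run in parallel before task_b pulls from both; a sketch of the alternative last line inside the DAG:

    # Run both producers in parallel, then fan in to the consumer
    [task_a(), task_c()] >> task_b()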
Pushing Multiple Values
Use dictionaries to organize and share multiple related values:
from airflow.sdk import dag, task

@dag
def xcom_dag():

    @task
    def task_a(ti):
        vals = {
            "val1": 27,
            "val2": 8,
            "val3": 1997,
            "status": "success",
        }
        ti.xcom_push(key="my_key", value=vals)

    @task
    def task_b(ti):
        vals = ti.xcom_pull(task_ids="task_a", key="my_key")
        print(f"val1: {vals['val1']}")
        print(f"val2: {vals['val2']}")
        print(f"Status: {vals['status']}")

    task_a() >> task_b()

xcom_dag()
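With the TaskFlow style from Method 3, the same idea can be expressed with a plain return value. A sketch that assumes the multiple_outputs flag of @task, which pushes each dictionary key as its own XCom so downstream tasks can pick out individual entries:

from airflow.sdk import dag, task

@dag
def xcom_dict_dag():

    @task(multiple_outputs=True)
    def task_a():
        # Each key is pushed as a separate XCom entry
        return {"val1": 27, "val2": 8, "val3": 1997, "status": "success"}

    @task
    def task_b(val1, status):
        print(f"val1: {val1}, status: {status}")

    vals = task_a()
    task_b(vals["val1"], vals["status"])

xcom_dict_dag()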
XCom Limitations and Best Practices
- Keep XComs small - they're for metadata, not bulk data
  - Use for: file paths, URLs, run IDs, execution metadata, row counts, processing stats, small config dicts, status flags, control signals, DB connection strings.
  - Avoid for: raw CSVs, large JSON dumps, entire DataFrames, full datasets, binary files, images, large API responses.
- Size constraints: XCom storage limits vary by metadata database:
  - SQLite: up to 2 GB
  - Postgres: up to 1 GB
  - MySQL: up to 64 KB
- Use external storage for large datasets and pass references via XCom
- XComs are not suitable for moving large amounts of data between tasks; for heavy processing, hand the work to an external engine such as Spark and exchange only references.
- JSON Serialization: Data must be JSON serializable (strings, numbers, lists, dictionaries, booleans, null)
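Values that are not plain JSON types (for example datetime objects or NumPy scalars) may or may not be handled by your XCom backend, so converting them explicitly keeps results predictable. A minimal sketch:

    from datetime import datetime, timezone

    @task
    def report_status():
        finished_at = datetime.now(timezone.utc)
        # Convert non-JSON types to plain strings/numbers before returning
        return {"finished_at": finished_at.isoformat(), "rows_written": 1024}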
A complete DAG example that puts these practices together:
import pandas as pd
from airflow.sdk import dag, task

@dag
def large_data_dag():

    @task
    def extract_data():
        # Process a large dataset (process_large_dataset is a placeholder
        # for your own extraction logic)
        df = process_large_dataset()

        # Store the data externally (writing to S3 this way requires s3fs)
        s3_path = "s3://my-bucket/processed-data/run-123.parquet"
        df.to_parquet(s3_path)

        # Return only the reference and lightweight metadata
        return {
            "data_path": s3_path,
            "row_count": len(df),
            "processing_time": "45 seconds",
        }

    @task
    def transform_data(metadata):
        # Load the data using the reference pulled from XCom
        df = pd.read_parquet(metadata["data_path"])

        # Process and store the result (transform is a placeholder)
        transformed_df = transform(df)
        new_path = "s3://my-bucket/transformed-data/run-123.parquet"
        transformed_df.to_parquet(new_path)

        return {"transformed_path": new_path}

    metadata = extract_data()
    transform_data(metadata)

large_data_dag()
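The S3 paths above are hardcoded for illustration. In practice you would usually derive them from the run so that concurrent runs do not overwrite each other; a sketch using the injected run_id (the bucket name is the same assumption as above):

    @task
    def extract_data(run_id=None):
        df = process_large_dataset()
        # Build a per-run path so each DAG run writes to its own file
        s3_path = f"s3://my-bucket/processed-data/{run_id}.parquet"
        df.to_parquet(s3_path)
        return {"data_path": s3_path, "row_count": len(df)}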