Apache Airflow orchestrates complex workflows by executing tasks as isolated, independent units. While this isolation ensures reliability, tasks often need to share state or data. Airflow solves this with XComs (cross-communications).
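With Airflow's default backend, each XCom is stored as a row in the metadata database. Among other fields, that row is identified by the DAG ID, the run ID, the task ID, and a key, and a task's return value is pushed under the key `return_value`. The toy model below is only a hypothetical sketch of that bookkeeping, not Airflow's code: `ToyXComStore`, `push`, and `pull` are illustrative names, and a plain dict stands in for the database table.

```python
from dataclasses import dataclass, field
from typing import Any

# Airflow's key for a task's return value
RETURN_VALUE_KEY = "return_value"


@dataclass
class ToyXComStore:
    """Hypothetical in-memory stand-in for the metadata DB's XCom table."""

    rows: dict[tuple[str, str, str, str], Any] = field(default_factory=dict)

    def push(self, dag_id: str, run_id: str, task_id: str, value: Any,
             key: str = RETURN_VALUE_KEY) -> None:
        # One write per (dag, run, task, key); pushing again overwrites the entry
        self.rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id: str, run_id: str, task_id: str,
             key: str = RETURN_VALUE_KEY) -> Any:
        # Return None when nothing was pushed for this combination
        return self.rows.get((dag_id, run_id, task_id, key))


store = ToyXComStore()
# An upstream "extract" task shares a small piece of metadata...
store.push("etl", "run_1", "extract", {"filename": "orders.csv", "rows": 1200})
# ...and a downstream task in the same run reads it back
print(store.pull("etl", "run_1", "extract"))
```

In the real system each `push` corresponds to a database write, which is why the volume of XCom writes matters.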
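Because the default backend stores every XCom in the Airflow metadata database, excessive XCom writes increase concurrent I/O on that database, which can lead to database locks and slower scheduler loops.

Designing "Exclusive" XCom Workflows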
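The custom backend that follows keeps large payloads out of the metadata database: it uploads them to object storage and stores only a pointer in the XCom row. As a dependency-free sketch of that round trip, the code below assumes a plain dict in place of the S3 bucket, JSON in place of Parquet, and Python lists in place of DataFrames. `OBJECT_STORE`, `serialize_value`, and `deserialize_value` here are hypothetical standalone functions named after the backend's hooks, not Airflow APIs.

```python
import json
import uuid
from typing import Any

# Hypothetical stand-ins: a dict plays the role of the S3 bucket, and the
# prefix mimics the s3:// path that the real backend stores in the database
OBJECT_STORE: dict[str, bytes] = {}
PREFIX = "s3://my-airflow-xcom-bucket/data/"


def serialize_value(value: Any) -> str:
    """Offload list payloads to the object store; return what the DB would hold."""
    if isinstance(value, list):
        path = f"{PREFIX}{uuid.uuid4()}.json"
        OBJECT_STORE[path] = json.dumps(value).encode("utf-8")
        # Only this short pointer would be written to the metadata database
        return json.dumps(path)
    # Small native values are stored inline as JSON
    return json.dumps(value)


def deserialize_value(stored: str) -> Any:
    """Reverse serialize_value, following the pointer when there is one."""
    value = json.loads(stored)
    if isinstance(value, str) and value.startswith(PREFIX):
        return json.loads(OBJECT_STORE[value].decode("utf-8"))
    return value


records = [{"order_id": i, "amount": i * 10} for i in range(1000)]
stored = serialize_value(records)
# Compare the size of the stored pointer with the size of the full payload
print(len(stored), len(json.dumps(records)))
assert deserialize_value(stored) == records
```

One consequence of this design: an ordinary string value that happens to start with the prefix would be misread as a pointer, so the prefix should be a location that user data is unlikely to reference.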
```python
import io
import uuid
from typing import Any

import pandas as pd
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    PREFIX = "s3://my-airflow-xcom-bucket/data/"

    # Airflow 2.x calls serialize_value()/deserialize_value() on the configured XCom backend
    @staticmethod
    def serialize_value(value: Any, **kwargs) -> Any:
        if isinstance(value, pd.DataFrame):
            s3_key = f"{S3XComBackend.PREFIX}{uuid.uuid4()}.parquet"
            hook = S3Hook(aws_conn_id="aws_default")
            # With no path argument, to_parquet() returns the Parquet file as bytes
            file_buffer = value.to_parquet(index=False)
            # Upload to S3; with no bucket_name given, the hook parses the full s3:// URL
            hook.load_bytes(file_buffer, key=s3_key, replace=True)
            # Store only the reference path in the metadata database
            return BaseXCom.serialize_value(s3_key, **kwargs)
        # Fall back to default serialization for small native data types
        return BaseXCom.serialize_value(value, **kwargs)

    @staticmethod
    def deserialize_value(result) -> Any:
        deserialized_value = BaseXCom.deserialize_value(result)
        # Check whether the stored value is an S3 pointer
        if isinstance(deserialized_value, str) and deserialized_value.startswith(S3XComBackend.PREFIX):
            hook = S3Hook(aws_conn_id="aws_default")
            # read_key() decodes the object as UTF-8, which fails on binary Parquet,
            # so read the raw bytes from the boto3 object instead
            file_bytes = hook.get_key(deserialized_value).get()["Body"].read()
            # Reconstruct the DataFrame from an in-memory file
            return pd.read_parquet(io.BytesIO(file_bytes))
        return deserialized_value
```

Activating the Custom Backend
To share metadata or small result sets (like a filename or a record count) between tasks in a