Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/providers/common/io/xcom/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _get_compression_suffix(compression: str) -> str:
raise ValueError(f"Compression {compression} is not supported. Make sure it is installed.")


class XComObjectStoreBackend(BaseXCom):
class XComObjectStorageBackend(BaseXCom):
"""XCom backend that stores data in an object store or database depending on the size of the data.

If the value is larger than the configured threshold, it will be stored in an object store.
Expand Down Expand Up @@ -155,7 +155,7 @@ def deserialize_value(
path = conf.get(SECTION, "xcom_objectstorage_path", fallback="")

try:
p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(path) / XComObjectStorageBackend._get_key(data)
return json.load(p.open(mode="rb", compression="infer"), cls=XComDecoder)
except TypeError:
return data
Expand All @@ -167,7 +167,7 @@ def purge(xcom: XCom, session: Session) -> None:
path = conf.get(SECTION, "xcom_objectstorage_path", fallback="")
if isinstance(xcom.value, str):
try:
p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(xcom.value)
p = ObjectStoragePath(path) / XComObjectStorageBackend._get_key(xcom.value)
p.unlink(missing_ok=True)
except TypeError:
pass
Expand Down
4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-common-io/xcom_backend.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Object Storage XCom Backend

The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs.

To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStorageBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
id is obtained from the user part of the URL that you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required
to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be
put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be
Expand All @@ -30,7 +30,7 @@ compress the data before storing it in object storage.
So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip::

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
Expand Down
4 changes: 2 additions & 2 deletions docs/apache-airflow/core-concepts/xcoms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Object Storage XCom Backend

The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs.

To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStorageBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection
id is obtained from the user part of the URL that you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required
to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be
put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be
Expand All @@ -74,7 +74,7 @@ compress the data before storing it in object storage.
So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip::

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
Expand Down
12 changes: 6 additions & 6 deletions tests/providers/common/io/xcom/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import BaseXCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.io.xcom.backend import XComObjectStoreBackend
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -85,15 +85,15 @@ def task_instance(task_instance_factory):
)


class TestXcomObjectStoreBackend:
class TestXComObjectStorageBackend:
path = "file:/tmp/xcom"

def setup_method(self):
try:
conf.add_section("common.io")
except DuplicateSectionError:
pass
conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStoreBackend")
conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStorageBackend")
conf.set("common.io", "xcom_objectstorage_path", self.path)
conf.set("common.io", "xcom_objectstorage_threshold", "50")
settings.configure_vars()
Expand Down Expand Up @@ -164,7 +164,7 @@ def test_value_storage(self, task_instance, session):
)

data = BaseXCom.deserialize_value(res)
p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data)
assert p.exists() is True

value = XCom.get_value(
Expand Down Expand Up @@ -210,7 +210,7 @@ def test_clear(self, task_instance, session):
)

data = BaseXCom.deserialize_value(res)
p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data)
assert p.exists() is True

XCom.clear(
Expand Down Expand Up @@ -250,7 +250,7 @@ def test_compression(self, task_instance, session):
)

data = BaseXCom.deserialize_value(res)
p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data)
assert p.exists() is True
assert p.suffix == ".gz"

Expand Down