From 662957399216f588cf7812aadccc370f33ec8f6c Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 29 Mar 2024 10:14:40 +0800 Subject: [PATCH] Rename to XComObjectStorageBackend The config names all use "object storage", but the class is still using "object store". Since we're not renaming the configs, let's rename the class for consistency. --- airflow/providers/common/io/xcom/backend.py | 6 +++--- .../xcom_backend.rst | 4 ++-- docs/apache-airflow/core-concepts/xcoms.rst | 4 ++-- tests/providers/common/io/xcom/test_backend.py | 12 ++++++------ 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/airflow/providers/common/io/xcom/backend.py b/airflow/providers/common/io/xcom/backend.py index 061a90ae031ac..b2416862ee34c 100644 --- a/airflow/providers/common/io/xcom/backend.py +++ b/airflow/providers/common/io/xcom/backend.py @@ -65,7 +65,7 @@ def _get_compression_suffix(compression: str) -> str: raise ValueError(f"Compression {compression} is not supported. Make sure it is installed.") -class XComObjectStoreBackend(BaseXCom): +class XComObjectStorageBackend(BaseXCom): """XCom backend that stores data in an object store or database depending on the size of the data. If the value is larger than the configured threshold, it will be stored in an object store. @@ -155,7 +155,7 @@ def deserialize_value( path = conf.get(SECTION, "xcom_objectstorage_path", fallback="") try: - p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(data) + p = ObjectStoragePath(path) / XComObjectStorageBackend._get_key(data) return json.load(p.open(mode="rb", compression="infer"), cls=XComDecoder) except TypeError: return data @@ -167,7 +167,7 @@ def purge(xcom: XCom, session: Session) -> None: path = conf.get(SECTION, "xcom_objectstorage_path", fallback="") if isinstance(xcom.value, str): try: - p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(xcom.value) + p = ObjectStoragePath(path) / XComObjectStorageBackend._get_key(xcom.value) p.unlink(missing_ok=True) except TypeError: pass diff --git a/docs/apache-airflow-providers-common-io/xcom_backend.rst b/docs/apache-airflow-providers-common-io/xcom_backend.rst index 9216fff711859..43bb7bb579228 100644 --- a/docs/apache-airflow-providers-common-io/xcom_backend.rst +++ b/docs/apache-airflow-providers-common-io/xcom_backend.rst @@ -20,7 +20,7 @@ Object Storage XCom Backend The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs. -To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection +To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStorageBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be be put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be @@ -30,7 +30,7 @@ compress the data before storing it in object storage. So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip:: [core] - xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend + xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend [common.io] xcom_objectstorage_path = s3://conn_id@mybucket/key diff --git a/docs/apache-airflow/core-concepts/xcoms.rst b/docs/apache-airflow/core-concepts/xcoms.rst index cd72044f7ae84..75884513c8f49 100644 --- a/docs/apache-airflow/core-concepts/xcoms.rst +++ b/docs/apache-airflow/core-concepts/xcoms.rst @@ -64,7 +64,7 @@ Object Storage XCom Backend The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs. -To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection +To enable storing XComs in an object store, you can set the ``xcom_backend`` configuration option to ``airflow.providers.common.io.xcom.backend.XComObjectStorageBackend``. You will also need to set ``xcom_objectstorage_path`` to the desired location. The connection id is obtained from the user part of the url the you will provide, e.g. ``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore, ``xcom_objectstorage_threshold`` is required to be something larger than -1. Any object smaller than the threshold in bytes will be stored in the database and anything larger will be be put in object storage. This will allow a hybrid setup. If an xcom is stored on object storage a reference will be @@ -74,7 +74,7 @@ compress the data before storing it in object storage. So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip:: [core] - xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStoreBackend + xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend [common.io] xcom_objectstorage_path = s3://conn_id@mybucket/key diff --git a/tests/providers/common/io/xcom/test_backend.py b/tests/providers/common/io/xcom/test_backend.py index d85133b1942cf..da2a11950ee06 100644 --- a/tests/providers/common/io/xcom/test_backend.py +++ b/tests/providers/common/io/xcom/test_backend.py @@ -30,7 +30,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.models.xcom import BaseXCom, resolve_xcom_backend from airflow.operators.empty import EmptyOperator -from airflow.providers.common.io.xcom.backend import XComObjectStoreBackend +from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.types import DagRunType @@ -85,7 +85,7 @@ def task_instance(task_instance_factory): ) -class TestXcomObjectStoreBackend: +class TestXComObjectStorageBackend: path = "file:/tmp/xcom" def setup_method(self): @@ -93,7 +93,7 @@ def setup_method(self): conf.add_section("common.io") except DuplicateSectionError: pass - conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStoreBackend") + conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStorageBackend") conf.set("common.io", "xcom_objectstorage_path", self.path) conf.set("common.io", "xcom_objectstorage_threshold", "50") settings.configure_vars() @@ -164,7 +164,7 @@ def test_value_storage(self, task_instance, session): ) data = BaseXCom.deserialize_value(res) - p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data) + p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data) assert p.exists() is True value = XCom.get_value( @@ -210,7 +210,7 @@ def test_clear(self, task_instance, session): ) data = BaseXCom.deserialize_value(res) - p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data) + p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data) assert p.exists() is True XCom.clear( @@ -250,7 +250,7 @@ def test_compression(self, task_instance, session): ) data = BaseXCom.deserialize_value(res) - p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data) + p = ObjectStoragePath(self.path) / XComObjectStorageBackend._get_key(data) assert p.exists() is True assert p.suffix == ".gz"