From 803a556d6c5b3d2f47647a9c7693b33e893e511f Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sat, 10 Aug 2024 01:48:09 +0200 Subject: [PATCH 1/7] Fixing remaining Variable tests for db isolation mode, also fixing secret backend haven't called from EnvironmentVariablesBackend, Metastore and custom ones. This caused side effect to move the Variable.get() method to internal API --- .../endpoints/rpc_api_endpoint.py | 1 + airflow/models/variable.py | 37 +++++++++++++------ airflow/secrets/base_secrets.py | 11 +++++- tests/models/test_variable.py | 36 +++++++++--------- .../cncf/kubernetes/operators/test_job.py | 1 + 5 files changed, 57 insertions(+), 29 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index ad65157ef9415..c9720188b704d 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -129,6 +129,7 @@ def initialize_method_map() -> dict[str, Callable]: Variable.set, Variable.update, Variable.delete, + Variable.get, DAG.fetch_callback, DAG.fetch_dagrun, DagRun.fetch_task_instances, diff --git a/airflow/models/variable.py b/airflow/models/variable.py index 4a9530e5d9523..e97b12520451f 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -122,12 +122,14 @@ def setdefault(cls, key, default, description=None, deserialize_json=False): else: return obj - @classmethod + @staticmethod + @provide_session + @internal_api_call def get( - cls, key: str, default_var: Any = __NO_DEFAULT_SENTINEL, deserialize_json: bool = False, + session: Session = None, ) -> Any: """ Get a value for an Airflow Variable Key. @@ -135,10 +137,11 @@ def get( :param key: Variable Key :param default_var: Default value of the Variable if the Variable doesn't exist :param deserialize_json: Deserialize the value to a Python dict + :param session: Session """ - var_val = Variable.get_variable_from_secrets(key=key) + var_val = Variable.get_variable_from_secrets(key=key, session=session) if var_val is None: - if default_var is not cls.__NO_DEFAULT_SENTINEL: + if Variable.check_if_no_default_sentinel_for_given_default_value(default_var): return default_var else: raise KeyError(f"Variable {key} does not exist") @@ -151,6 +154,10 @@ def get( mask_secret(var_val, key) return var_val + @classmethod + def check_if_no_default_sentinel_for_given_default_value(cls, default_var: Any) -> bool: + return default_var is cls.__NO_DEFAULT_SENTINEL + @staticmethod @provide_session @internal_api_call @@ -170,9 +177,10 @@ def set( :param value: Value to set for the Variable :param description: Description of the Variable :param serialize_json: Serialize the value to a JSON string + :param session: Session """ # check if the secret exists in the custom secrets' backend. - Variable.check_for_write_conflict(key) + Variable.check_for_write_conflict(key=key, session=session) if serialize_json: stored_value = json.dumps(value, indent=2) else: @@ -201,8 +209,9 @@ def update( :param key: Variable Key :param value: Value to set for the Variable :param serialize_json: Serialize the value to a JSON string + :param session: Session """ - Variable.check_for_write_conflict(key) + Variable.check_for_write_conflict(key=key, session=session) if Variable.get_variable_from_secrets(key=key) is None: raise KeyError(f"Variable {key} does not exist") @@ -210,7 +219,9 @@ def update( if obj is None: raise AttributeError(f"Variable {key} does not exist in the Database and cannot be updated.") - Variable.set(key, value, description=obj.description, serialize_json=serialize_json) + Variable.set( + key=key, value=value, description=obj.description, serialize_json=serialize_json, session=session + ) @staticmethod @provide_session @@ -232,7 +243,8 @@ def rotate_fernet_key(self): self._val = fernet.rotate(self._val.encode("utf-8")).decode() @staticmethod - def check_for_write_conflict(key: str) -> None: + @provide_session + def check_for_write_conflict(key: str, session: Session = None) -> None: """ Log a warning if a variable exists outside the metastore. @@ -241,11 +253,12 @@ def check_for_write_conflict(key: str) -> None: subsequent reads will not read the set value. :param key: Variable Key + :param session: Session """ for secrets_backend in ensure_secrets_loaded(): if not isinstance(secrets_backend, MetastoreBackend): try: - var_val = secrets_backend.get_variable(key=key) + var_val = secrets_backend.get_variable(key=key, session=session) if var_val is not None: _backend_name = type(secrets_backend).__name__ log.warning( @@ -267,11 +280,13 @@ def check_for_write_conflict(key: str) -> None: return None @staticmethod - def get_variable_from_secrets(key: str) -> str | None: + @provide_session + def get_variable_from_secrets(key: str, session: Session = None) -> str | None: """ Get Airflow Variable by iterating over all Secret Backends. :param key: Variable Key + :param session: Session :return: Variable Value """ # check cache first @@ -285,7 +300,7 @@ def get_variable_from_secrets(key: str) -> str | None: # iterate over backends if not in cache (or expired) for secrets_backend in ensure_secrets_loaded(): try: - var_val = secrets_backend.get_variable(key=key) + var_val = secrets_backend.get_variable(key=key, session=session) if var_val is not None: break except Exception: diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py index 3346d880f2eb5..fd149ee9c7d92 100644 --- a/airflow/secrets/base_secrets.py +++ b/airflow/secrets/base_secrets.py @@ -21,8 +21,11 @@ from typing import TYPE_CHECKING from airflow.exceptions import RemovedInAirflow3Warning +from airflow.utils.session import provide_session if TYPE_CHECKING: + from sqlalchemy.orm import Session + from airflow.models.connection import Connection @@ -134,11 +137,17 @@ def get_connections(self, conn_id: str) -> list[Connection]: return [conn] return [] - def get_variable(self, key: str) -> str | None: + @provide_session + def get_variable(self, key: str, session: Session = None) -> str | None: """ Return value for Airflow Variable. + The implementation of this method should take session + because the underlying implementation may run with db isolation. + i.e. MetastoreBackend.get_variable(). + :param key: Variable Key + :param session: Session :return: Variable Value """ raise NotImplementedError() diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index b3e327dab521b..ef673c2e67998 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -47,26 +47,26 @@ def setup_test_cases(self): db.clear_db_variables() crypto._fernet = None - @conf_vars({("core", "fernet_key"): ""}) + @conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): True}) def test_variable_no_encryption(self, session): """ Test variables without encryption """ Variable.set(key="key", value="value", session=session) - test_var = session.query(Variable).filter(Variable.key == "key").one() + test_var = Variable.get("key") assert not test_var.is_encrypted assert test_var.val == "value" # We always call mask_secret for variables, and let the SecretsMasker decide based on the name if it # should mask anything. That logic is tested in test_secrets_masker.py self.mask_secret.assert_called_once_with("value", "key") - @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()}) + @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode(), ("core", "unit_test_mode"): True}) def test_variable_with_encryption(self, session): """ Test variables with encryption """ Variable.set(key="key", value="value", session=session) - test_var = session.query(Variable).filter(Variable.key == "key").one() + test_var = Variable.get("key") assert test_var.is_encrypted assert test_var.val == "value" @@ -105,7 +105,7 @@ def test_variable_set_with_env_variable(self, caplog, session): Variable.set(key="key", value="db-value", session=session) with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"): # setting value while shadowed by an env variable will generate a warning - Variable.set("key", "new-db-value") + Variable.set(key="key", value="new-db-value", session=session) # value set above is not returned because the env variable value takes priority assert "env-value" == Variable.get("key") # invalidate the cache to re-evaluate value @@ -137,11 +137,11 @@ def test_variable_set_with_extra_secret_backend(self, mock_ensure_secrets, caplo "will be updated, but to read it you have to delete the conflicting variable from " "MockSecretsBackend" ) - Variable.delete("key") + Variable.delete(key="key", session=session) def test_variable_set_get_round_trip_json(self): value = {"a": 17, "b": 47} - Variable.set("tested_var_set_id", value, serialize_json=True) + Variable.set(key="tested_var_set_id", value=value, serialize_json=True) assert value == Variable.get("tested_var_set_id", deserialize_json=True) def test_variable_update(self, session): @@ -184,9 +184,9 @@ def test_get_non_existing_var_should_raise_key_error(self): with pytest.raises(KeyError): Variable.get("thisIdDoesNotExist") - def test_update_non_existing_var_should_raise_key_error(self): + def test_update_non_existing_var_should_raise_key_error(self, session): with pytest.raises(KeyError): - Variable.update("thisIdDoesNotExist", "value") + Variable.update(key="thisIdDoesNotExist", value="value", session=session) def test_get_non_existing_var_with_none_default_should_return_none(self): assert Variable.get("thisIdDoesNotExist", default_var=None) is None @@ -197,18 +197,21 @@ def test_get_non_existing_var_should_not_deserialize_json_default(self): "thisIdDoesNotExist", default_var=default_value, deserialize_json=True ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_setdefault_round_trip(self): key = "tested_var_setdefault_1_id" value = "Monday morning breakfast in Paris" Variable.setdefault(key, value) assert value == Variable.get(key) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_setdefault_round_trip_json(self): key = "tested_var_setdefault_2_id" value = {"city": "Paris", "Happiness": True} Variable.setdefault(key, value, deserialize_json=True) assert value == Variable.get(key, deserialize_json=True) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_setdefault_existing_json(self, session): key = "tested_var_setdefault_2_id" value = {"city": "Paris", "Happiness": True} @@ -218,21 +221,21 @@ def test_variable_setdefault_existing_json(self, session): assert value == val assert value == Variable.get(key, deserialize_json=True) - def test_variable_delete(self): + def test_variable_delete(self, session): key = "tested_var_delete" value = "to be deleted" # No-op if the variable doesn't exist - Variable.delete(key) + Variable.delete(key=key, session=session) with pytest.raises(KeyError): Variable.get(key) # Set the variable - Variable.set(key, value) + Variable.set(key=key, value=value, session=session) assert value == Variable.get(key) # Delete the variable - Variable.delete(key) + Variable.delete(key=key, session=session) with pytest.raises(KeyError): Variable.get(key) @@ -276,7 +279,7 @@ def test_caching_caches(self, mock_ensure_secrets: mock.Mock): mock_backend.get_variable.assert_called_once() # second call was not made because of cache assert first == second - def test_cache_invalidation_on_set(self): + def test_cache_invalidation_on_set(self, session): with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="from_env"): a = Variable.get("key") # value is saved in cache with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="from_env_two"): @@ -284,7 +287,7 @@ def test_cache_invalidation_on_set(self): assert a == b # setting a new value invalidates the cache - Variable.set("key", "new_value") + Variable.set(key="key", value="new_value", session=session) c = Variable.get("key") # cache should not be used @@ -310,8 +313,7 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m key=f"password-{os.getpid()}", val=variable_value, ) - session.add(var) - session.flush() + Variable.set(key=var.key, value=var.val, session=session) # Make sure we re-load it, not just get the cached object back session.expunge(var) diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py b/tests/providers/cncf/kubernetes/operators/test_job.py index ac888b706cc19..307a4ff8425b3 100644 --- a/tests/providers/cncf/kubernetes/operators/test_job.py +++ b/tests/providers/cncf/kubernetes/operators/test_job.py @@ -116,6 +116,7 @@ def test_templates(self, create_task_instance_of_operator, session): cmds="{{ dag.dag_id }}", image="{{ dag.dag_id }}", annotations={"dag-id": "{{ dag.dag_id }}"}, + session=session, ) session.add(ti) From 4f65b559206086ee923982064f59f15122225097 Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sat, 10 Aug 2024 01:59:24 +0200 Subject: [PATCH 2/7] Fixing remaining Variable tests for db isolation mode, also fixing secret backend haven't called from EnvironmentVariablesBackend, Metastore and custom ones --- .../api_internal/endpoints/rpc_api_endpoint.py | 1 + airflow/models/variable.py | 17 +++++++++++++---- tests/models/test_variable.py | 13 +++++-------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index c9720188b704d..c31beddbf548a 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -130,6 +130,7 @@ def initialize_method_map() -> dict[str, Callable]: Variable.update, Variable.delete, Variable.get, + Variable.setdefault, DAG.fetch_callback, DAG.fetch_dagrun, DagRun.fetch_task_instances, diff --git a/airflow/models/variable.py b/airflow/models/variable.py index e97b12520451f..0158f47720fce 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -97,8 +97,10 @@ def val(cls): """Get Airflow Variable from Metadata DB and decode it using the Fernet Key.""" return synonym("_val", descriptor=property(cls.get_val, cls.set_val)) - @classmethod - def setdefault(cls, key, default, description=None, deserialize_json=False): + @staticmethod + @provide_session + @internal_api_call + def setdefault(key, default, description=None, deserialize_json=False, session: Session = None): """ Return the current value for a key or store the default value and return it. @@ -110,12 +112,19 @@ def setdefault(cls, key, default, description=None, deserialize_json=False): :param description: Default value to set Description of the Variable :param deserialize_json: Store this as a JSON encoded value in the DB and un-encode it when retrieving a value + :param session: Session :return: Mixed """ - obj = Variable.get(key, default_var=None, deserialize_json=deserialize_json) + obj = Variable.get(key, default_var=None, deserialize_json=deserialize_json, session=session) if obj is None: if default is not None: - Variable.set(key, default, description=description, serialize_json=deserialize_json) + Variable.set( + key=key, + value=default, + description=description, + serialize_json=deserialize_json, + session=session, + ) return default else: raise ValueError("Default Value must be set") diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index ef673c2e67998..ae431c3c77a22 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -197,26 +197,23 @@ def test_get_non_existing_var_should_not_deserialize_json_default(self): "thisIdDoesNotExist", default_var=default_value, deserialize_json=True ) - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode - def test_variable_setdefault_round_trip(self): + def test_variable_setdefault_round_trip(self, session): key = "tested_var_setdefault_1_id" value = "Monday morning breakfast in Paris" - Variable.setdefault(key, value) + Variable.setdefault(key=key, default=value, session=session) assert value == Variable.get(key) - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode - def test_variable_setdefault_round_trip_json(self): + def test_variable_setdefault_round_trip_json(self, session): key = "tested_var_setdefault_2_id" value = {"city": "Paris", "Happiness": True} - Variable.setdefault(key, value, deserialize_json=True) + Variable.setdefault(key=key, default=value, deserialize_json=True, session=None) assert value == Variable.get(key, deserialize_json=True) - @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_setdefault_existing_json(self, session): key = "tested_var_setdefault_2_id" value = {"city": "Paris", "Happiness": True} Variable.set(key=key, value=value, serialize_json=True, session=session) - val = Variable.setdefault(key, value, deserialize_json=True) + val = Variable.setdefault(key=key, default=value, deserialize_json=True, session=session) # Check the returned value, and the stored value are handled correctly. assert value == val assert value == Variable.get(key, deserialize_json=True) From 43249ff9a4347e9d392bb693c34de59393751147 Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sat, 10 Aug 2024 13:16:22 +0200 Subject: [PATCH 3/7] Reverting adding Variable.get() in internal_api and removing abstract definition of session for secret_backend.get_variable() --- .../endpoints/rpc_api_endpoint.py | 1 - airflow/models/variable.py | 33 +++++++------------ airflow/secrets/base_secrets.py | 7 +--- tests/models/test_variable.py | 2 ++ 4 files changed, 14 insertions(+), 29 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index c31beddbf548a..df9dbff57f536 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -129,7 +129,6 @@ def initialize_method_map() -> dict[str, Callable]: Variable.set, Variable.update, Variable.delete, - Variable.get, Variable.setdefault, DAG.fetch_callback, DAG.fetch_dagrun, diff --git a/airflow/models/variable.py b/airflow/models/variable.py index 0158f47720fce..c66f512acf138 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -115,7 +115,7 @@ def setdefault(key, default, description=None, deserialize_json=False, session: :param session: Session :return: Mixed """ - obj = Variable.get(key, default_var=None, deserialize_json=deserialize_json, session=session) + obj = Variable.get(key, default_var=None, deserialize_json=deserialize_json) if obj is None: if default is not None: Variable.set( @@ -131,14 +131,12 @@ def setdefault(key, default, description=None, deserialize_json=False, session: else: return obj - @staticmethod - @provide_session - @internal_api_call + @classmethod def get( + cls, key: str, default_var: Any = __NO_DEFAULT_SENTINEL, deserialize_json: bool = False, - session: Session = None, ) -> Any: """ Get a value for an Airflow Variable Key. @@ -146,11 +144,10 @@ def get( :param key: Variable Key :param default_var: Default value of the Variable if the Variable doesn't exist :param deserialize_json: Deserialize the value to a Python dict - :param session: Session """ - var_val = Variable.get_variable_from_secrets(key=key, session=session) + var_val = Variable.get_variable_from_secrets(key=key) if var_val is None: - if Variable.check_if_no_default_sentinel_for_given_default_value(default_var): + if default_var is not cls.__NO_DEFAULT_SENTINEL: return default_var else: raise KeyError(f"Variable {key} does not exist") @@ -163,10 +160,6 @@ def get( mask_secret(var_val, key) return var_val - @classmethod - def check_if_no_default_sentinel_for_given_default_value(cls, default_var: Any) -> bool: - return default_var is cls.__NO_DEFAULT_SENTINEL - @staticmethod @provide_session @internal_api_call @@ -189,7 +182,7 @@ def set( :param session: Session """ # check if the secret exists in the custom secrets' backend. - Variable.check_for_write_conflict(key=key, session=session) + Variable.check_for_write_conflict(key=key) if serialize_json: stored_value = json.dumps(value, indent=2) else: @@ -220,7 +213,7 @@ def update( :param serialize_json: Serialize the value to a JSON string :param session: Session """ - Variable.check_for_write_conflict(key=key, session=session) + Variable.check_for_write_conflict(key=key) if Variable.get_variable_from_secrets(key=key) is None: raise KeyError(f"Variable {key} does not exist") @@ -252,8 +245,7 @@ def rotate_fernet_key(self): self._val = fernet.rotate(self._val.encode("utf-8")).decode() @staticmethod - @provide_session - def check_for_write_conflict(key: str, session: Session = None) -> None: + def check_for_write_conflict(key: str) -> None: """ Log a warning if a variable exists outside the metastore. @@ -262,12 +254,11 @@ def check_for_write_conflict(key: str, session: Session = None) -> None: subsequent reads will not read the set value. :param key: Variable Key - :param session: Session """ for secrets_backend in ensure_secrets_loaded(): if not isinstance(secrets_backend, MetastoreBackend): try: - var_val = secrets_backend.get_variable(key=key, session=session) + var_val = secrets_backend.get_variable(key=key) if var_val is not None: _backend_name = type(secrets_backend).__name__ log.warning( @@ -289,13 +280,11 @@ def check_for_write_conflict(key: str, session: Session = None) -> None: return None @staticmethod - @provide_session - def get_variable_from_secrets(key: str, session: Session = None) -> str | None: + def get_variable_from_secrets(key: str) -> str | None: """ Get Airflow Variable by iterating over all Secret Backends. :param key: Variable Key - :param session: Session :return: Variable Value """ # check cache first @@ -309,7 +298,7 @@ def get_variable_from_secrets(key: str, session: Session = None) -> str | None: # iterate over backends if not in cache (or expired) for secrets_backend in ensure_secrets_loaded(): try: - var_val = secrets_backend.get_variable(key=key, session=session) + var_val = secrets_backend.get_variable(key=key) if var_val is not None: break except Exception: diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py index fd149ee9c7d92..13d0b5460256c 100644 --- a/airflow/secrets/base_secrets.py +++ b/airflow/secrets/base_secrets.py @@ -21,11 +21,8 @@ from typing import TYPE_CHECKING from airflow.exceptions import RemovedInAirflow3Warning -from airflow.utils.session import provide_session if TYPE_CHECKING: - from sqlalchemy.orm import Session - from airflow.models.connection import Connection @@ -137,8 +134,7 @@ def get_connections(self, conn_id: str) -> list[Connection]: return [conn] return [] - @provide_session - def get_variable(self, key: str, session: Session = None) -> str | None: + def get_variable(self, key: str) -> str | None: """ Return value for Airflow Variable. @@ -147,7 +143,6 @@ def get_variable(self, key: str, session: Session = None) -> str | None: i.e. MetastoreBackend.get_variable(). :param key: Variable Key - :param session: Session :return: Variable Value """ raise NotImplementedError() diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index ae431c3c77a22..ae17980224f13 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -100,6 +100,7 @@ def test_variable_set_get_round_trip(self): Variable.set("tested_var_set_id", "Monday morning breakfast") assert "Monday morning breakfast" == Variable.get("tested_var_set_id") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_set_with_env_variable(self, caplog, session): caplog.set_level(logging.WARNING, logger=variable.log.name) Variable.set(key="key", value="db-value", session=session) @@ -120,6 +121,7 @@ def test_variable_set_with_env_variable(self, caplog, session): "EnvironmentVariablesBackend" ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode @mock.patch("airflow.models.variable.ensure_secrets_loaded") def test_variable_set_with_extra_secret_backend(self, mock_ensure_secrets, caplog, session): caplog.set_level(logging.WARNING, logger=variable.log.name) From a703c80718a7eca46aa9fae08271074a9606bbd2 Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sat, 10 Aug 2024 13:17:30 +0200 Subject: [PATCH 4/7] Reverting adding Variable.get() in internal_api and removing abstract definition of session for secret_backend.get_variable() --- airflow/secrets/base_secrets.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py index 13d0b5460256c..3346d880f2eb5 100644 --- a/airflow/secrets/base_secrets.py +++ b/airflow/secrets/base_secrets.py @@ -138,10 +138,6 @@ def get_variable(self, key: str) -> str | None: """ Return value for Airflow Variable. - The implementation of this method should take session - because the underlying implementation may run with db isolation. - i.e. MetastoreBackend.get_variable(). - :param key: Variable Key :return: Variable Value """ From 3acd7f7d3d3b5649b59d7cafc5585041dd2a8223 Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sat, 10 Aug 2024 16:52:31 +0200 Subject: [PATCH 5/7] Reverting some tests back to query session and session.add() because cache invalidated in eac Variable.set() call --- airflow/models/variable.py | 4 ++-- tests/models/test_variable.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/airflow/models/variable.py b/airflow/models/variable.py index c66f512acf138..2908f5a5635dc 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -145,7 +145,7 @@ def get( :param default_var: Default value of the Variable if the Variable doesn't exist :param deserialize_json: Deserialize the value to a Python dict """ - var_val = Variable.get_variable_from_secrets(key=key) + var_val = Variable.get_variable_from_secrets(key) if var_val is None: if default_var is not cls.__NO_DEFAULT_SENTINEL: return default_var @@ -182,7 +182,7 @@ def set( :param session: Session """ # check if the secret exists in the custom secrets' backend. - Variable.check_for_write_conflict(key=key) + Variable.check_for_write_conflict(key) if serialize_json: stored_value = json.dumps(value, indent=2) else: diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index ae17980224f13..fd648f6937634 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -47,26 +47,26 @@ def setup_test_cases(self): db.clear_db_variables() crypto._fernet = None - @conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): True}) + @conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): "True"}) def test_variable_no_encryption(self, session): """ Test variables without encryption """ Variable.set(key="key", value="value", session=session) - test_var = Variable.get("key") + test_var = session.query(Variable).filter(Variable.key == "key").one() assert not test_var.is_encrypted assert test_var.val == "value" # We always call mask_secret for variables, and let the SecretsMasker decide based on the name if it # should mask anything. That logic is tested in test_secrets_masker.py self.mask_secret.assert_called_once_with("value", "key") - @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode(), ("core", "unit_test_mode"): True}) + @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()}) def test_variable_with_encryption(self, session): """ Test variables with encryption """ Variable.set(key="key", value="value", session=session) - test_var = Variable.get("key") + test_var = session.query(Variable).filter(Variable.key == "key").one() assert test_var.is_encrypted assert test_var.val == "value" @@ -208,7 +208,7 @@ def test_variable_setdefault_round_trip(self, session): def test_variable_setdefault_round_trip_json(self, session): key = "tested_var_setdefault_2_id" value = {"city": "Paris", "Happiness": True} - Variable.setdefault(key=key, default=value, deserialize_json=True, session=None) + Variable.setdefault(key=key, default=value, deserialize_json=True, session=session) assert value == Variable.get(key, deserialize_json=True) def test_variable_setdefault_existing_json(self, session): @@ -312,8 +312,8 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m key=f"password-{os.getpid()}", val=variable_value, ) - Variable.set(key=var.key, value=var.val, session=session) - + session.add(var) + session.flush() # Make sure we re-load it, not just get the cached object back session.expunge(var) _secrets_masker().patterns = set() From 283462fa827de36e9d44c58af632b1979bb8e91c Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sun, 11 Aug 2024 00:04:41 +0200 Subject: [PATCH 6/7] Adding parameter names again, local lowest deps tests working --- airflow/models/variable.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/models/variable.py b/airflow/models/variable.py index 2908f5a5635dc..c66f512acf138 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -145,7 +145,7 @@ def get( :param default_var: Default value of the Variable if the Variable doesn't exist :param deserialize_json: Deserialize the value to a Python dict """ - var_val = Variable.get_variable_from_secrets(key) + var_val = Variable.get_variable_from_secrets(key=key) if var_val is None: if default_var is not cls.__NO_DEFAULT_SENTINEL: return default_var @@ -182,7 +182,7 @@ def set( :param session: Session """ # check if the secret exists in the custom secrets' backend. - Variable.check_for_write_conflict(key) + Variable.check_for_write_conflict(key=key) if serialize_json: stored_value = json.dumps(value, indent=2) else: From 34a4aa06498dec685c2581f0b06c35130f0b767b Mon Sep 17 00:00:00 2001 From: bugraoz93 Date: Sun, 11 Aug 2024 16:35:24 +0200 Subject: [PATCH 7/7] Reverting the setDefault to internal_api because strangely failed random tests in the lowest-dependency --- airflow/api_internal/endpoints/rpc_api_endpoint.py | 1 - airflow/models/variable.py | 14 +++----------- tests/models/test_variable.py | 9 ++++++--- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index df9dbff57f536..ad65157ef9415 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -129,7 +129,6 @@ def initialize_method_map() -> dict[str, Callable]: Variable.set, Variable.update, Variable.delete, - Variable.setdefault, DAG.fetch_callback, DAG.fetch_dagrun, DagRun.fetch_task_instances, diff --git a/airflow/models/variable.py b/airflow/models/variable.py index c66f512acf138..63b71303bc803 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -97,10 +97,8 @@ def val(cls): """Get Airflow Variable from Metadata DB and decode it using the Fernet Key.""" return synonym("_val", descriptor=property(cls.get_val, cls.set_val)) - @staticmethod - @provide_session - @internal_api_call - def setdefault(key, default, description=None, deserialize_json=False, session: Session = None): + @classmethod + def setdefault(cls, key, default, description=None, deserialize_json=False): """ Return the current value for a key or store the default value and return it. @@ -118,13 +116,7 @@ def setdefault(key, default, description=None, deserialize_json=False, session: obj = Variable.get(key, default_var=None, deserialize_json=deserialize_json) if obj is None: if default is not None: - Variable.set( - key=key, - value=default, - description=description, - serialize_json=deserialize_json, - session=session, - ) + Variable.set(key=key, value=default, description=description, serialize_json=deserialize_json) return default else: raise ValueError("Default Value must be set") diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index fd648f6937634..3ec2691e5af95 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -199,23 +199,26 @@ def test_get_non_existing_var_should_not_deserialize_json_default(self): "thisIdDoesNotExist", default_var=default_value, deserialize_json=True ) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_setdefault_round_trip(self, session): key = "tested_var_setdefault_1_id" value = "Monday morning breakfast in Paris" - Variable.setdefault(key=key, default=value, session=session) + Variable.setdefault(key=key, default=value) assert value == Variable.get(key) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_setdefault_round_trip_json(self, session): key = "tested_var_setdefault_2_id" value = {"city": "Paris", "Happiness": True} - Variable.setdefault(key=key, default=value, deserialize_json=True, session=session) + Variable.setdefault(key=key, default=value, deserialize_json=True) assert value == Variable.get(key, deserialize_json=True) + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_variable_setdefault_existing_json(self, session): key = "tested_var_setdefault_2_id" value = {"city": "Paris", "Happiness": True} Variable.set(key=key, value=value, serialize_json=True, session=session) - val = Variable.setdefault(key=key, default=value, deserialize_json=True, session=session) + val = Variable.setdefault(key=key, default=value, deserialize_json=True) # Check the returned value, and the stored value are handled correctly. assert value == val assert value == Variable.get(key, deserialize_json=True)