From 3357f29a2abba230dd77166831c8c1d4909f3c77 Mon Sep 17 00:00:00 2001 From: Connor Kirkpatrick Date: Fri, 21 Nov 2025 15:42:19 +0000 Subject: [PATCH 1/5] Support replays in Idempotency utility Durable Functions introduces the concept of function replays. Previously, the idempotency utility would throw an "IdempotencyItemAlreadyExistsError", as the replay has the same payload of the initial execution. It appears as a duplicate, so is rejected. Now, a replay is allowed --- .../utilities/idempotency/base.py | 17 +++++++++--- .../utilities/idempotency/idempotency.py | 26 ++++++++++++++----- .../utilities/typing/__init__.py | 4 +-- .../utilities/typing/lambda_context.py | 13 ++++++++++ 4 files changed, 47 insertions(+), 13 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index e1a7d78da40..0a705616e06 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -123,10 +123,18 @@ def __init__( self.persistence_store = persistence_store - def handle(self) -> Any: + def handle(self, durable_mode: str | None = None) -> Any: """ Main entry point for handling idempotent execution of a function. + Parameters + ---------- + durable_mode : str | None, optional + Mode for handling in-progress executions. Options: + - "REPLAY_MODE": Allow replay of functions that are already in progress + - "EXECUTION_MODE": Standard durable execution mode + - None: Standard idempotency behavior (raises IdempotencyAlreadyInProgressError) + Returns ------- Any @@ -138,12 +146,12 @@ def handle(self) -> Any: # In most cases we can retry successfully on this exception. for i in range(MAX_RETRIES + 1): # pragma: no cover try: - return self._process_idempotency() + return self._process_idempotency(durable_mode) except IdempotencyInconsistentStateError: if i == MAX_RETRIES: raise # Bubble up when exceeded max tries - def _process_idempotency(self): + def _process_idempotency(self, durable_mode: str | None): try: # We call save_inprogress first as an optimization for the most common case where no idempotent record # already exists. If it succeeds, there's no need to call get_record. @@ -159,7 +167,8 @@ def _process_idempotency(self): # We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective # way of retrieving the existing record after a failed conditional write operation. record = exc.old_data_record or self._get_idempotency_record() - + if durable_mode == "REPLAY_MODE": + return self._get_function_response() # If a record is found, handle it for status if record: return self._handle_for_status(record) diff --git a/aws_lambda_powertools/utilities/idempotency/idempotency.py b/aws_lambda_powertools/utilities/idempotency/idempotency.py index f59d7df7179..02c3c9ca4d0 100644 --- a/aws_lambda_powertools/utilities/idempotency/idempotency.py +++ b/aws_lambda_powertools/utilities/idempotency/idempotency.py @@ -28,7 +28,7 @@ from aws_lambda_powertools.utilities.idempotency.persistence.base import ( BasePersistenceLayer, ) - from aws_lambda_powertools.utilities.typing import LambdaContext + from aws_lambda_powertools.utilities.typing import DurableContext, LambdaContext from aws_lambda_powertools.warnings import PowertoolsUserWarning @@ -37,9 +37,9 @@ @lambda_handler_decorator def idempotent( - handler: Callable[[Any, LambdaContext], Any], + handler: Callable[[Any, LambdaContext | DurableContext], Any], event: dict[str, Any], - context: LambdaContext, + context: LambdaContext | DurableContext, persistence_store: BasePersistenceLayer, config: IdempotencyConfig | None = None, key_prefix: str | None = None, @@ -55,7 +55,7 @@ def idempotent( event: dict Lambda's Event context: dict - Lambda's Context + Lambda's Context or Durable Context persistence_store: BasePersistenceLayer Instance of BasePersistenceLayer to store data config: IdempotencyConfig @@ -91,7 +91,15 @@ def handler(event, context): return handler(event, context, **kwargs) config = config or IdempotencyConfig() - config.register_lambda_context(context) + + if hasattr(context, "state"): + # Extract lambda_context from DurableContext for idempotency tracking + config.register_lambda_context(context.lambda_context) + durable_mode = "REPLAY_MODE" if len(context.state.operations) > 1 else "EXECUTION_MODE" + else: + # Standard LambdaContext + config.register_lambda_context(context) + durable_mode = None args = event, context idempotency_handler = IdempotencyHandler( @@ -104,7 +112,7 @@ def handler(event, context): function_kwargs=kwargs, ) - return idempotency_handler.handle() + return idempotency_handler.handle(durable_mode=durable_mode) def idempotent_function( @@ -193,6 +201,10 @@ def decorate(*args, **kwargs): f" Ensure this exists in your function's signature as well as the caller used it as a keyword argument", ) + durable_mode = None + if len(args) >= 2 and hasattr(args[1], "state"): + durable_mode = "REPLAY_MODE" if len(args[1].state.operations) > 1 else "EXECUTION_MODE" + payload = kwargs.get(data_keyword_argument) idempotency_handler = IdempotencyHandler( @@ -206,6 +218,6 @@ def decorate(*args, **kwargs): function_kwargs=kwargs, ) - return idempotency_handler.handle() + return idempotency_handler.handle(durable_mode=durable_mode) return cast(AnyCallableT, decorate) diff --git a/aws_lambda_powertools/utilities/typing/__init__.py b/aws_lambda_powertools/utilities/typing/__init__.py index 22f907025fc..a28feb2c53c 100644 --- a/aws_lambda_powertools/utilities/typing/__init__.py +++ b/aws_lambda_powertools/utilities/typing/__init__.py @@ -4,6 +4,6 @@ [`Typing`](../utilities/typing.md) """ -from .lambda_context import LambdaContext +from .lambda_context import DurableContext, LambdaContext -__all__ = ["LambdaContext"] +__all__ = ["DurableContext", "LambdaContext"] diff --git a/aws_lambda_powertools/utilities/typing/lambda_context.py b/aws_lambda_powertools/utilities/typing/lambda_context.py index 49fb7044792..e501535720c 100644 --- a/aws_lambda_powertools/utilities/typing/lambda_context.py +++ b/aws_lambda_powertools/utilities/typing/lambda_context.py @@ -93,3 +93,16 @@ def tenant_id(self) -> str | None: def get_remaining_time_in_millis() -> int: """Returns the number of milliseconds left before the execution times out.""" return 0 + + +class DurableContext: + _lambda_context: LambdaContext + _state: object + + @property + def lambda_context(self) -> LambdaContext: + return self._lambda_context + + @property + def state(self) -> object: + return self._state From 71591fde2047ed6fde32203a1f9b028b6cbb7042 Mon Sep 17 00:00:00 2001 From: Connor Kirkpatrick Date: Fri, 21 Nov 2025 15:58:41 +0000 Subject: [PATCH 2/5] Fix logic in idempotent function decorator --- aws_lambda_powertools/utilities/idempotency/idempotency.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/idempotency.py b/aws_lambda_powertools/utilities/idempotency/idempotency.py index 02c3c9ca4d0..c612a785b15 100644 --- a/aws_lambda_powertools/utilities/idempotency/idempotency.py +++ b/aws_lambda_powertools/utilities/idempotency/idempotency.py @@ -201,10 +201,6 @@ def decorate(*args, **kwargs): f" Ensure this exists in your function's signature as well as the caller used it as a keyword argument", ) - durable_mode = None - if len(args) >= 2 and hasattr(args[1], "state"): - durable_mode = "REPLAY_MODE" if len(args[1].state.operations) > 1 else "EXECUTION_MODE" - payload = kwargs.get(data_keyword_argument) idempotency_handler = IdempotencyHandler( @@ -218,6 +214,6 @@ def decorate(*args, **kwargs): function_kwargs=kwargs, ) - return idempotency_handler.handle(durable_mode=durable_mode) + return idempotency_handler.handle() return cast(AnyCallableT, decorate) From 209ab8567a50e06bcdd2c318fff77cdfc39746a9 Mon Sep 17 00:00:00 2001 From: Connor Kirkpatrick Date: Sun, 23 Nov 2025 13:35:19 +0000 Subject: [PATCH 3/5] Update to record `is_replay` --- .../utilities/idempotency/base.py | 18 +++++++++--------- .../utilities/idempotency/idempotency.py | 6 +++--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index 0a705616e06..eedc3e785f4 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -123,17 +123,17 @@ def __init__( self.persistence_store = persistence_store - def handle(self, durable_mode: str | None = None) -> Any: + def handle(self, is_replay: bool = False) -> Any: """ Main entry point for handling idempotent execution of a function. Parameters ---------- - durable_mode : str | None, optional - Mode for handling in-progress executions. Options: - - "REPLAY_MODE": Allow replay of functions that are already in progress - - "EXECUTION_MODE": Standard durable execution mode - - None: Standard idempotency behavior (raises IdempotencyAlreadyInProgressError) + is_replay : bool, optional + Whether this is a replay of a function that is already in progress. + If True, allows replay of functions that are already in progress. + If False, uses standard idempotency behavior (raises IdempotencyAlreadyInProgressError). + Defaults to False. Returns ------- @@ -146,12 +146,12 @@ def handle(self, durable_mode: str | None = None) -> Any: # In most cases we can retry successfully on this exception. for i in range(MAX_RETRIES + 1): # pragma: no cover try: - return self._process_idempotency(durable_mode) + return self._process_idempotency(is_replay) except IdempotencyInconsistentStateError: if i == MAX_RETRIES: raise # Bubble up when exceeded max tries - def _process_idempotency(self, durable_mode: str | None): + def _process_idempotency(self, is_replay: bool): try: # We call save_inprogress first as an optimization for the most common case where no idempotent record # already exists. If it succeeds, there's no need to call get_record. @@ -167,7 +167,7 @@ def _process_idempotency(self, durable_mode: str | None): # We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective # way of retrieving the existing record after a failed conditional write operation. record = exc.old_data_record or self._get_idempotency_record() - if durable_mode == "REPLAY_MODE": + if is_replay: return self._get_function_response() # If a record is found, handle it for status if record: diff --git a/aws_lambda_powertools/utilities/idempotency/idempotency.py b/aws_lambda_powertools/utilities/idempotency/idempotency.py index c612a785b15..7a4e15ffef4 100644 --- a/aws_lambda_powertools/utilities/idempotency/idempotency.py +++ b/aws_lambda_powertools/utilities/idempotency/idempotency.py @@ -95,11 +95,11 @@ def handler(event, context): if hasattr(context, "state"): # Extract lambda_context from DurableContext for idempotency tracking config.register_lambda_context(context.lambda_context) - durable_mode = "REPLAY_MODE" if len(context.state.operations) > 1 else "EXECUTION_MODE" + is_replay = len(context.state.operations) > 1 else: # Standard LambdaContext config.register_lambda_context(context) - durable_mode = None + is_replay = False args = event, context idempotency_handler = IdempotencyHandler( @@ -112,7 +112,7 @@ def handler(event, context): function_kwargs=kwargs, ) - return idempotency_handler.handle(durable_mode=durable_mode) + return idempotency_handler.handle(is_replay=is_replay) def idempotent_function( From 46a161ea7050bc99875b9776b372baede48e243a Mon Sep 17 00:00:00 2001 From: Connor Kirkpatrick Date: Sun, 23 Nov 2025 13:55:39 +0000 Subject: [PATCH 4/5] Fix typing --- .../utilities/idempotency/idempotency.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/idempotency.py b/aws_lambda_powertools/utilities/idempotency/idempotency.py index 7a4e15ffef4..6e696a2e579 100644 --- a/aws_lambda_powertools/utilities/idempotency/idempotency.py +++ b/aws_lambda_powertools/utilities/idempotency/idempotency.py @@ -93,9 +93,11 @@ def handler(event, context): config = config or IdempotencyConfig() if hasattr(context, "state"): - # Extract lambda_context from DurableContext for idempotency tracking - config.register_lambda_context(context.lambda_context) - is_replay = len(context.state.operations) > 1 + # Extract lambda_context from DurableContext + durable_context = cast("DurableContext", context) + config.register_lambda_context(durable_context.lambda_context) + # Note: state.operations is accessed via duck typing at runtime + is_replay = len(durable_context.state.operations) > 1 # type: ignore[attr-defined] else: # Standard LambdaContext config.register_lambda_context(context) From a6fc663297abdf79ec3892d786d2b3bee1b29ba4 Mon Sep 17 00:00:00 2001 From: Connor Kirkpatrick Date: Sun, 23 Nov 2025 15:08:11 +0000 Subject: [PATCH 5/5] Add tests --- .../idempotency/_boto3/test_idempotency.py | 187 ++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/tests/functional/idempotency/_boto3/test_idempotency.py b/tests/functional/idempotency/_boto3/test_idempotency.py index 17f14c2c182..202c0e5a9a5 100644 --- a/tests/functional/idempotency/_boto3/test_idempotency.py +++ b/tests/functional/idempotency/_boto3/test_idempotency.py @@ -46,6 +46,7 @@ from aws_lambda_powertools.utilities.idempotency.serialization.dataclass import ( DataclassSerializer, ) +from aws_lambda_powertools.utilities.typing import DurableContext from aws_lambda_powertools.utilities.validation import envelopes, validator from aws_lambda_powertools.warnings import PowertoolsUserWarning from tests.functional.idempotency.utils import ( @@ -2136,3 +2137,189 @@ def lambda_handler(record, context): result = lambda_handler(mock_event, lambda_context) # THEN we expect the function to execute successfully assert result == expected_result + + +# Tests: Durable Functions Integration + + +@pytest.fixture +def durable_context_single_operation(lambda_context): + """DurableContext with single operation (execution mode, is_replay=False)""" + durable_ctx = DurableContext() + durable_ctx._lambda_context = lambda_context + durable_ctx._state = Mock(operations=[{"id": "op1"}]) + return durable_ctx + + +@pytest.fixture +def durable_context_multiple_operations(lambda_context): + """DurableContext with multiple operations (replay mode, is_replay=True)""" + durable_ctx = DurableContext() + durable_ctx._lambda_context = lambda_context + durable_ctx._state = Mock(operations=[{"id": "op1"}, {"id": "op2"}]) + return durable_ctx + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_with_durable_context_first_execution( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_single_operation, + lambda_response, +): + """ + Test idempotent decorator with DurableContext during first execution (execution mode). + + When a durable function executes for the first time (single operation in state), + is_replay=False, and the function should execute normally, saving the result. + """ + # GIVEN + stubber = stub.Stubber(persistence_store.client) + stubber.add_response("put_item", {}) + stubber.add_response("update_item", {}) + stubber.activate() + + # WHEN + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + result = lambda_handler(lambda_apigw_event, durable_context_single_operation) + + # THEN + assert result == lambda_response + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_with_durable_context_during_replay( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_multiple_operations, + timestamp_future, + lambda_response, + serialized_lambda_response, +): + """ + Test idempotent decorator with DurableContext during workflow replay (replay mode). + + When a durable function replays (multiple operations in state), is_replay=True. + The function should execute once to get the response and save it, even when + an INPROGRESS record exists from a previous execution. + """ + # GIVEN + hashed_key = hash_idempotency_key(data=lambda_apigw_event) + + stubber = stub.Stubber(persistence_store.client) + ddb_response = { + "Item": { + "id": {"S": hashed_key}, + "expiration": {"N": timestamp_future}, + "data": {"S": serialized_lambda_response}, + "status": {"S": "INPROGRESS"}, + }, + } + stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response) + # In replay mode, function still executes once to get response, then saves it + stubber.add_response("update_item", {}) + stubber.activate() + + # WHEN + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + result = lambda_handler(lambda_apigw_event, durable_context_multiple_operations) + + # THEN - Should return result in replay mode + assert result == lambda_response + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_extracts_lambda_context_from_durable_context( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_single_operation, + lambda_response, +): + """ + Test that idempotency properly extracts LambdaContext from DurableContext. + + The @idempotent decorator should extract the wrapped lambda_context from + DurableContext for tracking remaining time and other Lambda-specific features. + """ + # GIVEN + stubber = stub.Stubber(persistence_store.client) + stubber.add_response("put_item", {}) + stubber.add_response("update_item", {}) + stubber.activate() + + # WHEN + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + # Verify we can access lambda_context properties + assert hasattr(context, "lambda_context") + assert context.lambda_context.function_name == "test-func" + return lambda_response + + result = lambda_handler(lambda_apigw_event, durable_context_single_operation) + + # THEN + assert result == lambda_response + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}], indirect=True) +def test_idempotent_lambda_concurrent_durable_executions_raise_in_progress_error( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + durable_context_single_operation, + lambda_response, +): + """ + Test that concurrent durable executions are prevented by IdempotencyAlreadyInProgressError. + + Scenario: Two different durable function executions attempt to process the same + idempotent operation concurrently: + 1. First execution creates an INPROGRESS record + 2. Second execution (in execution mode, is_replay=False) finds the INPROGRESS record + 3. Second execution should raise IdempotencyAlreadyInProgressError to prevent duplicate work + + This ensures data consistency when multiple durable function instances execute concurrently. + """ + # GIVEN + hashed_key = hash_idempotency_key(data=lambda_apigw_event) + + stubber = stub.Stubber(persistence_store.client) + # Simulate existing INPROGRESS record with far future timestamps + ddb_response = { + "Item": { + "id": {"S": hashed_key}, + "expiration": {"N": "9999999999"}, + "in_progress_expiration": {"N": "9999999999999"}, # Far future in milliseconds + "status": {"S": "INPROGRESS"}, + }, + } + stubber.add_client_error("put_item", "ConditionalCheckFailedException", modeled_fields=ddb_response) + stubber.activate() + + # WHEN / THEN - Should raise IdempotencyAlreadyInProgressError in execution mode + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + with pytest.raises(IdempotencyAlreadyInProgressError) as exc_info: + lambda_handler(lambda_apigw_event, durable_context_single_operation) + + # Verify error message contains the idempotency key + assert hashed_key in str(exc_info.value) + stubber.assert_no_pending_responses() + stubber.deactivate()