Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0bb7651
nrl-2002 Initial set up of dynamo export
jackleary Mar 5, 2026
704b380
nrl-2002 Sort lambdas for deployment
jackleary Mar 5, 2026
28522c6
Merge branch 'develop' into feature/jale13-nrl-2002-dynamo-export
jackleary Mar 5, 2026
c115f48
NRL-2002 Update table name
jackleary Mar 5, 2026
b14cade
Merge branch 'develop' into feature/jale13-nrl-2002-dynamo-export
mattdean3-nhs Apr 20, 2026
4ef535a
NRL-2002 Remove references to patient flags
anjalitrace2-nhs Apr 29, 2026
6cdfa8b
NRL-2002 Remove unused variables
anjalitrace2-nhs Apr 30, 2026
9f50f2e
NRL-2002 Add new dynamo export lambdas to build
anjalitrace2-nhs Apr 30, 2026
09c2166
Merge branch 'develop' of github.com:NHSDigital/NRLF into NRL-2002-ge…
anjalitrace2-nhs Apr 30, 2026
6566ba1
NRL-2002 Update handler names
anjalitrace2-nhs May 5, 2026
7cdd3b7
NRL-2002 Add instructions for enabling point-in-time recovery mode re…
anjalitrace2-nhs May 5, 2026
68b4d56
NRL-2002 Use KMS key arn rather than uuid to prevent export failing o…
anjalitrace2-nhs May 6, 2026
ce68b0d
NRL-2002 Give dynamo export trigger lambda perms to access the table …
anjalitrace2-nhs May 6, 2026
2f019a3
NRL-2002 Comment out unused required arg for slack notification on error
anjalitrace2-nhs May 6, 2026
3d89d76
NRL-2002 initialise export_type in case no from_times calculated
anjalitrace2-nhs May 6, 2026
3164a0f
NRL-2002 logs for debugging
anjalitrace2-nhs May 6, 2026
b6638c3
NRL-2002 remove prints no longer needed to debug
anjalitrace2-nhs May 6, 2026
40696ac
NRL-2002 rename last updated column to be valid
anjalitrace2-nhs May 6, 2026
83e1cf2
NRL-2002 rename missed last updated column
anjalitrace2-nhs May 7, 2026
1b4da85
NRL-2002 rename missed flags glue table name and update keys to match…
anjalitrace2-nhs May 8, 2026
d78f2eb
NRL-2002 merge schema changes rather than throw error when new column…
anjalitrace2-nhs May 8, 2026
52fe168
NRL-2002 remove schemas column
anjalitrace2-nhs May 8, 2026
bcaa53c
NRL-2002 allow rerunning within an hour. Just always grab freshest da…
anjalitrace2-nhs May 8, 2026
f0ed1cf
NRL-2002 point glue job at single export in output bucket and no long…
anjalitrace2-nhs May 8, 2026
f34d3c7
NRL-2002 share a timestamp prefix for each export and pass to glue jo…
anjalitrace2-nhs May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,16 @@ check-deploy: ## check the deploy environment is setup correctly
check-deploy-warn:
@SHOULD_WARN_ONLY=true ./scripts/check-deploy-environment.sh

build: check-warn build-api-packages build-layers build-dependency-layer build-seed-sandbox-lambda ## Build the project
build: check-warn build-api-packages build-layers build-dependency-layer build-seed-sandbox-lambda build-dynamo-export-lambdas ## Build the project

build-seed-sandbox-lambda:
@echo "Building seed_sandbox Lambda"
@cd lambdas/seed_sandbox && make build

build-dynamo-export-lambdas:
@echo "Building dynamo_export Lambdas"
@cd lambdas/dynamo_export && make build

build-dependency-layer:
@echo "Building Lambda dependency layer"
@mkdir -p $(DIST_PATH)
Expand Down
33 changes: 33 additions & 0 deletions lambdas/dynamo_export/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
.PHONY: *

# Handler file to package; each per-lambda wrapper target below sets this
# before delegating to build-lambda.
FILE_TO_PACKAGE?=

# Remove previous build output so stale files never end up in a package.
clean:
	@echo "Cleaning build artifacts..."
	rm -rf build
	@echo "✓ Clean complete"

# Package a single handler file into <repo-root>/dist/<name>.zip.
# LAMBDA_NAME is FILE_TO_PACKAGE with its extension stripped.
build-lambda: clean
	$(eval LAMBDA_NAME := $(basename ${FILE_TO_PACKAGE}))
	@echo "Building $(LAMBDA_NAME) Lambda deployment package..."
	mkdir -p build

# Copy the handler
	cp $(FILE_TO_PACKAGE) build/

# Create the zip file in root dist
# NOTE(review): "../../dist" (from this directory) and "../../../dist" (from
# inside build/) both resolve to <repo-root>/dist — the paths are consistent.
	mkdir -p ../../dist
	cd build && zip -r "../../../dist/${LAMBDA_NAME}.zip" . -x "*.pyc" -x "__pycache__/*" -x ".DS_Store"

	@echo "✓ Lambda package created: ../../dist/${LAMBDA_NAME}.zip"

# Per-lambda wrapper targets, one per handler file in this directory.
build-dynamo-export-trigger:
	FILE_TO_PACKAGE=dynamo_export_trigger.py make build-lambda

build-dynamo-export-poll:
	FILE_TO_PACKAGE=dynamo_export_poll.py make build-lambda

build-ssm-put-param:
	FILE_TO_PACKAGE=ssm_put_param.py make build-lambda

# Build every dynamo-export lambda package.
build: build-dynamo-export-trigger build-dynamo-export-poll build-ssm-put-param
32 changes: 32 additions & 0 deletions lambdas/dynamo_export/dynamo_export_poll.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import boto3
from botocore.config import Config

# Short timeouts so the poller fails fast inside the state-machine wait loop
# instead of hanging on network issues.
ddb = boto3.client(
    "dynamodb",
    config=Config(connect_timeout=5, read_timeout=5),
)


def lambda_handler(event, _context):
    """Report the status of an in-flight DynamoDB export.

    Describes the export identified by ``event["export_arn"]`` and returns a
    dict whose ``status`` is ``FAILED``, ``COMPLETED`` or ``IN_PROGRESS``,
    with the remaining export metadata from the event passed through
    unchanged for the next workflow step.
    """
    export_status = ddb.describe_export(ExportArn=event["export_arn"])[
        "ExportDescription"
    ]["ExportStatus"]

    # Any state that is neither failed nor finished is still running
    # (e.g. the API's IN_PROGRESS state).
    if export_status in ("FAILED", "COMPLETED"):
        status = export_status
    else:
        status = "IN_PROGRESS"

    return {
        "status": status,
        "export_to_time": event["export_to_time"],
        "export_arn": event["export_arn"],
        "export_type": event["export_type"],
        "output_prefix": event["output_prefix"],
    }
85 changes: 85 additions & 0 deletions lambdas/dynamo_export/dynamo_export_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import os
from datetime import datetime, timedelta, timezone

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

# Deployment configuration, injected via the Lambda environment.
bucket = os.environ["BUCKET"]  # S3 bucket that receives the export
ddb_table_arn = os.environ["DDB_TABLE_ARN"]  # table to export
kms_key = os.environ["KMS_KEY"]  # KMS key ARN used to encrypt the export
env = os.environ["ENVIRONMENT"]
ddb_table_name = os.environ["DDB_TABLE_NAME"]

# SSM parameter recording when the previous export run finished.
SSM_PARAM = "/exports/DynamoExportRuntime"

ddb_client = boto3.client(
    "dynamodb",
    config=Config(connect_timeout=5, read_timeout=5),
)
ssm = boto3.client(
    "ssm",
    config=Config(connect_timeout=5, read_timeout=5),
)

# DynamoDB incremental exports cover at most a 24-hour period, so longer
# gaps since the last run must be split into day-sized windows.
MAX_EXPORT_WINDOW = timedelta(days=1)


def lambda_handler(_event, _context):
    """Trigger DynamoDB point-in-time export(s) to S3.

    If the last-run SSM parameter exists, start one or more
    ``INCREMENTAL_EXPORT``s covering the time since that run, split into
    windows of at most 24 hours (the DynamoDB API limit). If the parameter
    does not exist this is the first ever run, so start a single
    ``FULL_EXPORT`` instead. Any other ``ClientError`` from SSM is re-raised.

    Returns a dict with the export window end time, the (last) export ARN,
    the export type, and the shared S3 output prefix for this run.
    """
    now_time = datetime.now(timezone.utc)
    # Initialised up front in case no export window is produced.
    export_arn = None
    export_type = None

    # All exports from this run share one timestamped S3 prefix.
    output_prefix = now_time.isoformat()

    try:
        last_run_time_str = ssm.get_parameter(Name=SSM_PARAM)["Parameter"]["Value"]
        # Floor to the hour so reruns within the same hour still re-export
        # the freshest data instead of producing a zero-width window.
        last_run_date = datetime.fromisoformat(last_run_time_str).replace(
            microsecond=0, second=0, minute=0
        )

        # An incremental export cannot start before the table's earliest
        # restorable point-in-time-recovery timestamp.
        earliest_pitr = ddb_client.describe_continuous_backups(
            TableName=ddb_table_name
        )["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"][
            "EarliestRestorableDateTime"
        ]

        from_time = max(last_run_date, earliest_pitr)

        # Handle gaps longer than 24 hours by splitting the range
        # [from_time, now_time] into consecutive <=24h incremental exports.
        window_start = from_time
        while window_start < now_time:
            window_end = min(window_start + MAX_EXPORT_WINDOW, now_time)
            response = ddb_client.export_table_to_point_in_time(
                TableArn=ddb_table_arn,
                S3Bucket=bucket,
                S3Prefix=output_prefix,
                S3SseAlgorithm="KMS",
                S3SseKmsKeyId=kms_key,
                ExportFormat="DYNAMODB_JSON",
                ExportType="INCREMENTAL_EXPORT",
                IncrementalExportSpecification={
                    "ExportFromTime": window_start,
                    "ExportToTime": window_end,
                    "ExportViewType": "NEW_AND_OLD_IMAGES",
                },
            )
            # The last window's ARN/type are reported to the caller.
            export_arn = response["ExportDescription"]["ExportArn"]
            export_type = response["ExportDescription"]["ExportType"]
            window_start = window_end

    except ClientError as e:
        if e.response["Error"]["Code"] != "ParameterNotFound":
            raise
        # First run: no previous runtime recorded, so take a full export.
        response = ddb_client.export_table_to_point_in_time(
            TableArn=ddb_table_arn,
            S3Bucket=bucket,
            S3Prefix=output_prefix,
            S3SseAlgorithm="KMS",
            S3SseKmsKeyId=kms_key,
            ExportFormat="DYNAMODB_JSON",
            ExportType="FULL_EXPORT",
        )
        export_arn = response["ExportDescription"]["ExportArn"]
        export_type = response["ExportDescription"]["ExportType"]

    return {
        "export_to_time": now_time.isoformat(),
        "export_arn": export_arn,
        "export_type": export_type,
        "output_prefix": output_prefix,
    }
19 changes: 19 additions & 0 deletions lambdas/dynamo_export/ssm_put_param.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import boto3
from botocore.config import Config

# SSM client with short timeouts so the workflow fails fast on network issues.
ssm = boto3.client(
    "ssm",
    config=Config(connect_timeout=5, read_timeout=5),
)


def lambda_handler(event, _context):
    """Record the completed export's end time in SSM.

    Overwrites the DynamoExportRuntime parameter with the export window end
    time so the next trigger run knows where to resume, then echoes the
    export metadata for downstream steps.
    """
    runtime_param = "/exports/DynamoExportRuntime"
    to_time = event["export_to_time"]
    ssm.put_parameter(
        Name=runtime_param,
        Value=to_time,
        Type="String",
        Overwrite=True,
    )
    return {
        "to_time": to_time,
        "export_arn": event["export_arn"],
        "export_type": event["export_type"],
        "output_prefix": event["output_prefix"],
    }
197 changes: 197 additions & 0 deletions lambdas/dynamo_export/tests/test_dynamo_export_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import os
from datetime import datetime, timedelta, timezone
from unittest.mock import patch

import pytest
from botocore.exceptions import ClientError

os.environ.setdefault("BUCKET", "test-bucket")
os.environ.setdefault(
"DDB_TABLE_ARN",
"arn:aws:dynamodb:eu-west-2:123456789012:table/test-table",
)
os.environ.setdefault("KMS_KEY", "arn:aws:kms:eu-west-2:123456789012:key/test-key")
os.environ.setdefault("DDB_TABLE_NAME", "test-table")

from lambdas.dynamo_export.dynamo_export_trigger import lambda_handler

FIXED_NOW = datetime(2026, 5, 8, 14, 30, 45, 123456, tzinfo=timezone.utc)
FIXED_NOW_HOUR = FIXED_NOW.replace(microsecond=0, second=0, minute=0)


def _client_error(code: str) -> ClientError:
    """Build a botocore ClientError carrying the given error code."""
    error_response = {"Error": {"Code": code, "Message": "Test error"}}
    return ClientError(error_response, "operation")


def _make_pitr_response(earliest: datetime) -> dict:
return {
"ContinuousBackupsDescription": {
"PointInTimeRecoveryDescription": {
"EarliestRestorableDateTime": earliest,
}
}
}


@pytest.fixture
def mock_ddb():
    # Replace the module-level DynamoDB client so no real AWS calls happen.
    with patch("lambdas.dynamo_export.dynamo_export_trigger.ddb_client") as mock:
        yield mock


@pytest.fixture
def mock_ssm():
    # Replace the module-level SSM client that reads the last-run parameter.
    with patch("lambdas.dynamo_export.dynamo_export_trigger.ssm") as mock:
        yield mock


@pytest.fixture
def mock_datetime():
    # Pin the module's view of "now" to FIXED_NOW while keeping the real
    # fromisoformat behaviour for parsing the SSM parameter value.
    with patch("lambdas.dynamo_export.dynamo_export_trigger.datetime") as mock_dt:
        mock_dt.now.return_value = FIXED_NOW
        mock_dt.fromisoformat.side_effect = datetime.fromisoformat
        yield mock_dt


def test_full_export_when_no_ssm_parameter(mock_ddb, mock_ssm, mock_datetime):
    """When the SSM parameter does not exist, a FULL_EXPORT is triggered"""
    mock_ssm.get_parameter.side_effect = _client_error("ParameterNotFound")
    export_arn = (
        "arn:aws:dynamodb:eu-west-2:123456789012:table/test-table/export/full-01"
    )
    mock_ddb.export_table_to_point_in_time.return_value = {
        "ExportDescription": {
            "ExportArn": export_arn,
            "ExportType": "FULL_EXPORT",
        }
    }

    result = lambda_handler({}, None)

    assert result["export_type"] == "FULL_EXPORT"
    assert result["export_arn"] == export_arn
    assert result["export_to_time"] == FIXED_NOW.isoformat()
    # The shared S3 prefix for the run is the (pinned) trigger time.
    assert result["output_prefix"] == FIXED_NOW.isoformat()
    mock_ddb.export_table_to_point_in_time.assert_called_once_with(
        TableArn=os.environ["DDB_TABLE_ARN"],
        S3Bucket=os.environ["BUCKET"],
        # The handler passes the shared timestamp prefix on every export;
        # omitting it here makes the exact-kwargs assertion fail.
        S3Prefix=FIXED_NOW.isoformat(),
        S3SseAlgorithm="KMS",
        S3SseKmsKeyId=os.environ["KMS_KEY"],
        ExportFormat="DYNAMODB_JSON",
        ExportType="FULL_EXPORT",
    )
    # First-run path never consults the PITR window.
    mock_ddb.describe_continuous_backups.assert_not_called()


def test_incremental_export_single_window(mock_ddb, mock_ssm, mock_datetime):
    """When last run was earlier this hour, a single incremental export window is created"""
    last_run_time = FIXED_NOW_HOUR
    mock_ssm.get_parameter.return_value = {
        "Parameter": {"Value": last_run_time.isoformat()}
    }
    # PITR comfortably covers the window, so last_run_time is the lower bound.
    mock_ddb.describe_continuous_backups.return_value = _make_pitr_response(
        FIXED_NOW_HOUR - timedelta(days=7)
    )
    export_arn = (
        "arn:aws:dynamodb:eu-west-2:123456789012:table/test-table/export/inc-01"
    )
    mock_ddb.export_table_to_point_in_time.return_value = {
        "ExportDescription": {
            "ExportArn": export_arn,
            "ExportType": "INCREMENTAL_EXPORT",
        }
    }

    result = lambda_handler({}, None)

    assert result["export_type"] == "INCREMENTAL_EXPORT"
    assert result["export_arn"] == export_arn
    assert result["export_to_time"] == FIXED_NOW.isoformat()
    mock_ddb.export_table_to_point_in_time.assert_called_once_with(
        TableArn=os.environ["DDB_TABLE_ARN"],
        S3Bucket=os.environ["BUCKET"],
        # The handler passes the shared timestamp prefix on every export;
        # omitting it here makes the exact-kwargs assertion fail.
        S3Prefix=FIXED_NOW.isoformat(),
        S3SseAlgorithm="KMS",
        S3SseKmsKeyId=os.environ["KMS_KEY"],
        ExportFormat="DYNAMODB_JSON",
        ExportType="INCREMENTAL_EXPORT",
        IncrementalExportSpecification={
            "ExportFromTime": FIXED_NOW_HOUR,
            "ExportToTime": FIXED_NOW,
            "ExportViewType": "NEW_AND_OLD_IMAGES",
        },
    )


def test_incremental_export_multiple_days(mock_ddb, mock_ssm, mock_datetime):
    """When last run was 3 days ago, 4 incremental windows are created (one per elapsed day + today)"""
    last_run_time = FIXED_NOW_HOUR - timedelta(days=3)
    mock_ssm.get_parameter.return_value = {
        "Parameter": {"Value": last_run_time.isoformat()}
    }
    # PITR window comfortably covers the whole gap, so it is not the bound.
    mock_ddb.describe_continuous_backups.return_value = _make_pitr_response(
        FIXED_NOW_HOUR - timedelta(days=35)
    )
    # One fake export response per expected 24h window.
    mock_ddb.export_table_to_point_in_time.side_effect = [
        {
            "ExportDescription": {
                "ExportArn": f"arn:...export/{i:02d}",
                "ExportType": "INCREMENTAL_EXPORT",
            }
        }
        for i in range(4)
    ]

    result = lambda_handler({}, None)

    assert result["export_type"] == "INCREMENTAL_EXPORT"
    assert len(result["export_arn"]) > 0
    # Three full 24h windows plus the partial window up to "now".
    assert mock_ddb.export_table_to_point_in_time.call_count == 4

    calls = mock_ddb.export_table_to_point_in_time.call_args_list
    # First window starts exactly at the last recorded run time.
    first_spec = calls[0][1]["IncrementalExportSpecification"]
    assert first_spec["ExportFromTime"] == last_run_time
    assert first_spec["ExportToTime"] == last_run_time + timedelta(days=1)

    # Final window ends at the pinned "now", not on a day boundary.
    last_spec = calls[-1][1]["IncrementalExportSpecification"]
    assert last_spec["ExportFromTime"] == last_run_time + timedelta(days=3)
    assert last_spec["ExportToTime"] == FIXED_NOW


def test_incremental_export_uses_earliest_pitr_as_lower_bound(
    mock_ddb, mock_ssm, mock_datetime
):
    """When last_run predates earliest_pitr, the export window starts at earliest_pitr."""
    last_run_time = FIXED_NOW_HOUR - timedelta(days=40)  # well outside PITR window
    earliest_pitr = FIXED_NOW_HOUR - timedelta(hours=2)
    mock_ssm.get_parameter.return_value = {
        "Parameter": {"Value": last_run_time.isoformat()}
    }
    mock_ddb.describe_continuous_backups.return_value = _make_pitr_response(
        earliest_pitr
    )
    mock_ddb.export_table_to_point_in_time.return_value = {
        "ExportDescription": {
            "ExportArn": "arn:...export/pitr-01",
            "ExportType": "INCREMENTAL_EXPORT",
        }
    }

    result = lambda_handler({}, None)

    assert result["export_type"] == "INCREMENTAL_EXPORT"
    assert len(result["export_arn"]) > 0
    # The handler must clamp the start of the export to the earliest
    # restorable PITR time rather than the stale last-run time.
    call_kwargs = mock_ddb.export_table_to_point_in_time.call_args[1]
    assert (
        call_kwargs["IncrementalExportSpecification"]["ExportFromTime"] == earliest_pitr
    )


def test_non_parameter_not_found_client_error_is_reraised(
    mock_ddb, mock_ssm, mock_datetime
):
    """ClientErrors other than ParameterNotFound propagate to the caller"""
    mock_ssm.get_parameter.side_effect = _client_error("AccessDeniedException")

    with pytest.raises(ClientError) as raised:
        lambda_handler({}, None)

    error_code = raised.value.response["Error"]["Code"]
    assert error_code == "AccessDeniedException"
Loading
Loading