From 601a9e4829ce62f7b698f76d57b9ef8ff114ed6e Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 4 Feb 2026 12:06:22 -0800 Subject: [PATCH 1/2] feat(pipeline_step): Add emergency patch support for deployment overrides Add optional emergency_patch field to PipelineStepContext that allows clients to override any deployment configuration as a final merge layer. This enables emergency production fixes without code changes. Changes: - Add NotRequired emergency_patch field to PipelineStepContext TypedDict - Update parse_context() to handle emergency_patch parsing - Apply emergency_patch as final layer in run() method after all other merges - Add test verifying emergency patch overrides work correctly Resolves STREAM-708 Co-Authored-By: Claude Sonnet 4.5 --- .../sentry_streams_k8s/pipeline_step.py | 18 ++++- .../tests/test_pipeline_step.py | 65 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index c24e6837..10a05910 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -3,7 +3,7 @@ import json import re from importlib.resources import files -from typing import Any, TypedDict, cast +from typing import Any, NotRequired, TypedDict, cast import yaml from libsentrykube.ext import ExternalMacro @@ -123,6 +123,14 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext: else: pipeline_config_parsed = context["pipeline_config"] + # Parse emergency_patch if present (defaults to empty dict) + emergency_patch_parsed: dict[str, Any] = {} + if "emergency_patch" in context: + if isinstance(context["emergency_patch"], str): + emergency_patch_parsed = yaml.safe_load(context["emergency_patch"]) or {} + else: + emergency_patch_parsed = context["emergency_patch"] or {} + return { "service_name": context["service_name"], "pipeline_name": context["pipeline_name"], @@ -134,6 +142,7 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext: "cpu_per_process": context["cpu_per_process"], "memory_per_process": context["memory_per_process"], "segment_id": context["segment_id"], + "emergency_patch": emergency_patch_parsed, } @@ -150,6 +159,7 @@ class PipelineStepContext(TypedDict): cpu_per_process: int memory_per_process: int segment_id: int + emergency_patch: NotRequired[dict[str, Any]] class PipelineStep(ExternalMacro): @@ -244,6 +254,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: pipeline_name = ctx["pipeline_name"] segment_id = ctx["segment_id"] service_name = ctx["service_name"] + emergency_patch = ctx.get("emergency_patch", {}) # Create deployment @@ -296,6 +307,11 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: deployment = deepmerge(deployment, pipeline_additions) + # Apply emergency patch as the final layer + # This allows overriding any deployment configuration in emergencies + if emergency_patch: + deployment = deepmerge(deployment, emergency_patch) + # Create configmap configmap = { "apiVersion": "v1", diff --git a/sentry_streams_k8s/tests/test_pipeline_step.py b/sentry_streams_k8s/tests/test_pipeline_step.py index ccfffe4c..6cc33399 100644 --- a/sentry_streams_k8s/tests/test_pipeline_step.py +++ b/sentry_streams_k8s/tests/test_pipeline_step.py @@ -573,3 +573,68 @@ def test_user_volumes_and_containers_preserved() -> None: volume_mount_names = [vm["name"] for vm in pipeline_container["volumeMounts"]] assert "user-volume" in volume_mount_names assert "pipeline-config" in volume_mount_names + + +def test_emergency_patch_overrides_final_deployment() -> None: + """Test that emergency_patch overrides all other layers including pipeline additions.""" + context: dict[str, Any] = { + "service_name": "my-service", + "pipeline_name": "profiles", + "deployment_template": { + "spec": { + "replicas": 1, # Base template default + } + }, + "container_template": {}, + "pipeline_config": { + "env": {}, + "pipeline": { + "segments": [ + { + "steps_config": { + "myinput": { + "starts_segment": True, + "bootstrap_servers": ["127.0.0.1:9092"], + } + } + } + ] + }, + }, + "pipeline_module": "sbc.profiles", + "image_name": "my-image:latest", + "cpu_per_process": 1000, + "memory_per_process": 512, + "segment_id": 0, + # Emergency patch to override replicas and add security context + "emergency_patch": { + "spec": { + "replicas": 3, # Override base template + "template": { + "spec": { + "securityContext": { + "runAsNonRoot": True, + "fsGroup": 2000, + } + } + }, + } + }, + } + + pipeline_step = PipelineStep() + result = pipeline_step.run(context) + deployment = result["deployment"] + + # Verify emergency patch overrides took effect + assert deployment["spec"]["replicas"] == 3 # Emergency patch value, not base 1 + + # Verify deeply nested emergency patch values are present + assert deployment["spec"]["template"]["spec"]["securityContext"]["runAsNonRoot"] is True + assert deployment["spec"]["template"]["spec"]["securityContext"]["fsGroup"] == 2000 + + # Verify pipeline additions are still present (not removed by emergency patch) + assert deployment["metadata"]["name"] == "my-service-pipeline-profiles-0" + assert deployment["metadata"]["labels"]["pipeline-app"] == "sbc-profiles" + assert len(deployment["spec"]["template"]["spec"]["containers"]) == 1 + assert deployment["spec"]["template"]["spec"]["containers"][0]["name"] == "pipeline-consumer" From 0caed4ffe7d4b5b4de5685f83e5eec1f07ec96f8 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 4 Feb 2026 12:12:01 -0800 Subject: [PATCH 2/2] chore: Remove unnecessary comments per code review Remove explanatory comments from emergency_patch implementation as requested in code review feedback. Co-Authored-By: Claude Sonnet 4.5 --- sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index 10a05910..e795425a 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -123,7 +123,6 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext: else: pipeline_config_parsed = context["pipeline_config"] - # Parse emergency_patch if present (defaults to empty dict) emergency_patch_parsed: dict[str, Any] = {} if "emergency_patch" in context: if isinstance(context["emergency_patch"], str): @@ -307,8 +306,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: deployment = deepmerge(deployment, pipeline_additions) - # Apply emergency patch as the final layer - # This allows overriding any deployment configuration in emergencies if emergency_patch: deployment = deepmerge(deployment, emergency_patch)