Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import re
from importlib.resources import files
from typing import Any, TypedDict, cast
from typing import Any, NotRequired, TypedDict, cast

import yaml
from libsentrykube.ext import ExternalMacro
Expand Down Expand Up @@ -123,6 +123,13 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext:
else:
pipeline_config_parsed = context["pipeline_config"]

emergency_patch_parsed: dict[str, Any] = {}
if "emergency_patch" in context:
if isinstance(context["emergency_patch"], str):
emergency_patch_parsed = yaml.safe_load(context["emergency_patch"]) or {}
else:
emergency_patch_parsed = context["emergency_patch"] or {}

return {
"service_name": context["service_name"],
"pipeline_name": context["pipeline_name"],
Expand All @@ -134,6 +141,7 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext:
"cpu_per_process": context["cpu_per_process"],
"memory_per_process": context["memory_per_process"],
"segment_id": context["segment_id"],
"emergency_patch": emergency_patch_parsed,
}


Expand All @@ -150,6 +158,7 @@ class PipelineStepContext(TypedDict):
cpu_per_process: int
memory_per_process: int
segment_id: int
emergency_patch: NotRequired[dict[str, Any]]


class PipelineStep(ExternalMacro):
Expand Down Expand Up @@ -244,6 +253,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]:
pipeline_name = ctx["pipeline_name"]
segment_id = ctx["segment_id"]
service_name = ctx["service_name"]
emergency_patch = ctx.get("emergency_patch", {})

# Create deployment

Expand Down Expand Up @@ -296,6 +306,9 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]:

deployment = deepmerge(deployment, pipeline_additions)

if emergency_patch:
deployment = deepmerge(deployment, emergency_patch)

# Create configmap
configmap = {
"apiVersion": "v1",
Expand Down
65 changes: 65 additions & 0 deletions sentry_streams_k8s/tests/test_pipeline_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,3 +573,68 @@ def test_user_volumes_and_containers_preserved() -> None:
volume_mount_names = [vm["name"] for vm in pipeline_container["volumeMounts"]]
assert "user-volume" in volume_mount_names
assert "pipeline-config" in volume_mount_names


def test_emergency_patch_overrides_final_deployment() -> None:
"""Test that emergency_patch overrides all other layers including pipeline additions."""
context: dict[str, Any] = {
"service_name": "my-service",
"pipeline_name": "profiles",
"deployment_template": {
"spec": {
"replicas": 1, # Base template default
}
},
"container_template": {},
"pipeline_config": {
"env": {},
"pipeline": {
"segments": [
{
"steps_config": {
"myinput": {
"starts_segment": True,
"bootstrap_servers": ["127.0.0.1:9092"],
}
}
}
]
},
},
"pipeline_module": "sbc.profiles",
"image_name": "my-image:latest",
"cpu_per_process": 1000,
"memory_per_process": 512,
"segment_id": 0,
# Emergency patch to override replicas and add security context
"emergency_patch": {
"spec": {
"replicas": 3, # Override base template
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unremoved comments per reviewer request in PR discussion

Low Severity

The PR discussion shows reviewer @fpacifici requested "Please remove this comment" twice, but comments are still present in the test file. The inline comments like # Base template default, # Emergency patch to override replicas and add security context, and # Override base template in the test data appear to be what the reviewer wants removed.

Additional Locations (1)

Fix in Cursor Fix in Web

"template": {
"spec": {
"securityContext": {
"runAsNonRoot": True,
"fsGroup": 2000,
}
}
},
}
},
}

pipeline_step = PipelineStep()
result = pipeline_step.run(context)
deployment = result["deployment"]

# Verify emergency patch overrides took effect
assert deployment["spec"]["replicas"] == 3 # Emergency patch value, not base 1

# Verify deeply nested emergency patch values are present
assert deployment["spec"]["template"]["spec"]["securityContext"]["runAsNonRoot"] is True
assert deployment["spec"]["template"]["spec"]["securityContext"]["fsGroup"] == 2000

# Verify pipeline additions are still present (not removed by emergency patch)
assert deployment["metadata"]["name"] == "my-service-pipeline-profiles-0"
assert deployment["metadata"]["labels"]["pipeline-app"] == "sbc-profiles"
assert len(deployment["spec"]["template"]["spec"]["containers"]) == 1
assert deployment["spec"]["template"]["spec"]["containers"][0]["name"] == "pipeline-consumer"