diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt
index 921fc0bf5d34f6..71a2b5e7045f6b 100644
--- a/migrations_lockfile.txt
+++ b/migrations_lockfile.txt
@@ -39,4 +39,4 @@ tempest: 0003_use_encrypted_char_field
 
 uptime: 0055_backfill_2xx_status_assertion
 
-workflow_engine: 0111_add_workflowfirehistory_date_added_index
+workflow_engine: 0112_drop_redundant_error_detector_workflows
diff --git a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py
new file mode 100644
index 00000000000000..ecac262b508af1
--- /dev/null
+++ b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py
@@ -0,0 +1,87 @@
+# Generated by Django 5.2.12 on 2026-04-07 17:56
+import logging
+
+from django.db import migrations
+from django.db.backends.base.schema import BaseDatabaseSchemaEditor
+from django.db.models import Exists, OuterRef
+from django.db.migrations.state import StateApps
+
+from sentry.new_migrations.migrations import CheckedMigration
+from sentry.utils.iterators import chunked
+from sentry.utils.query import bulk_delete_objects
+
+logger = logging.getLogger(__name__)
+
+CHUNK_SIZE = 10000
+
+
+def delete_redundant_error_detector_workflows(
+    apps: StateApps, schema_editor: BaseDatabaseSchemaEditor
+) -> None:
+    """
+    Delete DetectorWorkflow rows where:
+    1. The detector type is "error"
+    2. There exists another DetectorWorkflow with the same workflow_id
+       connected to an "issue_stream" detector
+
+    Processes error detector IDs in CHUNK_SIZE batches (via ``chunked``) so
+    each delete's ``id__in`` filter and subquery stay bounded in size.
+    """
+    Detector = apps.get_model("workflow_engine", "Detector")
+    DetectorWorkflow = apps.get_model("workflow_engine", "DetectorWorkflow")
+
+    error_detector_ids = Detector.objects.filter(type="error").values_list("id", flat=True)
+
+    for chunk in chunked(error_detector_ids, CHUNK_SIZE):
+        issue_stream_exists = DetectorWorkflow.objects.filter(
+            workflow_id=OuterRef("workflow_id"),
+            detector__type="issue_stream",
+        )
+
+        dws_to_delete = (
+            DetectorWorkflow.objects.filter(
+                detector_id__in=chunk,
+            )
+            .filter(Exists(issue_stream_exists))
+            .values_list("id", flat=True)
+        )
+
+        while bulk_delete_objects(
+            DetectorWorkflow,
+            logger=logger,
+            id__in=dws_to_delete,
+        ):
+            pass
+
+
+class Migration(CheckedMigration):
+    # This flag is used to mark that a migration shouldn't be automatically run in production.
+    # This should only be used for operations where it's safe to run the migration after your
+    # code has deployed. So this should not be used for most operations that alter the schema
+    # of a table.
+    # Here are some things that make sense to mark as post deployment:
+    # - Large data migrations. Typically we want these to be run manually so that they can be
+    #   monitored and not block the deploy for a long period of time while they run.
+    # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to
+    #   run this outside deployments so that we don't block them. Note that while adding an index
+    #   is a schema change, it's completely safe to run the operation after the code has deployed.
+    # Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment
+
+    is_post_deployment = True
+
+    dependencies = [
+        ("workflow_engine", "0111_add_workflowfirehistory_date_added_index"),
+    ]
+
+    operations = [
+        migrations.RunPython(
+            delete_redundant_error_detector_workflows,
+            migrations.RunPython.noop,
+            hints={
+                "tables": [
+                    "workflow_engine_detector",
+                    "workflow_engine_detectorworkflow",
+                ]
+            },
+        ),
+    ]
diff --git a/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py b/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py
new file mode 100644
index 00000000000000..fa7cee731ebd35
--- /dev/null
+++ b/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py
@@ -0,0 +1,71 @@
+from sentry.testutils.cases import TestMigrations
+from sentry.workflow_engine.models import DetectorWorkflow
+
+
+class DropRedundantErrorDetectorWorkflowsTest(TestMigrations):
+    migrate_from = "0111_add_workflowfirehistory_date_added_index"
+    migrate_to = "0112_drop_redundant_error_detector_workflows"
+    app = "workflow_engine"
+
+    def setup_initial_state(self) -> None:
+        self.org = self.create_organization(name="test-org")
+        self.project1 = self.create_project(organization=self.org)
+        self.project2 = self.create_project(organization=self.org)
+
+        # === Scenario 1: Should DELETE ===
+        # Error detector workflow with matching issue_stream workflow on same workflow
+        self.error_detector_1 = self.create_detector(project=self.project1, type="error")
+        self.issue_stream_detector_1 = self.create_detector(
+            project=self.project1, type="issue_stream"
+        )
+        self.workflow_1 = self.create_workflow(organization=self.org)
+        self.dw_error_should_delete = self.create_detector_workflow(
+            detector=self.error_detector_1, workflow=self.workflow_1
+        )
+        self.dw_issue_stream_keep = self.create_detector_workflow(
+            detector=self.issue_stream_detector_1, workflow=self.workflow_1
+        )
+
+        # === Scenario 2: Should NOT DELETE ===
+        # Error detector with no matching issue_stream on same workflow
+        self.error_detector_2 = self.create_detector(project=self.project1, type="error")
+        self.workflow_2 = self.create_workflow(organization=self.org)
+        self.dw_error_no_match = self.create_detector_workflow(
+            detector=self.error_detector_2, workflow=self.workflow_2
+        )
+
+        # === Scenario 3: Issue_stream detector only ===
+        self.issue_stream_detector_2 = self.create_detector(
+            project=self.project1, type="issue_stream"
+        )
+        self.workflow_3 = self.create_workflow(organization=self.org)
+        self.dw_issue_stream_only = self.create_detector_workflow(
+            detector=self.issue_stream_detector_2, workflow=self.workflow_3
+        )
+
+        # === Scenario 4: Cross-project isolation ===
+        # Project 2 has error detector only (no issue_stream)
+        self.error_detector_p2 = self.create_detector(project=self.project2, type="error")
+        self.workflow_p2 = self.create_workflow(organization=self.org)
+        self.dw_error_project2 = self.create_detector_workflow(
+            detector=self.error_detector_p2, workflow=self.workflow_p2
+        )
+
+    def test_connections_are_correct_after_migration(self) -> None:
+        # test deletes error workflow with matching issue stream
+        assert not DetectorWorkflow.objects.filter(id=self.dw_error_should_delete.id).exists()
+
+        # test preserves issue stream workflow when error deleted
+        assert DetectorWorkflow.objects.filter(id=self.dw_issue_stream_keep.id).exists()
+
+        # test preserves error workflow without matching issue stream
+        assert DetectorWorkflow.objects.filter(id=self.dw_error_no_match.id).exists()
+
+        # test preserves issue stream only workflow
+        assert DetectorWorkflow.objects.filter(id=self.dw_issue_stream_only.id).exists()
+
+        # test preserves cross project error workflow without issue stream
+        assert DetectorWorkflow.objects.filter(id=self.dw_error_project2.id).exists()
+
+        # test total count after migration
+        assert DetectorWorkflow.objects.count() == 4