From 712d6b32750587526527838c2dc3f2c14d18a4eb Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Tue, 7 Apr 2026 11:03:26 -0700 Subject: [PATCH 1/5] Migration to drop redundant detector workflow connections --- migrations_lockfile.txt | 2 +- ...drop_redundant_error_detector_workflows.py | 89 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt index 921fc0bf5d34f6..71a2b5e7045f6b 100644 --- a/migrations_lockfile.txt +++ b/migrations_lockfile.txt @@ -39,4 +39,4 @@ tempest: 0003_use_encrypted_char_field uptime: 0055_backfill_2xx_status_assertion -workflow_engine: 0111_add_workflowfirehistory_date_added_index +workflow_engine: 0112_drop_redundant_error_detector_workflows diff --git a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py new file mode 100644 index 00000000000000..d4707e91affb8e --- /dev/null +++ b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py @@ -0,0 +1,89 @@ +# Generated by Django 5.2.12 on 2026-04-07 17:56 +import logging + +from django.db import migrations +from django.db.backends.base.schema import BaseDatabaseSchemaEditor +from django.db.migrations.state import StateApps + +from sentry.new_migrations.migrations import CheckedMigration +from sentry.utils.query import bulk_delete_objects + +logger = logging.getLogger(__name__) + + +def delete_redundant_error_detector_workflows( + apps: StateApps, schema_editor: BaseDatabaseSchemaEditor +) -> None: + """ + Delete DetectorWorkflow rows where: + 1. The detector type is "error" + 2. There exists another DetectorWorkflow with the same workflow_id + connected to an "issue_stream" detector + + Uses batched deletion (10k rows at a time) for efficiency with ~2.4M rows. + """ + Detector = apps.get_model("workflow_engine", "Detector") + DetectorWorkflow = apps.get_model("workflow_engine", "DetectorWorkflow") + + # Get error detector IDs (one per project) + error_detector_ids = set(Detector.objects.filter(type="error").values_list("id", flat=True)) + + # Get workflow IDs that have issue_stream detector connections + issue_stream_detector_ids = Detector.objects.filter(type="issue_stream").values_list( + "id", flat=True + ) + issue_stream_workflow_ids = set( + DetectorWorkflow.objects.filter(detector_id__in=issue_stream_detector_ids).values_list( + "workflow_id", flat=True + ) + ) + + logger.info( + "Starting deletion of redundant error detector workflows", + extra={ + "error_detector_count": len(error_detector_ids), + "issue_stream_workflow_count": len(issue_stream_workflow_ids), + }, + ) + + # Delete in batches using bulk_delete_objects + while bulk_delete_objects( + DetectorWorkflow, + logger=logger, + detector_id__in=error_detector_ids, + workflow_id__in=issue_stream_workflow_ids, + ): + pass + + +class Migration(CheckedMigration): + # This flag is used to mark that a migration shouldn't be automatically run in production. + # This should only be used for operations where it's safe to run the migration after your + # code has deployed. So this should not be used for most operations that alter the schema + # of a table. + # Here are some things that make sense to mark as post deployment: + # - Large data migrations. Typically we want these to be run manually so that they can be + # monitored and not block the deploy for a long period of time while they run. + # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to + # run this outside deployments so that we don't block them. Note that while adding an index + # is a schema change, it's completely safe to run the operation after the code has deployed. + # Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment + + is_post_deployment = True + + dependencies = [ + ("workflow_engine", "0111_add_workflowfirehistory_date_added_index"), + ] + + operations = [ + migrations.RunPython( + delete_redundant_error_detector_workflows, + migrations.RunPython.noop, + hints={ + "tables": [ + "workflow_engine_detector", + "workflow_engine_detectorworkflow", + ] + }, + ), + ] From 2c46e8857bafb0edb9571244bef023e2841e2b3f Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Tue, 7 Apr 2026 13:06:57 -0700 Subject: [PATCH 2/5] Add tests for the migration --- ...drop_redundant_error_detector_workflows.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py diff --git a/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py b/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py new file mode 100644 index 00000000000000..f8d4e3c2be96b4 --- /dev/null +++ b/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py @@ -0,0 +1,70 @@ +from sentry.testutils.cases import TestMigrations +from sentry.workflow_engine.models import DetectorWorkflow + + +class DropRedundantErrorDetectorWorkflowsTest(TestMigrations): + migrate_from = "0111_add_workflowfirehistory_date_added_index" + migrate_to = "0112_drop_redundant_error_detector_workflows" + app = "workflow_engine" + + def setup_initial_state(self) -> None: + self.org = self.create_organization(name="test-org") + self.project1 = self.create_project(organization=self.org) + self.project2 = self.create_project(organization=self.org) + + # === Scenario 1: Should DELETE === + # Error detector workflow with matching issue_stream workflow on same workflow + self.error_detector_1 = self.create_detector(project=self.project1, type="error") + self.issue_stream_detector_1 = self.create_detector( + project=self.project1, type="issue_stream" + ) + self.workflow_1 = self.create_workflow(organization=self.org) + self.dw_error_should_delete = self.create_detector_workflow( + detector=self.error_detector_1, workflow=self.workflow_1 + ) + self.dw_issue_stream_keep = self.create_detector_workflow( + detector=self.issue_stream_detector_1, workflow=self.workflow_1 + ) + + # === Scenario 2: Should NOT DELETE === + # Error detector with no matching issue_stream on same workflow + self.error_detector_2 = self.create_detector(project=self.project1, type="error") + self.workflow_2 = self.create_workflow(organization=self.org) + self.dw_error_no_match = self.create_detector_workflow( + detector=self.error_detector_2, workflow=self.workflow_2 + ) + + # === Scenario 3: Issue_stream detector only === + self.issue_stream_detector_2 = self.create_detector( + project=self.project1, type="issue_stream" + ) + self.workflow_3 = self.create_workflow(organization=self.org) + self.dw_issue_stream_only = self.create_detector_workflow( + detector=self.issue_stream_detector_2, workflow=self.workflow_3 + ) + + # === Scenario 4: Cross-project isolation === + # Project 2 has error detector only (no issue_stream) + self.error_detector_p2 = self.create_detector(project=self.project2, type="error") + self.workflow_p2 = self.create_workflow(organization=self.org) + self.dw_error_project2 = self.create_detector_workflow( + detector=self.error_detector_p2, workflow=self.workflow_p2 + ) + + def test_deletes_error_workflow_with_matching_issue_stream(self) -> None: + assert not DetectorWorkflow.objects.filter(id=self.dw_error_should_delete.id).exists() + + def test_preserves_issue_stream_workflow_when_error_deleted(self) -> None: + assert DetectorWorkflow.objects.filter(id=self.dw_issue_stream_keep.id).exists() + + def test_preserves_error_workflow_without_matching_issue_stream(self) -> None: + assert DetectorWorkflow.objects.filter(id=self.dw_error_no_match.id).exists() + + def test_preserves_issue_stream_only_workflow(self) -> None: + assert DetectorWorkflow.objects.filter(id=self.dw_issue_stream_only.id).exists() + + def test_preserves_cross_project_error_workflow_without_issue_stream(self) -> None: + assert DetectorWorkflow.objects.filter(id=self.dw_error_project2.id).exists() + + def test_total_count_after_migration(self) -> None: + assert DetectorWorkflow.objects.count() == 4 From b50d8c2ed77b9c27c83a1947e04f4f6266cfad46 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:21:19 -0700 Subject: [PATCH 3/5] update the selection of the error detectors to be a RangeQuerySetWrapper so this processes in batches --- ...drop_redundant_error_detector_workflows.py | 54 +++++++++---------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py index d4707e91affb8e..a93387920fddb9 100644 --- a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py +++ b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py @@ -3,13 +3,17 @@ from django.db import migrations from django.db.backends.base.schema import BaseDatabaseSchemaEditor +from django.db.models import Exists, OuterRef from django.db.migrations.state import StateApps from sentry.new_migrations.migrations import CheckedMigration -from sentry.utils.query import bulk_delete_objects +from sentry.utils.iterators import chunked +from sentry.utils.query import RangeQuerySetWrapper, bulk_delete_objects logger = logging.getLogger(__name__) +CHUNK_SIZE = 10000 + def delete_redundant_error_detector_workflows( apps: StateApps, schema_editor: BaseDatabaseSchemaEditor @@ -20,40 +24,32 @@ def delete_redundant_error_detector_workflows( 2. There exists another DetectorWorkflow with the same workflow_id connected to an "issue_stream" detector - Uses batched deletion (10k rows at a time) for efficiency with ~2.4M rows. + Processes error detector IDs in chunks using RangeQuerySetWrapper to avoid + loading millions of IDs into a single query. """ Detector = apps.get_model("workflow_engine", "Detector") DetectorWorkflow = apps.get_model("workflow_engine", "DetectorWorkflow") - # Get error detector IDs (one per project) - error_detector_ids = set(Detector.objects.filter(type="error").values_list("id", flat=True)) + error_detectors = Detector.objects.filter(type="error") + + for chunk in chunked(RangeQuerySetWrapper(error_detectors, step=CHUNK_SIZE), CHUNK_SIZE): + chunk_ids = [d.id for d in chunk] - # Get workflow IDs that have issue_stream detector connections - issue_stream_detector_ids = Detector.objects.filter(type="issue_stream").values_list( - "id", flat=True - ) - issue_stream_workflow_ids = set( - DetectorWorkflow.objects.filter(detector_id__in=issue_stream_detector_ids).values_list( - "workflow_id", flat=True + issue_stream_exists = DetectorWorkflow.objects.filter( + workflow_id=OuterRef("workflow_id"), + detector__type="issue_stream", ) - ) - - logger.info( - "Starting deletion of redundant error detector workflows", - extra={ - "error_detector_count": len(error_detector_ids), - "issue_stream_workflow_count": len(issue_stream_workflow_ids), - }, - ) - - # Delete in batches using bulk_delete_objects - while bulk_delete_objects( - DetectorWorkflow, - logger=logger, - detector_id__in=error_detector_ids, - workflow_id__in=issue_stream_workflow_ids, - ): - pass + + to_delete = DetectorWorkflow.objects.filter( + detector_id__in=chunk_ids, + ).filter(Exists(issue_stream_exists)) + + while bulk_delete_objects( + DetectorWorkflow, + logger=logger, + id__in=to_delete.values_list("id", flat=True), + ): + pass class Migration(CheckedMigration): From fa0c8e90789bede74117c76c935d0f644835f7ce Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:33:03 -0700 Subject: [PATCH 4/5] make the query a little more performant by only selecting the error_detector ids --- ...0112_drop_redundant_error_detector_workflows.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py index a93387920fddb9..af178d06b7efda 100644 --- a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py +++ b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py @@ -30,7 +30,7 @@ def delete_redundant_error_detector_workflows( Detector = apps.get_model("workflow_engine", "Detector") DetectorWorkflow = apps.get_model("workflow_engine", "DetectorWorkflow") - error_detectors = Detector.objects.filter(type="error") + error_detectors = Detector.objects.filter(type="error").only("id") for chunk in chunked(RangeQuerySetWrapper(error_detectors, step=CHUNK_SIZE), CHUNK_SIZE): chunk_ids = [d.id for d in chunk] @@ -40,14 +40,18 @@ def delete_redundant_error_detector_workflows( detector__type="issue_stream", ) - to_delete = DetectorWorkflow.objects.filter( - detector_id__in=chunk_ids, - ).filter(Exists(issue_stream_exists)) + dws_to_delete = ( + DetectorWorkflow.objects.filter( + detector_id__in=chunk_ids, + ) + .filter(Exists(issue_stream_exists)) + .values_list("id", flat=True) + ) while bulk_delete_objects( DetectorWorkflow, logger=logger, - id__in=to_delete.values_list("id", flat=True), + id__in=dws_to_delete, ): pass From 7fdc7835fc20af512cd475f7185c6818627622a1 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 8 Apr 2026 14:43:01 -0700 Subject: [PATCH 5/5] Change the id list to be all the records and then use chunked to split them into 10k groups to reduce the querying. update the tests to all be 1 test because of performance reasons in those tests --- .../0112_drop_redundant_error_detector_workflows.py | 10 ++++------ ..._0112_drop_redundant_error_detector_workflows.py | 13 +++++++------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py index af178d06b7efda..ecac262b508af1 100644 --- a/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py +++ b/src/sentry/workflow_engine/migrations/0112_drop_redundant_error_detector_workflows.py @@ -8,7 +8,7 @@ from sentry.new_migrations.migrations import CheckedMigration from sentry.utils.iterators import chunked -from sentry.utils.query import RangeQuerySetWrapper, bulk_delete_objects +from sentry.utils.query import bulk_delete_objects logger = logging.getLogger(__name__) @@ -30,11 +30,9 @@ def delete_redundant_error_detector_workflows( Detector = apps.get_model("workflow_engine", "Detector") DetectorWorkflow = apps.get_model("workflow_engine", "DetectorWorkflow") - error_detectors = Detector.objects.filter(type="error").only("id") - - for chunk in chunked(RangeQuerySetWrapper(error_detectors, step=CHUNK_SIZE), CHUNK_SIZE): - chunk_ids = [d.id for d in chunk] + error_detector_ids = Detector.objects.filter(type="error").values_list("id", flat=True) + for chunk in chunked(error_detector_ids, CHUNK_SIZE): issue_stream_exists = DetectorWorkflow.objects.filter( workflow_id=OuterRef("workflow_id"), detector__type="issue_stream", @@ -42,7 +40,7 @@ def delete_redundant_error_detector_workflows( dws_to_delete = ( DetectorWorkflow.objects.filter( - detector_id__in=chunk_ids, + detector_id__in=chunk, ) .filter(Exists(issue_stream_exists)) .values_list("id", flat=True) diff --git a/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py b/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py index f8d4e3c2be96b4..fa7cee731ebd35 100644 --- a/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py +++ b/tests/sentry/workflow_engine/migrations/test_0112_drop_redundant_error_detector_workflows.py @@ -51,20 +51,21 @@ def setup_initial_state(self) -> None: detector=self.error_detector_p2, workflow=self.workflow_p2 ) - def test_deletes_error_workflow_with_matching_issue_stream(self) -> None: + def test_connections_are_correct_after_migration(self) -> None: + # test deletes error workflow with matching issue stream assert not DetectorWorkflow.objects.filter(id=self.dw_error_should_delete.id).exists() - def test_preserves_issue_stream_workflow_when_error_deleted(self) -> None: + # test preserves issue stream workflow when error deleted assert DetectorWorkflow.objects.filter(id=self.dw_issue_stream_keep.id).exists() - def test_preserves_error_workflow_without_matching_issue_stream(self) -> None: + # test preserves error workflow without matching issue stream assert DetectorWorkflow.objects.filter(id=self.dw_error_no_match.id).exists() - def test_preserves_issue_stream_only_workflow(self) -> None: + # test preserves issue stream only workflow assert DetectorWorkflow.objects.filter(id=self.dw_issue_stream_only.id).exists() - def test_preserves_cross_project_error_workflow_without_issue_stream(self) -> None: + # test preserves cross project error workflow without issue stream assert DetectorWorkflow.objects.filter(id=self.dw_error_project2.id).exists() - def test_total_count_after_migration(self) -> None: + # test total count after migration assert DetectorWorkflow.objects.count() == 4