Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion migrations_lockfile.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ tempest: 0003_use_encrypted_char_field

uptime: 0055_backfill_2xx_status_assertion

workflow_engine: 0111_add_workflowfirehistory_date_added_index
workflow_engine: 0112_drop_redundant_error_detector_workflows
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Generated by Django 5.2.12 on 2026-04-07 17:56
import logging

from django.db import migrations
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.db.models import Exists, OuterRef
from django.db.migrations.state import StateApps

from sentry.new_migrations.migrations import CheckedMigration
from sentry.utils.iterators import chunked
from sentry.utils.query import bulk_delete_objects

logger = logging.getLogger(__name__)

# Batch size handed to `chunked` when walking error detector IDs in this migration.
CHUNK_SIZE = 10000


def delete_redundant_error_detector_workflows(
    apps: StateApps, schema_editor: BaseDatabaseSchemaEditor
) -> None:
    """
    Delete DetectorWorkflow rows where:
    1. The detector type is "error"
    2. There exists another DetectorWorkflow with the same workflow_id
       connected to an "issue_stream" detector

    Error detector IDs are processed in chunks of CHUNK_SIZE (via ``chunked``)
    so that each delete query filters on one chunk of detector IDs rather than
    on every error detector ID at once.

    Args:
        apps: Historical app registry used to resolve the models at this
            migration's state.
        schema_editor: Unused; part of the RunPython callable signature.
    """
    Detector = apps.get_model("workflow_engine", "Detector")
    DetectorWorkflow = apps.get_model("workflow_engine", "DetectorWorkflow")

    error_detector_ids = Detector.objects.filter(type="error").values_list("id", flat=True)

    # Correlated subquery: "some DetectorWorkflow on the same workflow points at
    # an issue_stream detector". It does not depend on the current chunk, so it
    # is built once and reused for every chunk.
    issue_stream_exists = DetectorWorkflow.objects.filter(
        workflow_id=OuterRef("workflow_id"),
        detector__type="issue_stream",
    )

    for chunk in chunked(error_detector_ids, CHUNK_SIZE):
        dws_to_delete = (
            DetectorWorkflow.objects.filter(
                detector_id__in=chunk,
            )
            .filter(Exists(issue_stream_exists))
            .values_list("id", flat=True)
        )

        # Keep calling until bulk_delete_objects returns a falsy value.
        # NOTE(review): presumably each call removes a bounded batch and reports
        # whether anything was deleted - confirm against sentry.utils.query.
        while bulk_delete_objects(
            DetectorWorkflow,
            logger=logger,
            id__in=dws_to_delete,
        ):
            pass


class Migration(CheckedMigration):
    # Marks this migration as one that shouldn't be run automatically in production.
    # Only appropriate when it is safe to run the migration after the code has deployed,
    # which rules out most operations that alter a table's schema.
    # Good candidates for post deployment:
    # - Large data migrations, which are usually run manually so they can be monitored
    #   and don't hold up a deploy while they run.
    # - Index creation on large tables. Although adding an index is a schema change, it
    #   is completely safe after the code has deployed and can take a long time.
    # Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment

    is_post_deployment = True

    dependencies = [("workflow_engine", "0111_add_workflowfirehistory_date_added_index")]

    operations = [
        migrations.RunPython(
            code=delete_redundant_error_detector_workflows,
            # The reverse direction is a no-op.
            reverse_code=migrations.RunPython.noop,
            hints={"tables": ["workflow_engine_detector", "workflow_engine_detectorworkflow"]},
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from sentry.testutils.cases import TestMigrations
from sentry.workflow_engine.models import DetectorWorkflow


class DropRedundantErrorDetectorWorkflowsTest(TestMigrations):
    """Checks which DetectorWorkflow rows remain after migrating 0111 -> 0112."""

    app = "workflow_engine"
    migrate_from = "0111_add_workflowfirehistory_date_added_index"
    migrate_to = "0112_drop_redundant_error_detector_workflows"

    def setup_initial_state(self) -> None:
        """Create one organization, two projects and four detector/workflow scenarios."""
        self.org = self.create_organization(name="test-org")
        self.project1 = self.create_project(organization=self.org)
        self.project2 = self.create_project(organization=self.org)

        # Scenario 1 (row expected to be DELETED): an error detector and an
        # issue_stream detector are both connected to the same workflow.
        self.error_detector_1 = self.create_detector(project=self.project1, type="error")
        self.issue_stream_detector_1 = self.create_detector(
            project=self.project1,
            type="issue_stream",
        )
        self.workflow_1 = self.create_workflow(organization=self.org)
        self.dw_error_should_delete = self.create_detector_workflow(
            detector=self.error_detector_1,
            workflow=self.workflow_1,
        )
        self.dw_issue_stream_keep = self.create_detector_workflow(
            detector=self.issue_stream_detector_1,
            workflow=self.workflow_1,
        )

        # Scenario 2 (row expected to be KEPT): the error detector's workflow
        # has no issue_stream detector connected to it.
        self.error_detector_2 = self.create_detector(project=self.project1, type="error")
        self.workflow_2 = self.create_workflow(organization=self.org)
        self.dw_error_no_match = self.create_detector_workflow(
            detector=self.error_detector_2,
            workflow=self.workflow_2,
        )

        # Scenario 3: a workflow connected only to an issue_stream detector.
        self.issue_stream_detector_2 = self.create_detector(
            project=self.project1,
            type="issue_stream",
        )
        self.workflow_3 = self.create_workflow(organization=self.org)
        self.dw_issue_stream_only = self.create_detector_workflow(
            detector=self.issue_stream_detector_2,
            workflow=self.workflow_3,
        )

        # Scenario 4 (cross-project isolation): the second project has an
        # error detector on its own workflow and no issue_stream detector.
        self.error_detector_p2 = self.create_detector(project=self.project2, type="error")
        self.workflow_p2 = self.create_workflow(organization=self.org)
        self.dw_error_project2 = self.create_detector_workflow(
            detector=self.error_detector_p2,
            workflow=self.workflow_p2,
        )

    def test_connections_are_correct_after_migration(self) -> None:
        """Only the redundant error connection from scenario 1 is removed."""
        # The error connection that shares a workflow with an issue_stream
        # connection must be gone.
        assert not DetectorWorkflow.objects.filter(id=self.dw_error_should_delete.id).exists()

        # Every other connection must survive, in the order checked here:
        # the issue_stream row from scenario 1, the unmatched error row,
        # the issue_stream-only row, and the second project's error row.
        expected_survivors = (
            self.dw_issue_stream_keep,
            self.dw_error_no_match,
            self.dw_issue_stream_only,
            self.dw_error_project2,
        )
        for detector_workflow in expected_survivors:
            assert DetectorWorkflow.objects.filter(id=detector_workflow.id).exists()

        # Exactly the four surviving rows remain in total.
        assert DetectorWorkflow.objects.count() == len(expected_survivors)
Loading