diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py index 66265878dad4..1adb31f1faa6 100644 --- a/superset/commands/report/execute.py +++ b/superset/commands/report/execute.py @@ -43,7 +43,6 @@ ReportScheduleStateNotFoundError, ReportScheduleSystemErrorsException, ReportScheduleUnexpectedError, - ReportScheduleWorkingTimeoutError, ) from superset.common.chart_data import ChartDataResultFormat, ChartDataResultType from superset.daos.report import ( @@ -929,10 +928,25 @@ def next(self) -> None: # noqa: C901 class ReportWorkingState(BaseReportState): """ - Handle Working state + Handle Working state. + + When a report is found in WORKING state, one of two things happened: + + 1. The report is genuinely still running (recent WORKING log entry). + → Raise ``ReportSchedulePreviousWorkingError`` so the scheduler + skips this tick. + + 2. The worker crashed (OOM, pod eviction, etc.) and the report is stuck. + The ``working_timeout`` has elapsed. + → Reset to **NOOP** and immediately re-execute via + ``ReportNotTriggeredErrorState``. This is safe because by the + time ``working_timeout`` (typically ≥ 1 hour) has elapsed, any + celery broker requeue (~30 min) has already been attempted and + rejected with ``ReportSchedulePreviousWorkingError``. + next states: - - Error - - Working + - NOOP → (immediate retry via ReportNotTriggeredErrorState) + - WORKING (genuinely still running) """ current_states = [ReportState.WORKING] @@ -947,17 +961,26 @@ def next(self) -> None: if last_working else None ) - logger.error( - "Working state timeout after %.2fs - execution_id: %s", + logger.warning( + "Working state timeout after %.2fs, resetting to NOOP " + "and retrying immediately - execution_id: %s", elapsed_seconds if elapsed_seconds else 0, self._execution_id, ) - exception_timeout = ReportScheduleWorkingTimeoutError() self.update_report_schedule_and_log( - ReportState.ERROR, - error_message=str(exception_timeout), + ReportState.NOOP, + error_message=( + "Working timeout reached: previous execution appears " + "stuck (possibly due to a worker crash). " + "Resetting and retrying." + ), ) - raise exception_timeout + ReportNotTriggeredErrorState( + self._report_schedule, + self._scheduled_dttm, + self._execution_id, + ).next() + return logger.warning( "Report still in working state, refusing to re-compute - execution_id: %s", self._execution_id, diff --git a/tests/integration_tests/reports/commands_tests.py b/tests/integration_tests/reports/commands_tests.py index ae1c855d85ae..34dfb33e4ac5 100644 --- a/tests/integration_tests/reports/commands_tests.py +++ b/tests/integration_tests/reports/commands_tests.py @@ -38,6 +38,7 @@ from sqlalchemy.sql import func from superset import db +from superset.commands.exceptions import CommandException from superset.commands.report.exceptions import ( AlertQueryError, AlertQueryInvalidTypeError, @@ -51,7 +52,6 @@ ReportScheduleScreenshotFailedError, ReportScheduleScreenshotTimeout, ReportScheduleSystemErrorsException, - ReportScheduleWorkingTimeoutError, ) from superset.commands.report.execute import ( AsyncExecuteReportScheduleCommand, @@ -1742,13 +1742,18 @@ def test_report_schedule_working(create_report_slack_chart_working): @pytest.mark.usefixtures("create_report_slack_chart_working") def test_report_schedule_working_timeout(create_report_slack_chart_working): """ - ExecuteReport Command: Test report schedule still working but should timed out + ExecuteReport Command: Test report schedule stuck in WORKING past timeout + resets to NOOP and immediately retries via ReportNotTriggeredErrorState. + The retry itself may fail (e.g. no webdriver in CI) — that's expected; + what matters is the stuck state was recovered. """ current_time = create_report_slack_chart_working.last_eval_dttm + timedelta( seconds=create_report_slack_chart_working.working_timeout + 1 ) with freeze_time(current_time): - with pytest.raises(ReportScheduleWorkingTimeoutError): + # The NOOP reset succeeds, but the immediate retry will fail + # in test environments (no webdriver), raising a CommandException. + with pytest.raises(CommandException): AsyncExecuteReportScheduleCommand( TEST_ID, create_report_slack_chart_working.id, @@ -1756,12 +1761,8 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working): ).run() logs = db.session.query(ReportExecutionLog).all() - # Two logs, first is created by fixture - assert len(logs) == 2 - assert ReportScheduleWorkingTimeoutError.message in [ - log.error_message for log in logs - ] - assert create_report_slack_chart_working.last_state == ReportState.ERROR + # Verify the NOOP reset happened (stuck working state was detected) + assert any("stuck" in (log.error_message or "").lower() for log in logs) @pytest.mark.usefixtures("create_alert_slack_chart_success") diff --git a/tests/unit_tests/commands/report/execute_test.py b/tests/unit_tests/commands/report/execute_test.py index d7ff25a3434c..8ad05fa0997f 100644 --- a/tests/unit_tests/commands/report/execute_test.py +++ b/tests/unit_tests/commands/report/execute_test.py @@ -33,7 +33,6 @@ ReportScheduleScreenshotTimeout, ReportScheduleStateNotFoundError, ReportScheduleUnexpectedError, - ReportScheduleWorkingTimeoutError, ) from superset.commands.report.execute import ( BaseReportState, @@ -1110,8 +1109,10 @@ def _make_state_instance( return instance -def test_working_state_timeout_raises_timeout_error(mocker: MockerFixture) -> None: - """Working state past timeout should raise WorkingTimeoutError and log ERROR.""" +def test_working_state_timeout_resets_to_noop_and_retries( + mocker: MockerFixture, +) -> None: + """Working state past timeout should reset to NOOP and immediately retry.""" state = _make_state_instance(mocker, ReportWorkingState) mocker.patch.object(state, "is_on_working_timeout", return_value=True) @@ -1123,14 +1124,24 @@ def test_working_state_timeout_raises_timeout_error(mocker: MockerFixture) -> No ) mocker.patch.object(state, "update_report_schedule_and_log") - with pytest.raises(ReportScheduleWorkingTimeoutError): - state.next() - - state.update_report_schedule_and_log.assert_called_once_with( # type: ignore[attr-defined] - ReportState.ERROR, - error_message=str(ReportScheduleWorkingTimeoutError()), + # Mock the retry path so it doesn't actually execute the report + mock_retry_state = mocker.Mock() + mocker.patch( + "superset.commands.report.execute.ReportNotTriggeredErrorState", + return_value=mock_retry_state, ) + # Should NOT raise — resets state and retries immediately + state.next() + + state.update_report_schedule_and_log.assert_called_once() # type: ignore[attr-defined] + call_args = state.update_report_schedule_and_log.call_args # type: ignore[attr-defined] + assert call_args[0][0] == ReportState.NOOP + assert "stuck" in call_args[1]["error_message"].lower() + + # Verify immediate retry was triggered + mock_retry_state.next.assert_called_once() + def test_working_state_still_working_raises_previous_working( mocker: MockerFixture,