Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions superset/commands/report/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
ReportScheduleStateNotFoundError,
ReportScheduleSystemErrorsException,
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
from superset.common.chart_data import ChartDataResultFormat, ChartDataResultType
from superset.daos.report import (
Expand Down Expand Up @@ -929,10 +928,25 @@ def next(self) -> None: # noqa: C901

class ReportWorkingState(BaseReportState):
"""
Handle Working state
Handle Working state.

When a report is found in WORKING state, one of two things happened:

1. The report is genuinely still running (recent WORKING log entry).
→ Raise ``ReportSchedulePreviousWorkingError`` so the scheduler
skips this tick.

2. The worker crashed (OOM, pod eviction, etc.) and the report is stuck.
The ``working_timeout`` has elapsed.
→ Reset to **NOOP** and immediately re-execute via
``ReportNotTriggeredErrorState``. This is safe because by the
time ``working_timeout`` (typically ≥ 1 hour) has elapsed, any
celery broker requeue (~30 min) has already been attempted and
rejected with ``ReportSchedulePreviousWorkingError``.

next states:
- Error
- Working
- NOOP → (immediate retry via ReportNotTriggeredErrorState)
- WORKING (genuinely still running)
"""

current_states = [ReportState.WORKING]
Expand All @@ -947,17 +961,26 @@ def next(self) -> None:
if last_working
else None
)
logger.error(
"Working state timeout after %.2fs - execution_id: %s",
logger.warning(
"Working state timeout after %.2fs, resetting to NOOP "
"and retrying immediately - execution_id: %s",
elapsed_seconds if elapsed_seconds else 0,
self._execution_id,
)
exception_timeout = ReportScheduleWorkingTimeoutError()
self.update_report_schedule_and_log(
ReportState.ERROR,
error_message=str(exception_timeout),
ReportState.NOOP,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By setting as NOOP and recovering the stuck state, would we be introducing an infinite loop? Asking because I see no "stop" mechanism for the attempts after automatically re-scheduling X amount of time?

error_message=(
"Working timeout reached: previous execution appears "
"stuck (possibly due to a worker crash). "
"Resetting and retrying."
),
)
raise exception_timeout
ReportNotTriggeredErrorState(
self._report_schedule,
self._scheduled_dttm,
self._execution_id,
).next()
return
logger.warning(
"Report still in working state, refusing to re-compute - execution_id: %s",
self._execution_id,
Expand Down
19 changes: 10 additions & 9 deletions tests/integration_tests/reports/commands_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from sqlalchemy.sql import func

from superset import db
from superset.commands.exceptions import CommandException
from superset.commands.report.exceptions import (
AlertQueryError,
AlertQueryInvalidTypeError,
Expand All @@ -51,7 +52,6 @@
ReportScheduleScreenshotFailedError,
ReportScheduleScreenshotTimeout,
ReportScheduleSystemErrorsException,
ReportScheduleWorkingTimeoutError,
)
from superset.commands.report.execute import (
AsyncExecuteReportScheduleCommand,
Expand Down Expand Up @@ -1742,26 +1742,27 @@ def test_report_schedule_working(create_report_slack_chart_working):
@pytest.mark.usefixtures("create_report_slack_chart_working")
def test_report_schedule_working_timeout(create_report_slack_chart_working):
"""
ExecuteReport Command: Test report schedule still working but should timed out
ExecuteReport Command: Test report schedule stuck in WORKING past timeout
resets to NOOP and immediately retries via ReportNotTriggeredErrorState.
The retry itself may fail (e.g. no webdriver in CI) — that's expected;
what matters is the stuck state was recovered.
"""
current_time = create_report_slack_chart_working.last_eval_dttm + timedelta(
seconds=create_report_slack_chart_working.working_timeout + 1
)
with freeze_time(current_time):
with pytest.raises(ReportScheduleWorkingTimeoutError):
# The NOOP reset succeeds, but the immediate retry will fail
# in test environments (no webdriver), raising a CommandException.
with pytest.raises(CommandException):
AsyncExecuteReportScheduleCommand(
TEST_ID,
create_report_slack_chart_working.id,
datetime.utcnow(),
).run()

logs = db.session.query(ReportExecutionLog).all()
# Two logs, first is created by fixture
assert len(logs) == 2
assert ReportScheduleWorkingTimeoutError.message in [
log.error_message for log in logs
]
assert create_report_slack_chart_working.last_state == ReportState.ERROR
# Verify the NOOP reset happened (stuck working state was detected)
assert any("stuck" in (log.error_message or "").lower() for log in logs)


@pytest.mark.usefixtures("create_alert_slack_chart_success")
Expand Down
29 changes: 20 additions & 9 deletions tests/unit_tests/commands/report/execute_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
ReportScheduleScreenshotTimeout,
ReportScheduleStateNotFoundError,
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
from superset.commands.report.execute import (
BaseReportState,
Expand Down Expand Up @@ -1110,8 +1109,10 @@ def _make_state_instance(
return instance


def test_working_state_timeout_raises_timeout_error(mocker: MockerFixture) -> None:
"""Working state past timeout should raise WorkingTimeoutError and log ERROR."""
def test_working_state_timeout_resets_to_noop_and_retries(
mocker: MockerFixture,
) -> None:
"""Working state past timeout should reset to NOOP and immediately retry."""
state = _make_state_instance(mocker, ReportWorkingState)
mocker.patch.object(state, "is_on_working_timeout", return_value=True)

Expand All @@ -1123,14 +1124,24 @@ def test_working_state_timeout_raises_timeout_error(mocker: MockerFixture) -> No
)
mocker.patch.object(state, "update_report_schedule_and_log")

with pytest.raises(ReportScheduleWorkingTimeoutError):
state.next()

state.update_report_schedule_and_log.assert_called_once_with( # type: ignore[attr-defined]
ReportState.ERROR,
error_message=str(ReportScheduleWorkingTimeoutError()),
# Mock the retry path so it doesn't actually execute the report
mock_retry_state = mocker.Mock()
mocker.patch(
"superset.commands.report.execute.ReportNotTriggeredErrorState",
return_value=mock_retry_state,
)

# Should NOT raise — resets state and retries immediately
state.next()

state.update_report_schedule_and_log.assert_called_once() # type: ignore[attr-defined]
call_args = state.update_report_schedule_and_log.call_args # type: ignore[attr-defined]
assert call_args[0][0] == ReportState.NOOP
assert "stuck" in call_args[1]["error_message"].lower()

# Verify immediate retry was triggered
mock_retry_state.next.assert_called_once()


def test_working_state_still_working_raises_previous_working(
mocker: MockerFixture,
Expand Down
Loading