diff --git a/Makefile b/Makefile
index 29d587ec..4954ff83 100644
--- a/Makefile
+++ b/Makefile
@@ -81,5 +81,8 @@ lint-fix:
 
 format:
 	yapf . --in-place --recursive --parallel --exclude=third_party
 
+test-lite:
+	$(DOCKER_COMPOSE_COMMAND_TASK_RUNNER_LITE) run --build --rm task-runner-lite \
+		pytest -q
+
-style: format lint-fix
\ No newline at end of file
+style: format lint-fix
diff --git a/task-runner/requirements.txt b/task-runner/requirements.txt
index 4eb5bcce..d0f0eff9 100644
--- a/task-runner/requirements.txt
+++ b/task-runner/requirements.txt
@@ -9,3 +9,4 @@ pysocks
 gputil
 pydantic~=2.11.7
 tenacity
+pytest
diff --git a/task-runner/task_runner/task_request_handler.py b/task-runner/task_runner/task_request_handler.py
index a88a268b..d5d7c049 100644
--- a/task-runner/task_runner/task_request_handler.py
+++ b/task-runner/task_runner/task_request_handler.py
@@ -197,17 +197,26 @@ def save_output(
         new_task_status=None,
         force=False,
         output_filename=None,
-    ):
-        _ = self._pack_output(output_filename=output_filename)
-        self._publish_event(events.TaskOutputUploaded(
-            id=self.task_id,
-            machine_id=self.task_runner_uuid,
-            new_status=new_task_status),
-                            force=force)
+    ) -> bool:
+        output_size_bytes = self._pack_output(output_filename=output_filename)
+
+        if output_size_bytes == 0:
+            return False
+
+        self._publish_event(
+            events.TaskOutputUploaded(
+                id=self.task_id,
+                machine_id=self.task_runner_uuid,
+                new_status=new_task_status,
+            ),
+            force=force,
+        )
+
         # Remove the request JSON file to prevent multiple uploads
         # after a successful task data upload
         if os.path.exists(self.request_path):
             os.remove(self.request_path)
+        return True
 
     def is_task_running(self) -> bool:
@@ -615,21 +624,32 @@ def _execute_request(
         return exit_code, exit_reason
 
     def _pack_output(self, output_filename: Optional[str] = None) -> int:
-        """Compress outputs and store them in the shared drive."""
+        """Compress and upload outputs. If the output size is 0 or cannot be
+        determined (which may happen under unusual conditions), the upload is
+        aborted."""
+        output_size_bytes = 0
+
         if self.task_workdir is None:
             logging.error("Working directory not found.")
-            return
+            return output_size_bytes
 
         output_dir = os.path.join(self.task_workdir, utils.OUTPUT_DIR)
         if not os.path.exists(output_dir):
             logging.error("Output directory not found: %s", output_dir)
-            return
+            return output_size_bytes
 
         output_size_bytes = files.get_dir_size(output_dir)
         logging.info("Output size: %s bytes", output_size_bytes)
 
-        if output_size_bytes is not None:
-            self._post_task_metric(utils.OUTPUT_SIZE, output_size_bytes)
+        if output_size_bytes is None:
+            logging.error("Failed to determine size for output directory: %s",
+                          output_dir)
+            output_size_bytes = 0
+
+        if output_size_bytes == 0:
+            return output_size_bytes
+
+        self._post_task_metric(utils.OUTPUT_SIZE, output_size_bytes)
 
         output_total_files = files.get_dir_total_files(output_dir)
         logging.info("Output total files: %s", output_total_files)
diff --git a/task-runner/tests/unit/test_task_request_handler.py b/task-runner/tests/unit/test_task_request_handler.py
index 57117d43..85f4afd2 100644
--- a/task-runner/tests/unit/test_task_request_handler.py
+++ b/task-runner/tests/unit/test_task_request_handler.py
@@ -133,13 +133,10 @@ def fixture_task_request_handler(
         file_manager=mock.MagicMock(),
     )
 
-    with mock.patch(
-            "task_runner.api_methods_config.get_executer") as get_executer_mock:
-        with mock.patch(
-                "task_runner.utils.files.get_dir_size") as get_dir_size_mock:
-            get_dir_size_mock.return_value = 0
-            get_executer_mock.return_value = MockExecuter
-            yield handler
+    with mock.patch("task_runner.api_methods_config.get_executer",
+                    return_value=MockExecuter), mock.patch.object(
+                        handler, "_pack_output", return_value=1000):
+        yield handler
 
 
 def _setup_mock_task(
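Review note, not part of the patch: a minimal sketch of how the new boolean contract of `save_output()` could be covered in `test_task_request_handler.py`. It assumes the fixture above is exposed to tests as `task_request_handler` and that its mocked collaborators (event publisher, file manager) let `save_output()` run end to end; the test names are hypothetical.

```python
# Hypothetical tests for the new save_output() contract; fixture name and
# wiring are assumptions, not part of the submitted patch.
from unittest import mock


def test_save_output_aborts_on_empty_output(task_request_handler):
    # _pack_output now reports the packed size in bytes; 0 means there is
    # nothing to upload, so save_output should skip the upload entirely.
    with mock.patch.object(task_request_handler, "_pack_output",
                           return_value=0):
        assert task_request_handler.save_output() is False


def test_save_output_uploads_non_empty_output(task_request_handler):
    # A non-zero size lets the upload proceed, so save_output reports True.
    with mock.patch.object(task_request_handler, "_pack_output",
                           return_value=1000):
        assert task_request_handler.save_output() is True
```

With the new Makefile target, such tests would run via `make test-lite`, which wraps `pytest -q` in the `task-runner-lite` compose service.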