Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,8 @@ lint-fix:
format:
yapf . --in-place --recursive --parallel --exclude=third_party

test-lite:
$(DOCKER_COMPOSE_COMMAND_TASK_RUNNER_LITE) run --build --rm task-runner-lite \
pytest -q

style: format lint-fix
style: format lint-fix
1 change: 1 addition & 0 deletions task-runner/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pysocks
gputil
pydantic~=2.11.7
tenacity
pytest
44 changes: 32 additions & 12 deletions task-runner/task_runner/task_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,26 @@ def save_output(
new_task_status=None,
force=False,
output_filename=None,
):
_ = self._pack_output(output_filename=output_filename)
self._publish_event(events.TaskOutputUploaded(
id=self.task_id,
machine_id=self.task_runner_uuid,
new_status=new_task_status),
force=force)
) -> bool:
output_size_bytes = self._pack_output(output_filename=output_filename)

if output_size_bytes == 0:
return False

self._publish_event(
events.TaskOutputUploaded(
id=self.task_id,
machine_id=self.task_runner_uuid,
new_status=new_task_status,
),
force=force,
)

# Remove the request JSON file to prevent multiple uploads
# after a successful task data upload
if os.path.exists(self.request_path):
os.remove(self.request_path)

return True

def is_task_running(self) -> bool:
Expand Down Expand Up @@ -615,21 +624,32 @@ def _execute_request(
return exit_code, exit_reason

def _pack_output(self, output_filename: Optional[str] = None) -> int:
"""Compress outputs and store them in the shared drive."""
"""Compress and upload outputs. If the output size is 0 or not
determined (which may happen during unusual conditions) the upload is
aborted."""
output_size_bytes = 0

if self.task_workdir is None:
logging.error("Working directory not found.")
return
return output_size_bytes

output_dir = os.path.join(self.task_workdir, utils.OUTPUT_DIR)
if not os.path.exists(output_dir):
logging.error("Output directory not found: %s", output_dir)
return
return output_size_bytes

output_size_bytes = files.get_dir_size(output_dir)
logging.info("Output size: %s bytes", output_size_bytes)

if output_size_bytes is not None:
self._post_task_metric(utils.OUTPUT_SIZE, output_size_bytes)
if output_size_bytes is None:
logging.error("Failed to determine size for output directory: %s",
output_dir)
output_size_bytes = 0

if output_size_bytes == 0:
return output_size_bytes

self._post_task_metric(utils.OUTPUT_SIZE, output_size_bytes)

output_total_files = files.get_dir_total_files(output_dir)
logging.info("Output total files: %s", output_total_files)
Expand Down
11 changes: 4 additions & 7 deletions task-runner/tests/unit/test_task_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,10 @@ def fixture_task_request_handler(
file_manager=mock.MagicMock(),
)

with mock.patch(
"task_runner.api_methods_config.get_executer") as get_executer_mock:
with mock.patch(
"task_runner.utils.files.get_dir_size") as get_dir_size_mock:
get_dir_size_mock.return_value = 0
get_executer_mock.return_value = MockExecuter
yield handler
with mock.patch("task_runner.api_methods_config.get_executer",
return_value=MockExecuter), mock.patch.object(
handler, "_pack_output", return_value=1000):
yield handler


def _setup_mock_task(
Expand Down