From 0eaaef9bb11098c5f2e31c505c0b54f73f2a356d Mon Sep 17 00:00:00 2001 From: rvalerio Date: Wed, 10 Sep 2025 09:46:34 +0100 Subject: [PATCH 1/2] Add DownloadFile operation for file streaming --- .../file_tracker/client_connection.py | 13 +- file-tracker/file_tracker/file_operations.py | 124 +++++++++++++++++- 2 files changed, 130 insertions(+), 7 deletions(-) diff --git a/file-tracker/file_tracker/client_connection.py b/file-tracker/file_tracker/client_connection.py index 952cead9..8135c5a5 100644 --- a/file-tracker/file_tracker/client_connection.py +++ b/file-tracker/file_tracker/client_connection.py @@ -9,7 +9,7 @@ RTCPeerConnection, RTCSessionDescription, ) -from file_operations import Operation, OperationError +from file_operations import DownloadFile, Operation, OperationError from operation_response import OperationResponse, OperationStatus @@ -50,7 +50,13 @@ async def on_message(message): operation.path = self.path while not channel_closed.is_set(): try: - response.message = await operation.execute() + # DownloadFile uses direct streaming to support large + # files and avoid memory issues and WebRTC message size + # limits. Other operations return small data. + if isinstance(operation, DownloadFile): + response.message = await operation.execute(channel) + else: + response.message = await operation.execute() except OperationError as e: response = OperationResponse( status=OperationStatus.ERROR, message=str(e)) @@ -58,7 +64,8 @@ async def on_message(message): if response.message is not None: channel.send(response.to_json_string()) - if message.get("follow", False): + follow_mode = message.get("follow", False) + if follow_mode and not isinstance(operation, DownloadFile): await asyncio.sleep(1) else: break diff --git a/file-tracker/file_tracker/file_operations.py b/file-tracker/file_tracker/file_operations.py index d4764b94..315169ec 100644 --- a/file-tracker/file_tracker/file_operations.py +++ b/file-tracker/file_tracker/file_operations.py @@ -1,5 +1,7 @@ import abc import asyncio +import base64 +import json import os import subprocess import time @@ -17,12 +19,17 @@ class Operation: @classmethod def from_request(cls, request): - operation = cls._get_class(request["type"]) - return operation(**request["args"]) + operation_type = request["type"] + operation = cls._get_class(operation_type) + args = request.get("args", {}) + + if "follow" in request and operation == DownloadFile: + args["follow"] = request["follow"] + + return operation(**args) @classmethod def _get_class(cls, type): - if type in cls.SUPPORTED_OPERATIONS: return cls.SUPPORTED_OPERATIONS[type] else: @@ -73,6 +80,7 @@ def top(self) -> str: LAST_MODIFIED_FILE_PATH_SUFFIX = "output/artifacts/output_update.csv" METRICS_FILE_PATH_SUFFIX = "output/artifacts/system_metrics.csv" +DEFAULT_CHUNK_SIZE = 64 * 1024 # 64KB class LastModifiedFile(Operation): @@ -204,10 +212,118 @@ def get_appended(self, path_to_file, filename): return content.split('\n') +class DownloadFile(Operation): + + def __init__(self, file_path, chunk_size=DEFAULT_CHUNK_SIZE, follow=False): + self.file_path = file_path + self.filename = os.path.basename(file_path) + self.chunk_size = chunk_size + self.follow = follow + self.path = None + + async def execute(self, channel): + try: + full_path = os.path.join(self.path, self.file_path) + if not os.path.exists(full_path): + raise OperationError(f"File not found: {self.file_path}") + + if self.follow: + # Real-time streaming mode - stream file as it's being written + await self._stream_realtime_file(channel, full_path) + else: + # Standard download mode - download existing file + await self._stream_static_file(channel, full_path) + + except Exception as e: # noqa: BLE001 + self._send_error(channel, str(e)) + + async def _stream_static_file(self, channel, file_path): + file_size = os.path.getsize(file_path) + self._send_file_metadata(channel, file_size) + + chunk_count = 0 + with open(file_path, 'rb') as f: + while True: + chunk = f.read(self.chunk_size) + if not chunk: + break + + chunk_count += 1 + self._send_chunk(channel, chunk, chunk_count) + + self._send_complete(channel, chunk_count, file_size) + + async def _stream_realtime_file(self, channel, file_path, interval=1): + self._send_file_metadata(channel, None) + + chunk_count = 0 + total_size = 0 + + # Stream file content continuously + with open(file_path, 'rb') as f: + while True: + chunk = f.read(self.chunk_size) + if not chunk: + # No more data, wait a bit + await asyncio.sleep(interval) + continue + + chunk_count += 1 + total_size += len(chunk) + self._send_chunk(channel, chunk, chunk_count) + + def _send_error(self, channel, error_message): + message = { + "status": "error", + "message": { + "type": "download_error", + "error": error_message + } + } + channel.send(json.dumps(message)) + + def _send_file_metadata(self, channel, total_size): + message = { + "status": "success", + "message": { + "type": "download_info", + "filename": self.filename, + "total_size": total_size + } + } + channel.send(json.dumps(message)) + + def _send_chunk(self, channel, chunk, chunk_number): + chunk_b64 = base64.b64encode(chunk).decode('utf-8') + message = { + "status": "success", + "message": { + "type": "download_chunk", + "chunk_number": chunk_number, + "chunk_size": len(chunk), + "data": chunk_b64 + } + } + channel.send(json.dumps(message)) + + def _send_complete(self, channel, chunk_count, total_size): + message = { + "status": "success", + "message": { + "type": "download_complete", + "filename": self.filename, + "total_chunks": chunk_count, + "total_size": total_size + } + } + channel.send(json.dumps(message)) + + # Initialize SUPPORTED_OPERATIONS after defining all classes Operation.SUPPORTED_OPERATIONS = { "ls": List, "tail": Tail, "top": Top, - "last_modified_file": LastModifiedFile + "last_modified_file": LastModifiedFile, + "download_file": DownloadFile } From 8692bfffb49fd0b3d6964d7566568c51d4871413 Mon Sep 17 00:00:00 2001 From: rvalerio Date: Wed, 10 Sep 2025 09:48:19 +0100 Subject: [PATCH 2/2] Update comment --- file-tracker/file_tracker/client_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/file-tracker/file_tracker/client_connection.py b/file-tracker/file_tracker/client_connection.py index 8135c5a5..f8f8580e 100644 --- a/file-tracker/file_tracker/client_connection.py +++ b/file-tracker/file_tracker/client_connection.py @@ -52,7 +52,7 @@ async def on_message(message): try: # DownloadFile uses direct streaming to support large # files and avoid memory issues and WebRTC message size - # limits. Other operations return small data. + # limits if isinstance(operation, DownloadFile): response.message = await operation.execute(channel) else: