13 changes: 10 additions & 3 deletions file-tracker/file_tracker/client_connection.py
@@ -9,7 +9,7 @@
RTCPeerConnection,
RTCSessionDescription,
)
from file_operations import Operation, OperationError
from file_operations import DownloadFile, Operation, OperationError
from operation_response import OperationResponse, OperationStatus


@@ -50,15 +50,22 @@ async def on_message(message):
operation.path = self.path
while not channel_closed.is_set():
try:
response.message = await operation.execute()
# DownloadFile streams chunks directly over the channel so
# large files are neither buffered whole in memory nor sent
# as a single message exceeding WebRTC size limits
if isinstance(operation, DownloadFile):
response.message = await operation.execute(channel)
else:
response.message = await operation.execute()
except OperationError as e:
response = OperationResponse(
status=OperationStatus.ERROR, message=str(e))
finally:
if response.message is not None:
channel.send(response.to_json_string())

if message.get("follow", False):
follow_mode = message.get("follow", False)
if follow_mode and not isinstance(operation, DownloadFile):
await asyncio.sleep(1)
else:
break
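For reference, the request that takes this new path is the same JSON operation request handled by on_message, with an optional top-level `follow` flag that `Operation.from_request` (in the next file) forwards to `DownloadFile`. A minimal sketch, with an illustrative file path:

```python
from file_operations import DownloadFile, Operation

# Hypothetical request payload; "type", "args", and "follow" are the keys
# from_request actually reads, the path is only an example.
request = {
    "type": "download_file",
    "args": {"file_path": "output/artifacts/system_metrics.csv"},
    "follow": True,
}

operation = Operation.from_request(request)
# isinstance(operation, DownloadFile) is True, so the handler above calls
# operation.execute(channel) and the operation writes its own frames to the
# channel instead of returning a single response message.
```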
124 changes: 120 additions & 4 deletions file-tracker/file_tracker/file_operations.py
@@ -1,5 +1,7 @@
import abc
import asyncio
import base64
import json
import os
import subprocess
import time
@@ -17,12 +19,17 @@ class Operation:

@classmethod
def from_request(cls, request):
operation = cls._get_class(request["type"])
return operation(**request["args"])
operation_type = request["type"]
operation = cls._get_class(operation_type)
args = request.get("args", {})

if "follow" in request and operation == DownloadFile:
args["follow"] = request["follow"]

return operation(**args)

@classmethod
def _get_class(cls, type):

if type in cls.SUPPORTED_OPERATIONS:
return cls.SUPPORTED_OPERATIONS[type]
else:
@@ -73,6 +80,7 @@ def top(self) -> str:

LAST_MODIFIED_FILE_PATH_SUFFIX = "output/artifacts/output_update.csv"
METRICS_FILE_PATH_SUFFIX = "output/artifacts/system_metrics.csv"
DEFAULT_CHUNK_SIZE = 64 * 1024 # 64KB


class LastModifiedFile(Operation):
@@ -204,10 +212,118 @@ def get_appended(self, path_to_file, filename):
return content.split('\n')


class DownloadFile(Operation):

def __init__(self, file_path, chunk_size=DEFAULT_CHUNK_SIZE, follow=False):
self.file_path = file_path
self.filename = os.path.basename(file_path)
self.chunk_size = chunk_size
self.follow = follow
self.path = None

async def execute(self, channel):
try:
full_path = os.path.join(self.path, self.file_path)
if not os.path.exists(full_path):
raise OperationError(f"File not found: {self.file_path}")

if self.follow:
# Real-time streaming mode - stream file as it's being written
await self._stream_realtime_file(channel, full_path)
else:
# Standard download mode - download existing file
await self._stream_static_file(channel, full_path)

except Exception as e: # noqa: BLE001
self._send_error(channel, str(e))

async def _stream_static_file(self, channel, file_path):
file_size = os.path.getsize(file_path)
self._send_file_metadata(channel, file_size)

chunk_count = 0
with open(file_path, 'rb') as f:
while True:
chunk = f.read(self.chunk_size)
if not chunk:
break

chunk_count += 1
self._send_chunk(channel, chunk, chunk_count)

self._send_complete(channel, chunk_count, file_size)

async def _stream_realtime_file(self, channel, file_path, interval=1):
self._send_file_metadata(channel, None)

chunk_count = 0
total_size = 0

# Stream file content continuously
with open(file_path, 'rb') as f:
while True:
chunk = f.read(self.chunk_size)
if not chunk:
# No more data, wait a bit
await asyncio.sleep(interval)
continue

chunk_count += 1
total_size += len(chunk)
self._send_chunk(channel, chunk, chunk_count)

def _send_error(self, channel, error_message):
message = {
"status": "error",
"message": {
"type": "download_error",
"error": error_message
}
}
channel.send(json.dumps(message))

def _send_file_metadata(self, channel, total_size):
message = {
"status": "success",
"message": {
"type": "download_info",
"filename": self.filename,
"total_size": total_size
}
}
channel.send(json.dumps(message))

def _send_chunk(self, channel, chunk, chunk_number):
chunk_b64 = base64.b64encode(chunk).decode('utf-8')
message = {
"status": "success",
"message": {
"type": "download_chunk",
"chunk_number": chunk_number,
"chunk_size": len(chunk),
"data": chunk_b64
}
}
channel.send(json.dumps(message))

def _send_complete(self, channel, chunk_count, total_size):
message = {
"status": "success",
"message": {
"type": "download_complete",
"filename": self.filename,
"total_chunks": chunk_count,
"total_size": total_size
}
}
channel.send(json.dumps(message))


# Initialize SUPPORTED_OPERATIONS after defining all classes
Operation.SUPPORTED_OPERATIONS = {
"ls": List,
"tail": Tail,
"top": Top,
"last_modified_file": LastModifiedFile
"last_modified_file": LastModifiedFile,
"download_file": DownloadFile
}
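On the other end of the data channel, the `download_info`, `download_chunk`, `download_complete`, and `download_error` frames sent above can be reassembled into a local file. A minimal consumer sketch, relying only on the message shapes defined by `DownloadFile` (the handler factory and output path are illustrative, not part of this change):

```python
import base64
import json


def make_download_handler(output_path):
    """Collect the download_* frames emitted by DownloadFile into a file."""
    out = open(output_path, "wb")

    def on_message(raw):
        payload = json.loads(raw)
        body = payload.get("message") or {}

        if payload.get("status") == "error":
            # Matches the frame built by _send_error
            out.close()
            raise RuntimeError(body.get("error", "download failed"))

        kind = body.get("type")
        if kind == "download_info":
            # total_size is None in follow mode, an int for static downloads
            print(f"downloading {body['filename']} ({body['total_size']} bytes)")
        elif kind == "download_chunk":
            out.write(base64.b64decode(body["data"]))
        elif kind == "download_complete":
            out.close()
            print(f"done: {body['total_chunks']} chunks, {body['total_size']} bytes")

    return on_message
```

Note that in follow mode `_stream_realtime_file` never sends a `download_complete` frame, so a consumer like this would need its own stop condition (for example, closing the channel).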