From d12278b18615c7d5142114a42a662f0d3e31a1f3 Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Wed, 15 Jan 2025 12:01:53 +0100 Subject: [PATCH 01/15] Add SQLite and Grafana configurations to Docker Compose --- docker-compose.yml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index cb93b330..246f3872 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -114,3 +114,28 @@ services: networks: - operandi command: operandi-broker start + + sqlite: + image: nouchka/sqlite3 + container_name: operandi-sqlite + hostname: sqlite-host + volumes: + - "${PWD}/sqlite_data/operandi.db:/operandi.db" # Mount the entire directory to ensure persistence + networks: + - operandi + + + + grafana: + image: grafana/grafana-enterprise:latest + container_name: grafana + ports: + - "3000:3000" + volumes: + - grafana-data:/var/lib/grafana + environment: + - GF_LOG_LEVEL=debug # Enable debug logs (optional) + restart: always + +volumes: + grafana-data: From bf23b14988bbfb435b56660347dbc671e3873fd6 Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Fri, 17 Jan 2025 15:11:45 +0100 Subject: [PATCH 02/15] Added script to parse, process nf-traces files and store into sqlite3 db. --- src/utils/operandi_utils/traces_script.py | 111 ++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 src/utils/operandi_utils/traces_script.py diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py new file mode 100644 index 00000000..e635aa3c --- /dev/null +++ b/src/utils/operandi_utils/traces_script.py @@ -0,0 +1,111 @@ +import yaml +import pandas as pd +import re +import os +import sqlite3 +def convert_rss_to_gb(rss): + """ + Converts a memory usage string (e.g., '126.9 MB', '2048 KB') to gigabytes as a float. + + Parameters: + - peak_rss (str): Memory usage string. + + Returns: + - float: Memory usage in gigabytes. + """ + match = re.match(r"([\d\.]+)\s*(KB|MB|GB)", rss, re.IGNORECASE) + if match: + value = float(match.group(1)) + unit = match.group(2).upper() + if unit == "KB": + return value / (1024**2) # Convert KB to GB + elif unit == "MB": + return value / 1024 # Convert MB to GB + elif unit == "GB": + return value # Already in GB + return None # Return None if the format is invalid +def convert_duration_to_seconds(duration): + """ + Converts a duration string (e.g., '5m 4s', '3.4s') to seconds as a float. + + Parameters: + - duration (str): Duration string. + + Returns: + - float: Duration in seconds. + """ + match = re.match(r"(?:(\d+)m)?\s*(?:(\d+\.?\d*)s)?", duration) + if match: + minutes = int(match.group(1)) if match.group(1) else 0 + seconds = float(match.group(2)) if match.group(2) else 0.0 + return minutes * 60 + seconds + return None # Return None if the format is invalid + + + + +def process_trace_file(file_path): + """ + Processes a trace file to retain only enabled metrics and stores data in a database. + + Parameters: + - file_path (str): Path to the trace file (assumes TSV format). + - conn (object): Database connection object. + - workflow_id (int): ID of the workflow for data association. + - enabled_metrics (list): List of metrics to be retained. + """ + + + # Load the trace data (assuming it's in TSV format) + df = pd.read_csv(file_path, sep="\t") + + df["duration"] = df["duration"].apply(convert_duration_to_seconds) + df["%cpu"] = df["%cpu"].str.replace("%", "").astype(float) + df["peak_rss"] = df["peak_rss"].apply(convert_rss_to_gb) + + + + + + conn = sqlite3.connect('workflow_db.db') + cursor = conn.cursor() + # Insert filtered data into the nextflow_traces table + for _, row in df.iterrows(): + cursor.execute(""" + INSERT INTO nextflow_traces (name, submit, duration, cpu_percent, peak_rss, workflow_id) + VALUES (?, ?, ?, ?, ?, ?) + """, (row['name'], row['submit'], row['duration'], row['%cpu'], row['peak_rss'], row['workflow_id'])) + + # Commit the transaction and close the connection + conn.commit() + + # Close the connection + conn.close() + + + +# Iterate over all files in the folder +for file_name in os.listdir("nf-traces"): + file_path = os.path.join("nf-traces", file_name) + # Check if it's a file (not a directory) + if os.path.isfile(file_path): + process_trace_file(file_path) + + +conn = sqlite3.connect("workflow_db.db") +cursor = conn.cursor() +# Fetch and print the contents of the nextflow_traces table +cursor.execute("SELECT * FROM nextflow_traces") +rows = cursor.fetchall() + +# Get column names from cursor.description +column_names = [description[0] for description in cursor.description] + +# Print column names +print(" | ".join(column_names)) + +# Print the table rows +for row in rows: + print(" | ".join(map(str, row))) +# +conn.close() From d1be608ed55c3d0966b30c55ad6bef1e4cab77c3 Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Fri, 17 Jan 2025 15:24:34 +0100 Subject: [PATCH 03/15] Added documentation for nf-traces to grafana. --- .../operandi_utils/Grafana_Documentation.md | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 src/utils/operandi_utils/Grafana_Documentation.md diff --git a/src/utils/operandi_utils/Grafana_Documentation.md b/src/utils/operandi_utils/Grafana_Documentation.md new file mode 100644 index 00000000..0faed484 --- /dev/null +++ b/src/utils/operandi_utils/Grafana_Documentation.md @@ -0,0 +1,50 @@ + +# Documentation: Parsing Trace Files and Visualizing Data in Grafana + +## Overview +This process involves parsing trace files, storing the extracted data in an SQLite3 database, and visualizing the data in Grafana on a localhost setup. Below is an explanation of how it was accomplished. + +--- + +## Step 1: Parsing Trace Files +A custom script was developed to read and process trace files. The script extracted key information from these files and structured it for further use. The primary goal was to transform raw trace file data into a structured format that could be easily stored in a database. + +--- + +## Step 2: Creating an SQLite3 Database +Using the command line, an SQLite3 database was created to store the processed data. A table was defined within the database to organize the data into meaningful fields such as timestamps, values, and descriptions. + +--- + +## Step 3: Inserting Data into the Database +The script inserted the processed data into the SQLite3 database. Each data entry from the trace file was stored as a row in the database, ensuring it was ready for querying and visualization. + +--- + +## Step 4: Setting Up Grafana +Grafana, a popular open-source visualization tool, was installed and set up to work with the database. + +1. **Data Source Configuration:** + Grafana was configured to recognize the SQLite3 database as a data source. This involved specifying the path to the database file and setting up the connection within Grafana's settings. + +2. **Local Access:** + Grafana was run on localhost, allowing access to the dashboard through a web browser. + +--- + +## Step 5: Creating a Dashboard +1. A dashboard was created in Grafana to display the data visually. +2. Different panels were added to the dashboard, each representing specific data visualizations (e.g., time series, bar charts, or tables). +3. Queries were written within Grafana to fetch the required data from the SQLite3 database and populate the panels. + +--- + +## Step 6: Running the Setup +1. The SQLite3 database was updated with parsed data using the script. +2. Grafana was started locally, and the dashboard was loaded to display the visualized data in real time. +3. The setup allowed seamless monitoring and analysis of the trace file data through an interactive interface. + +--- + +## Outcome +This process provided a complete pipeline for processing raw trace files, storing the data, and creating an intuitive visualization platform. It enabled better insights and data-driven decision-making. From 5d526484f47039913b7b96e208ff27ec8ca4d439 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 21 Jan 2025 15:43:12 +0100 Subject: [PATCH 04/15] add pandas to requirements.txt --- src/utils/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/requirements.txt b/src/utils/requirements.txt index 47c8f55a..d31544f7 100644 --- a/src/utils/requirements.txt +++ b/src/utils/requirements.txt @@ -6,6 +6,7 @@ clint==0.5.1 loguru>=0.6.0 httpx>=0.24.0 ocrd>=2.67.0 +pandas>=2.2.3 paramiko>=3.4.0 pika>=1.2.0 pydantic>=1.9.1 From a803a3cad387316e7587405109e1287335770e06 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 21 Jan 2025 15:46:07 +0100 Subject: [PATCH 05/15] reduce pandas version to match older python versions --- src/utils/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/requirements.txt b/src/utils/requirements.txt index d31544f7..db6fc2d6 100644 --- a/src/utils/requirements.txt +++ b/src/utils/requirements.txt @@ -6,7 +6,7 @@ clint==0.5.1 loguru>=0.6.0 httpx>=0.24.0 ocrd>=2.67.0 -pandas>=2.2.3 +pandas>=2.0.0 paramiko>=3.4.0 pika>=1.2.0 pydantic>=1.9.1 From d8a82d636574ccf0b2b10e16d2474c17f16b023e Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 21 Jan 2025 16:17:25 +0100 Subject: [PATCH 06/15] refactor: spacing and imports --- docker-compose.yml | 2 - src/utils/operandi_utils/traces_script.py | 54 ++++++++++------------- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 246f3872..2d3ae110 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -124,8 +124,6 @@ services: networks: - operandi - - grafana: image: grafana/grafana-enterprise:latest container_name: grafana diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py index e635aa3c..685f4d66 100644 --- a/src/utils/operandi_utils/traces_script.py +++ b/src/utils/operandi_utils/traces_script.py @@ -1,8 +1,10 @@ -import yaml import pandas as pd -import re -import os -import sqlite3 +from re import match as re_match, IGNORECASE as re_IGNORECASE +from os import listdir +from os.path import join, isfile +from sqlite3 import connect as sqlite3_connect + + def convert_rss_to_gb(rss): """ Converts a memory usage string (e.g., '126.9 MB', '2048 KB') to gigabytes as a float. @@ -13,17 +15,20 @@ def convert_rss_to_gb(rss): Returns: - float: Memory usage in gigabytes. """ - match = re.match(r"([\d\.]+)\s*(KB|MB|GB)", rss, re.IGNORECASE) + match = re_match(r"([\d\.]+)\s*(KB|MB|GB)", rss, re_IGNORECASE) if match: value = float(match.group(1)) unit = match.group(2).upper() if unit == "KB": - return value / (1024**2) # Convert KB to GB + return value / 1048576 # (1024 ** 2) elif unit == "MB": - return value / 1024 # Convert MB to GB + return value / 1024 elif unit == "GB": - return value # Already in GB - return None # Return None if the format is invalid + return value + # Return None if the format is invalid + return None + + def convert_duration_to_seconds(duration): """ Converts a duration string (e.g., '5m 4s', '3.4s') to seconds as a float. @@ -34,14 +39,13 @@ def convert_duration_to_seconds(duration): Returns: - float: Duration in seconds. """ - match = re.match(r"(?:(\d+)m)?\s*(?:(\d+\.?\d*)s)?", duration) + match = re_match(r"(?:(\d+)m)?\s*(?:(\d+\.?\d*)s)?", duration) if match: minutes = int(match.group(1)) if match.group(1) else 0 seconds = float(match.group(2)) if match.group(2) else 0.0 return minutes * 60 + seconds - return None # Return None if the format is invalid - - + # Return None if the format is invalid + return None def process_trace_file(file_path): @@ -55,44 +59,32 @@ def process_trace_file(file_path): - enabled_metrics (list): List of metrics to be retained. """ - # Load the trace data (assuming it's in TSV format) df = pd.read_csv(file_path, sep="\t") - df["duration"] = df["duration"].apply(convert_duration_to_seconds) df["%cpu"] = df["%cpu"].str.replace("%", "").astype(float) df["peak_rss"] = df["peak_rss"].apply(convert_rss_to_gb) - - - - - conn = sqlite3.connect('workflow_db.db') + conn = sqlite3_connect('workflow_db.db') cursor = conn.cursor() + # Insert filtered data into the nextflow_traces table for _, row in df.iterrows(): cursor.execute(""" INSERT INTO nextflow_traces (name, submit, duration, cpu_percent, peak_rss, workflow_id) VALUES (?, ?, ?, ?, ?, ?) """, (row['name'], row['submit'], row['duration'], row['%cpu'], row['peak_rss'], row['workflow_id'])) - - # Commit the transaction and close the connection conn.commit() - - # Close the connection conn.close() - -# Iterate over all files in the folder -for file_name in os.listdir("nf-traces"): - file_path = os.path.join("nf-traces", file_name) - # Check if it's a file (not a directory) - if os.path.isfile(file_path): +for file_name in listdir("nf-traces"): + file_path = join("nf-traces", file_name) + if isfile(file_path): process_trace_file(file_path) -conn = sqlite3.connect("workflow_db.db") +conn = sqlite3_connect("workflow_db.db") cursor = conn.cursor() # Fetch and print the contents of the nextflow_traces table cursor.execute("SELECT * FROM nextflow_traces") From 453bb218a50e4c843faa5fd441badb4f9a90e80c Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 21 Jan 2025 16:22:38 +0100 Subject: [PATCH 07/15] use DB name constant --- src/utils/operandi_utils/traces_script.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py index 685f4d66..f041bb94 100644 --- a/src/utils/operandi_utils/traces_script.py +++ b/src/utils/operandi_utils/traces_script.py @@ -4,6 +4,7 @@ from os.path import join, isfile from sqlite3 import connect as sqlite3_connect +SQLITE3_DB_NAME = "workflow_db.db" def convert_rss_to_gb(rss): """ @@ -65,7 +66,7 @@ def process_trace_file(file_path): df["%cpu"] = df["%cpu"].str.replace("%", "").astype(float) df["peak_rss"] = df["peak_rss"].apply(convert_rss_to_gb) - conn = sqlite3_connect('workflow_db.db') + conn = sqlite3_connect(SQLITE3_DB_NAME) cursor = conn.cursor() # Insert filtered data into the nextflow_traces table @@ -84,7 +85,7 @@ def process_trace_file(file_path): process_trace_file(file_path) -conn = sqlite3_connect("workflow_db.db") +conn = sqlite3_connect(SQLITE3_DB_NAME) cursor = conn.cursor() # Fetch and print the contents of the nextflow_traces table cursor.execute("SELECT * FROM nextflow_traces") From d25240ff725fcc91b42b7d4014f64618c92587db Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 21 Jan 2025 16:24:04 +0100 Subject: [PATCH 08/15] fix: outside scope shadowing of file_path --- src/utils/operandi_utils/traces_script.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py index f041bb94..b872d588 100644 --- a/src/utils/operandi_utils/traces_script.py +++ b/src/utils/operandi_utils/traces_script.py @@ -79,10 +79,13 @@ def process_trace_file(file_path): conn.close() -for file_name in listdir("nf-traces"): - file_path = join("nf-traces", file_name) - if isfile(file_path): - process_trace_file(file_path) +def process_trace_files(): + for file_name in listdir("nf-traces"): + file_path = join("nf-traces", file_name) + if isfile(file_path): + process_trace_file(file_path) + +process_trace_files() conn = sqlite3_connect(SQLITE3_DB_NAME) From fe17969612da0429faf066768650f3c7f8f2f81a Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 21 Jan 2025 16:27:42 +0100 Subject: [PATCH 09/15] fix: outside scope shadowing of sqlite3 variables --- src/utils/operandi_utils/traces_script.py | 32 +++++++++++------------ 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py index b872d588..5155744a 100644 --- a/src/utils/operandi_utils/traces_script.py +++ b/src/utils/operandi_utils/traces_script.py @@ -85,23 +85,21 @@ def process_trace_files(): if isfile(file_path): process_trace_file(file_path) -process_trace_files() - -conn = sqlite3_connect(SQLITE3_DB_NAME) -cursor = conn.cursor() -# Fetch and print the contents of the nextflow_traces table -cursor.execute("SELECT * FROM nextflow_traces") -rows = cursor.fetchall() - -# Get column names from cursor.description -column_names = [description[0] for description in cursor.description] +def fetch_all_traces_and_print(): + conn = sqlite3_connect(SQLITE3_DB_NAME) + cursor = conn.cursor() + # Fetch and print the contents of the nextflow_traces table + cursor.execute("SELECT * FROM nextflow_traces") + rows = cursor.fetchall() + + # Get column names from cursor.description + column_names = [description[0] for description in cursor.description] + print(" | ".join(column_names)) + for row in rows: + print(" | ".join(map(str, row))) + conn.close() -# Print column names -print(" | ".join(column_names)) -# Print the table rows -for row in rows: - print(" | ".join(map(str, row))) -# -conn.close() +process_trace_files() +fetch_all_traces_and_print() From 772df886137afb3540bbe869a533e8945c5384f1 Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Wed, 5 Feb 2025 13:01:16 +0100 Subject: [PATCH 10/15] Instead of passing file changing to pass dataframe in script. --- src/utils/operandi_utils/traces_script.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py index 5155744a..be791fc3 100644 --- a/src/utils/operandi_utils/traces_script.py +++ b/src/utils/operandi_utils/traces_script.py @@ -49,19 +49,17 @@ def convert_duration_to_seconds(duration): return None -def process_trace_file(file_path): +def process_trace_file(df): """ Processes a trace file to retain only enabled metrics and stores data in a database. Parameters: - - file_path (str): Path to the trace file (assumes TSV format). + - df (dataframe): dataframe of file. - conn (object): Database connection object. - workflow_id (int): ID of the workflow for data association. - enabled_metrics (list): List of metrics to be retained. """ - # Load the trace data (assuming it's in TSV format) - df = pd.read_csv(file_path, sep="\t") df["duration"] = df["duration"].apply(convert_duration_to_seconds) df["%cpu"] = df["%cpu"].str.replace("%", "").astype(float) df["peak_rss"] = df["peak_rss"].apply(convert_rss_to_gb) @@ -101,5 +99,5 @@ def fetch_all_traces_and_print(): conn.close() -process_trace_files() -fetch_all_traces_and_print() +# process_trace_files() +# fetch_all_traces_and_print() From 59bce06fe8ea24a58ff7bfb70bb2f2323c252e82 Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Wed, 5 Feb 2025 13:11:03 +0100 Subject: [PATCH 11/15] Import funcions from utils.traces script --- src/utils/operandi_utils/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/utils/operandi_utils/__init__.py b/src/utils/operandi_utils/__init__.py index 0fd74c2f..8bcd1b59 100644 --- a/src/utils/operandi_utils/__init__.py +++ b/src/utils/operandi_utils/__init__.py @@ -7,6 +7,7 @@ "get_nf_wfs_dir", "get_ocrd_process_wfs_dir", "make_zip_archive", + "process_trace_file", "receive_file", "reconfigure_all_loggers", "safe_init_logging", @@ -21,6 +22,7 @@ from operandi_utils.constants import StateJob, StateJobSlurm, StateWorkspace from operandi_utils.logging import reconfigure_all_loggers, get_log_file_path_prefix +from operandi_utils.traces_script import process_trace_file from operandi_utils.utils import ( call_sync, download_mets_file, From 3b285137dfd63fbf093468f700f85f16f484a19b Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Wed, 5 Feb 2025 13:15:53 +0100 Subject: [PATCH 12/15] Change script to process df. --- src/utils/operandi_utils/traces_script.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py index be791fc3..2a4c29e3 100644 --- a/src/utils/operandi_utils/traces_script.py +++ b/src/utils/operandi_utils/traces_script.py @@ -1,7 +1,4 @@ -import pandas as pd from re import match as re_match, IGNORECASE as re_IGNORECASE -from os import listdir -from os.path import join, isfile from sqlite3 import connect as sqlite3_connect SQLITE3_DB_NAME = "workflow_db.db" @@ -49,12 +46,12 @@ def convert_duration_to_seconds(duration): return None -def process_trace_file(df): +def process_trace_dataframe(df): """ - Processes a trace file to retain only enabled metrics and stores data in a database. + Processes a dataframe of trace file to retain only enabled metrics and stores data in a database. Parameters: - - df (dataframe): dataframe of file. + - df (dataframe): dataframe of trace file. - conn (object): Database connection object. - workflow_id (int): ID of the workflow for data association. - enabled_metrics (list): List of metrics to be retained. @@ -77,11 +74,6 @@ def process_trace_file(df): conn.close() -def process_trace_files(): - for file_name in listdir("nf-traces"): - file_path = join("nf-traces", file_name) - if isfile(file_path): - process_trace_file(file_path) def fetch_all_traces_and_print(): From a8a70b387c9e5694373d2cec2fd49de464ff4530 Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Wed, 5 Feb 2025 13:17:24 +0100 Subject: [PATCH 13/15] Changing names of import. --- src/utils/operandi_utils/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils/operandi_utils/__init__.py b/src/utils/operandi_utils/__init__.py index 8bcd1b59..601de138 100644 --- a/src/utils/operandi_utils/__init__.py +++ b/src/utils/operandi_utils/__init__.py @@ -7,7 +7,7 @@ "get_nf_wfs_dir", "get_ocrd_process_wfs_dir", "make_zip_archive", - "process_trace_file", + "process_trace_dataframe", "receive_file", "reconfigure_all_loggers", "safe_init_logging", @@ -22,7 +22,7 @@ from operandi_utils.constants import StateJob, StateJobSlurm, StateWorkspace from operandi_utils.logging import reconfigure_all_loggers, get_log_file_path_prefix -from operandi_utils.traces_script import process_trace_file +from operandi_utils.traces_script import process_trace_dataframe from operandi_utils.utils import ( call_sync, download_mets_file, From 29af363d2b08bb11b746eba71ae10e06b6ca733d Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Sun, 16 Feb 2025 22:01:04 +0100 Subject: [PATCH 14/15] Endpoint to add trace file and process it. --- src/server/operandi_server/server.py | 31 ++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/server/operandi_server/server.py b/src/server/operandi_server/server.py index e0f0b817..959da8fe 100644 --- a/src/server/operandi_server/server.py +++ b/src/server/operandi_server/server.py @@ -1,11 +1,14 @@ from datetime import datetime +from io import StringIO from logging import getLogger from os import environ +from pandas import read_csv from uvicorn import run -from fastapi import FastAPI, status +from fastapi import FastAPI, status, UploadFile, File, HTTPException from operandi_utils import get_log_file_path_prefix, reconfigure_all_loggers, verify_database_uri +from operandi_utils import process_trace_dataframe from operandi_utils.constants import AccountType, LOG_LEVEL_SERVER, OPERANDI_VERSION from operandi_utils.database import db_initiate_database from operandi_utils import safe_init_logging @@ -65,7 +68,13 @@ def __init__( status_code=status.HTTP_200_OK, summary="Get information about the server" ) - + self.add_api_route( + path="/upload-trace", + endpoint=self.upload_trace_file, + methods=["POST"], + summary="Upload and process a trace file", + status_code=status.HTTP_201_CREATED + ) def run_server(self): host, port = self.local_server_url.split("//")[1].split(":") run(self, host=host, port=int(port)) @@ -145,3 +154,21 @@ async def insert_default_accounts(self): details="Default harvester account" ) self.logger.info(f"Inserted default harvester account credentials") + + async def upload_trace_file(self, file: UploadFile = File(...)): + """ + API endpoint to upload a trace file, process it in memory, + and store data in the database without saving the file. + """ + try: + # Read file contents into a pandas DataFrame (without saving) + content = await file.read() + df = read_csv(StringIO(content.decode("utf-8")), sep="\t") + + # Process the DataFrame directly + process_trace_dataframe(df) + + return {"message": f"File '{file.filename}' uploaded and processed successfully."} + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) From a9a08a4a74fd174c3ca43668746c5e592368502a Mon Sep 17 00:00:00 2001 From: Faizan Riasat Date: Sun, 16 Feb 2025 22:15:30 +0100 Subject: [PATCH 15/15] Add env variable in sqlite volume dockerfile. --- .env | 1 + docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.env b/.env index 03233af3..f16dc620 100644 --- a/.env +++ b/.env @@ -15,3 +15,4 @@ OPERANDI_SERVER_DEFAULT_USERNAME=server_operandi OPERANDI_SERVER_DEFAULT_PASSWORD=server_operandi OPERANDI_SERVER_URL_LIVE=http://localhost:8000 OPERANDI_SERVER_URL_LOCAL=http://0.0.0.0:8000 +SQLITE_DB_DIR=/tmp/sqlite_data \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2d3ae110..0b1c571a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -120,7 +120,7 @@ services: container_name: operandi-sqlite hostname: sqlite-host volumes: - - "${PWD}/sqlite_data/operandi.db:/operandi.db" # Mount the entire directory to ensure persistence + - "${SQLITE_DB_DIR}/operandi.db:/operandi.db" networks: - operandi