diff --git a/.env b/.env
index 03233af3..f16dc620 100644
--- a/.env
+++ b/.env
@@ -15,3 +15,4 @@ OPERANDI_SERVER_DEFAULT_USERNAME=server_operandi
 OPERANDI_SERVER_DEFAULT_PASSWORD=server_operandi
 OPERANDI_SERVER_URL_LIVE=http://localhost:8000
 OPERANDI_SERVER_URL_LOCAL=http://0.0.0.0:8000
+SQLITE_DB_DIR=/tmp/sqlite_data
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index cb93b330..0b1c571a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -114,3 +114,26 @@ services:
     networks:
       - operandi
     command: operandi-broker start
+
+  sqlite:
+    image: nouchka/sqlite3
+    container_name: operandi-sqlite
+    hostname: sqlite-host
+    volumes:
+      - "${SQLITE_DB_DIR}/operandi.db:/operandi.db"
+    networks:
+      - operandi
+
+  grafana:
+    image: grafana/grafana-enterprise:latest
+    container_name: grafana
+    ports:
+      - "3000:3000"
+    volumes:
+      - grafana-data:/var/lib/grafana
+    environment:
+      - GF_LOG_LEVEL=debug # Enable debug logs (optional)
+    restart: always
+
+volumes:
+  grafana-data:
diff --git a/src/server/operandi_server/server.py b/src/server/operandi_server/server.py
index e0f0b817..959da8fe 100644
--- a/src/server/operandi_server/server.py
+++ b/src/server/operandi_server/server.py
@@ -1,11 +1,14 @@
 from datetime import datetime
+from io import StringIO
 from logging import getLogger
 from os import environ
+from pandas import read_csv
 from uvicorn import run
 
-from fastapi import FastAPI, status
+from fastapi import FastAPI, status, UploadFile, File, HTTPException
 
 from operandi_utils import get_log_file_path_prefix, reconfigure_all_loggers, verify_database_uri
+from operandi_utils import process_trace_dataframe
 from operandi_utils.constants import AccountType, LOG_LEVEL_SERVER, OPERANDI_VERSION
 from operandi_utils.database import db_initiate_database
 from operandi_utils import safe_init_logging
@@ -65,7 +68,13 @@ def __init__(
             status_code=status.HTTP_200_OK,
             summary="Get information about the server"
         )
-
+        self.add_api_route(
+            path="/upload-trace",
+            endpoint=self.upload_trace_file,
+            methods=["POST"],
+            summary="Upload and process a trace file",
+            status_code=status.HTTP_201_CREATED
+        )
     def run_server(self):
         host, port = self.local_server_url.split("//")[1].split(":")
         run(self, host=host, port=int(port))
@@ -145,3 +154,21 @@ async def insert_default_accounts(self):
             details="Default harvester account"
         )
         self.logger.info(f"Inserted default harvester account credentials")
+
+    async def upload_trace_file(self, file: UploadFile = File(...)):
+        """
+        API endpoint to upload a trace file, process it in memory,
+        and store data in the database without saving the file.
+        """
+        try:
+            # Read file contents into a pandas DataFrame (without saving)
+            content = await file.read()
+            df = read_csv(StringIO(content.decode("utf-8")), sep="\t")
+
+            # Process the DataFrame directly
+            process_trace_dataframe(df)
+
+            return {"message": f"File '{file.filename}' uploaded and processed successfully."}
+
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=str(e))
diff --git a/src/utils/operandi_utils/Grafana_Documentation.md b/src/utils/operandi_utils/Grafana_Documentation.md
new file mode 100644
index 00000000..0faed484
--- /dev/null
+++ b/src/utils/operandi_utils/Grafana_Documentation.md
@@ -0,0 +1,50 @@
+
+# Documentation: Parsing Trace Files and Visualizing Data in Grafana
+
+## Overview
+This process involves parsing trace files, storing the extracted data in an SQLite3 database, and visualizing the data in Grafana on a localhost setup. Below is an explanation of how it was accomplished.
+
+---
+
+## Step 1: Parsing Trace Files
+A custom script (traces_script.py) was developed to read and process Nextflow trace files. It extracted the key fields from each trace entry (process name, submit time, duration, CPU usage, and peak memory) and transformed the raw data into a structured format that could be stored in a database.
+
+---
+
+## Step 2: Creating an SQLite3 Database
+Using the command line, an SQLite3 database was created to store the processed data. A nextflow_traces table was defined within the database to organize the data into meaningful fields such as the process name, submit time, duration, CPU usage, and peak memory.
+
+---
+
+## Step 3: Inserting Data into the Database
+The script inserted the processed data into the SQLite3 database. Each entry from the trace file was stored as a row in the nextflow_traces table, ready for querying and visualization.
+
+---
+
+## Step 4: Setting Up Grafana
+Grafana, a popular open-source visualization tool, was installed and set up to work with the database.
+
+1. **Data Source Configuration:**
+   Grafana was configured to recognize the SQLite3 database as a data source. This involved specifying the path to the database file and setting up the connection within Grafana's settings.
+
+2. **Local Access:**
+   Grafana was run on localhost (port 3000), allowing access to the dashboard through a web browser.
+
+---
+
+## Step 5: Creating a Dashboard
+1. A dashboard was created in Grafana to display the data visually.
+2. Different panels were added to the dashboard, each representing specific data visualizations (e.g., time series, bar charts, or tables).
+3. Queries were written within Grafana to fetch the required data from the SQLite3 database and populate the panels.
+
+---
+
+## Step 6: Running the Setup
+1. The SQLite3 database was updated with parsed data using the script.
+2. Grafana was started locally, and the dashboard was loaded to display the visualized data in real time.
+3. The setup allowed seamless monitoring and analysis of the trace file data through an interactive interface.
+
+---
+
+## Outcome
+This process provided a complete pipeline for parsing raw trace files, storing the data, and visualizing it. It made runtime, CPU, and memory usage across workflow runs easy to inspect and compare.
diff --git a/src/utils/operandi_utils/__init__.py b/src/utils/operandi_utils/__init__.py
index 0fd74c2f..601de138 100644
--- a/src/utils/operandi_utils/__init__.py
+++ b/src/utils/operandi_utils/__init__.py
@@ -7,6 +7,7 @@
     "get_nf_wfs_dir",
     "get_ocrd_process_wfs_dir",
     "make_zip_archive",
+    "process_trace_dataframe",
     "receive_file",
     "reconfigure_all_loggers",
     "safe_init_logging",
@@ -21,6 +22,7 @@
 
 from operandi_utils.constants import StateJob, StateJobSlurm, StateWorkspace
 from operandi_utils.logging import reconfigure_all_loggers, get_log_file_path_prefix
+from operandi_utils.traces_script import process_trace_dataframe
 from operandi_utils.utils import (
     call_sync,
     download_mets_file,
diff --git a/src/utils/operandi_utils/traces_script.py b/src/utils/operandi_utils/traces_script.py
new file mode 100644
index 00000000..2a4c29e3
--- /dev/null
+++ b/src/utils/operandi_utils/traces_script.py
@@ -0,0 +1,95 @@
+from re import match as re_match, IGNORECASE as re_IGNORECASE
+from sqlite3 import connect as sqlite3_connect
+
+SQLITE3_DB_NAME = "workflow_db.db"
+
+def convert_rss_to_gb(rss):
+    """
+    Converts a memory usage string (e.g., '126.9 MB', '2048 KB') to gigabytes as a float.
+
+    Parameters:
+    - rss (str): Memory usage string.
+
+    Returns:
+    - float: Memory usage in gigabytes.
+    """
+    match = re_match(r"([\d\.]+)\s*(KB|MB|GB)", rss, re_IGNORECASE)
+    if match:
+        value = float(match.group(1))
+        unit = match.group(2).upper()
+        if unit == "KB":
+            return value / 1048576  # (1024 ** 2)
+        elif unit == "MB":
+            return value / 1024
+        elif unit == "GB":
+            return value
+    # Return None if the format is invalid
+    return None
+
+
+def convert_duration_to_seconds(duration):
+    """
+    Converts a duration string (e.g., '5m 4s', '3.4s') to seconds as a float.
+
+    Parameters:
+    - duration (str): Duration string.
+
+    Returns:
+    - float: Duration in seconds.
+    """
+    match = re_match(r"(?:(\d+)m)?\s*(?:(\d+\.?\d*)s)?", duration)
+    if match:
+        minutes = int(match.group(1)) if match.group(1) else 0
+        seconds = float(match.group(2)) if match.group(2) else 0.0
+        return minutes * 60 + seconds
+    # Return None if the format is invalid
+    return None
+
+
+def process_trace_dataframe(df):
+    """
+    Normalizes a dataframe parsed from a Nextflow trace file and stores it in the SQLite database.
+
+    Converts the duration, %cpu and peak_rss columns to numeric values, then inserts
+    each row into the nextflow_traces table of the local SQLite3 database.
+
+    Parameters:
+    - df (DataFrame): dataframe of a Nextflow trace file.
+    """
+
+    df["duration"] = df["duration"].apply(convert_duration_to_seconds)
+    df["%cpu"] = df["%cpu"].str.replace("%", "").astype(float)
+    df["peak_rss"] = df["peak_rss"].apply(convert_rss_to_gb)
+
+    conn = sqlite3_connect(SQLITE3_DB_NAME)
+    cursor = conn.cursor()
+
+    # Insert the processed rows into the nextflow_traces table
+    for _, row in df.iterrows():
+        cursor.execute("""
+            INSERT INTO nextflow_traces (name, submit, duration, cpu_percent, peak_rss, workflow_id)
+            VALUES (?, ?, ?, ?, ?, ?)
+        """, (row['name'], row['submit'], row['duration'], row['%cpu'], row['peak_rss'], row['workflow_id']))
+    conn.commit()
+    conn.close()
+
+
+
+
+def fetch_all_traces_and_print():
+    conn = sqlite3_connect(SQLITE3_DB_NAME)
+    cursor = conn.cursor()
+    # Fetch and print the contents of the nextflow_traces table
+    cursor.execute("SELECT * FROM nextflow_traces")
+    rows = cursor.fetchall()
+
+    # Get column names from cursor.description
+    column_names = [description[0] for description in cursor.description]
+    print(" | ".join(column_names))
+    for row in rows:
+        print(" | ".join(map(str, row)))
+    conn.close()
+
+
+# process_trace_dataframe(df)
+# fetch_all_traces_and_print()
diff --git a/src/utils/requirements.txt b/src/utils/requirements.txt
index 47c8f55a..db6fc2d6 100644
--- a/src/utils/requirements.txt
+++ b/src/utils/requirements.txt
@@ -6,6 +6,7 @@ clint==0.5.1
 loguru>=0.6.0
 httpx>=0.24.0
 ocrd>=2.67.0
+pandas>=2.0.0
 paramiko>=3.4.0
 pika>=1.2.0
 pydantic>=1.9.1
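
Note on wiring the pieces together: the change set inserts into a `nextflow_traces` table (see `traces_script.py`) but does not include the table's schema or a standalone ingestion example. Below is a minimal sketch, assuming a tab-separated Nextflow trace file and the column types shown; `ingest_trace`, the `id` column, and `trace.txt` are illustrative names, not part of the change set.

```python
from sqlite3 import connect

from pandas import read_csv

from operandi_utils.traces_script import SQLITE3_DB_NAME, process_trace_dataframe

# Assumed schema for the table targeted by the INSERT in process_trace_dataframe();
# the column names come from that INSERT statement, the types are assumptions.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS nextflow_traces (
    id INTEGER PRIMARY KEY AUTOINCREMENT,  -- assumed surrogate key
    name TEXT,
    submit TEXT,
    duration REAL,      -- seconds, after convert_duration_to_seconds()
    cpu_percent REAL,   -- %cpu with the '%' sign stripped
    peak_rss REAL,      -- gigabytes, after convert_rss_to_gb()
    workflow_id INTEGER
)
"""

def ingest_trace(trace_path: str, workflow_id: int) -> None:
    # Create the table once, in the same database file the script writes to.
    conn = connect(SQLITE3_DB_NAME)
    conn.execute(CREATE_TABLE_SQL)
    conn.commit()
    conn.close()

    # Nextflow trace files are tab-separated.
    df = read_csv(trace_path, sep="\t")

    # The trace file itself carries no workflow_id column, but the INSERT
    # expects one, so it is attached to every row before processing.
    df["workflow_id"] = workflow_id

    process_trace_dataframe(df)

if __name__ == "__main__":
    ingest_trace("trace.txt", workflow_id=1)
```

A Grafana panel against this table could then use a query such as `SELECT name, duration, peak_rss FROM nextflow_traces WHERE workflow_id = 1`, in line with the dashboard setup described in `Grafana_Documentation.md`.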