Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ OPERANDI_SERVER_DEFAULT_USERNAME=server_operandi
OPERANDI_SERVER_DEFAULT_PASSWORD=server_operandi
OPERANDI_SERVER_URL_LIVE=http://localhost:8000
OPERANDI_SERVER_URL_LOCAL=http://0.0.0.0:8000
SQLITE_DB_DIR=/tmp/sqlite_data
23 changes: 23 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,26 @@ services:
networks:
- operandi
command: operandi-broker start

sqlite:
image: nouchka/sqlite3
container_name: operandi-sqlite
hostname: sqlite-host
volumes:
- "${SQLITE_DB_DIR}/operandi.db:/operandi.db"
networks:
- operandi

grafana:
image: grafana/grafana-enterprise:latest
container_name: grafana
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
environment:
- GF_LOG_LEVEL=debug # Enable debug logs (optional)
restart: always

volumes:
grafana-data:
31 changes: 29 additions & 2 deletions src/server/operandi_server/server.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from datetime import datetime
from io import StringIO
from logging import getLogger
from os import environ
from pandas import read_csv
from uvicorn import run

from fastapi import FastAPI, status
from fastapi import FastAPI, status, UploadFile, File, HTTPException

from operandi_utils import get_log_file_path_prefix, reconfigure_all_loggers, verify_database_uri
from operandi_utils import process_trace_dataframe
from operandi_utils.constants import AccountType, LOG_LEVEL_SERVER, OPERANDI_VERSION
from operandi_utils.database import db_initiate_database
from operandi_utils import safe_init_logging
Expand Down Expand Up @@ -65,7 +68,13 @@ def __init__(
status_code=status.HTTP_200_OK,
summary="Get information about the server"
)

self.add_api_route(
path="/upload-trace",
endpoint=self.upload_trace_file,
methods=["POST"],
summary="Upload and process a trace file",
status_code=status.HTTP_201_CREATED
)
def run_server(self):
    """Start serving this app with uvicorn on the host/port of local_server_url."""
    # local_server_url looks like "http://0.0.0.0:8000"
    _, address = self.local_server_url.split("//", 1)
    host, port = address.split(":")
    run(self, host=host, port=int(port))
Expand Down Expand Up @@ -145,3 +154,21 @@ async def insert_default_accounts(self):
details="Default harvester account"
)
self.logger.info(f"Inserted default harvester account credentials")

async def upload_trace_file(self, file: UploadFile = File(...)):
    """
    API endpoint to upload a trace file, process it in memory,
    and store data in the database without saving the file.

    Parameters:
    - file: the uploaded trace file; expected to be UTF-8 encoded,
      tab-separated text with a header row.

    Returns:
    - dict: a confirmation message naming the uploaded file.

    Raises:
    - HTTPException 400: the upload could not be decoded or parsed.
    - HTTPException 500: parsing succeeded but processing/storing failed.
    """
    content = await file.read()
    try:
        # Parse the TSV payload entirely in memory -- nothing is written to disk.
        df = read_csv(StringIO(content.decode("utf-8")), sep="\t")
    except (UnicodeDecodeError, ValueError) as error:
        # Undecodable or malformed input is a client error, not a server fault.
        # (pandas parser errors, e.g. ParserError/EmptyDataError, subclass ValueError.)
        raise HTTPException(status_code=400, detail=f"Invalid trace file: {error}")
    try:
        process_trace_dataframe(df)
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error))
    return {"message": f"File '{file.filename}' uploaded and processed successfully."}
50 changes: 50 additions & 0 deletions src/utils/operandi_utils/Grafana_Documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@

# Documentation: Parsing Trace Files and Visualizing Data in Grafana

## Overview
This process involves parsing trace files, storing the extracted data in an SQLite3 database, and visualizing the data in Grafana on a localhost setup. Below is an explanation of how it was accomplished.

---

## Step 1: Parsing Trace Files
A custom script was developed to read and process trace files. The script extracted key information from these files and structured it for further use. The primary goal was to transform raw trace file data into a structured format that could be easily stored in a database.

---

## Step 2: Creating an SQLite3 Database
Using the command line, an SQLite3 database was created to store the processed data. A table was defined within the database to organize the data into meaningful fields such as timestamps, values, and descriptions.

---

## Step 3: Inserting Data into the Database
The script inserted the processed data into the SQLite3 database. Each data entry from the trace file was stored as a row in the database, ensuring it was ready for querying and visualization.

---

## Step 4: Setting Up Grafana
Grafana, a popular open-source visualization tool, was installed and set up to work with the database.

1. **Data Source Configuration:**
Grafana was configured to recognize the SQLite3 database as a data source. This involved specifying the path to the database file and setting up the connection within Grafana's settings.

2. **Local Access:**
Grafana was run on localhost, allowing access to the dashboard through a web browser.

---

## Step 5: Creating a Dashboard
1. A dashboard was created in Grafana to display the data visually.
2. Different panels were added to the dashboard, each representing specific data visualizations (e.g., time series, bar charts, or tables).
3. Queries were written within Grafana to fetch the required data from the SQLite3 database and populate the panels.

---

## Step 6: Running the Setup
1. The SQLite3 database was updated with parsed data using the script.
2. Grafana was started locally, and the dashboard was loaded to display the visualized data in real time.
3. The setup allowed seamless monitoring and analysis of the trace file data through an interactive interface.

---

## Outcome
This process provided a complete pipeline for processing raw trace files, storing the data, and creating an intuitive visualization platform. It enabled better insights and data-driven decision-making.
2 changes: 2 additions & 0 deletions src/utils/operandi_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"get_nf_wfs_dir",
"get_ocrd_process_wfs_dir",
"make_zip_archive",
"process_trace_dataframe",
"receive_file",
"reconfigure_all_loggers",
"safe_init_logging",
Expand All @@ -21,6 +22,7 @@

from operandi_utils.constants import StateJob, StateJobSlurm, StateWorkspace
from operandi_utils.logging import reconfigure_all_loggers, get_log_file_path_prefix
from operandi_utils.traces_script import process_trace_dataframe
from operandi_utils.utils import (
call_sync,
download_mets_file,
Expand Down
95 changes: 95 additions & 0 deletions src/utils/operandi_utils/traces_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from re import match as re_match, IGNORECASE as re_IGNORECASE
from sqlite3 import connect as sqlite3_connect

SQLITE3_DB_NAME = "workflow_db.db"

def convert_rss_to_gb(rss):
    """
    Converts a memory usage string (e.g., '126.9 MB', '2048 KB') to gigabytes as a float.

    Parameters:
    - rss (str): Memory usage string with a KB, MB or GB unit suffix.

    Returns:
    - float: Memory usage in gigabytes, or None if the format is invalid.
    """
    match = re_match(r"([\d\.]+)\s*(KB|MB|GB)", rss, re_IGNORECASE)
    if not match:
        # Missing number or unrecognized unit.
        return None
    value = float(match.group(1))
    # Divisor converting each unit to gigabytes (1 GB = 1024 MB = 1048576 KB).
    divisor = {"KB": 1048576, "MB": 1024, "GB": 1}[match.group(2).upper()]
    return value / divisor


def convert_duration_to_seconds(duration):
    """
    Converts a duration string (e.g., '1h 5m 4s', '5m 4s', '3.4s') to seconds as a float.

    Parameters:
    - duration (str): Duration string made of optional hour, minute and
      second components, in that order.

    Returns:
    - float: Duration in seconds, or None if the format is invalid.
    """
    # Anchor with '$' so trailing garbage is rejected; the original
    # all-optional pattern matched ANY string, turning invalid input
    # into 0.0 instead of None.
    match = re_match(r"(?:(\d+)h)?\s*(?:(\d+)m)?\s*(?:(\d+\.?\d*)s)?$", duration.strip())
    if match and any(match.groups()):
        hours = int(match.group(1)) if match.group(1) else 0
        minutes = int(match.group(2)) if match.group(2) else 0
        seconds = float(match.group(3)) if match.group(3) else 0.0
        return hours * 3600 + minutes * 60 + seconds
    # Return None if the format is invalid (no component matched).
    return None


def process_trace_dataframe(df):
    """
    Normalizes a Nextflow trace dataframe and stores it in the SQLite database.

    Converts the 'duration' column to seconds, '%cpu' to a plain float
    percentage and 'peak_rss' to gigabytes, then inserts one row per
    trace entry into the existing `nextflow_traces` table of
    SQLITE3_DB_NAME. Note: the dataframe columns are converted in place.

    Parameters:
    - df (pandas.DataFrame): trace data; must contain the columns
      'name', 'submit', 'duration', '%cpu', 'peak_rss' and 'workflow_id'.

    Raises:
    - sqlite3.Error: if the database or the `nextflow_traces` table is unavailable.
    """
    # Normalize the textual trace columns into numeric values.
    df["duration"] = df["duration"].apply(convert_duration_to_seconds)
    df["%cpu"] = df["%cpu"].str.replace("%", "").astype(float)
    df["peak_rss"] = df["peak_rss"].apply(convert_rss_to_gb)

    conn = sqlite3_connect(SQLITE3_DB_NAME)
    try:
        # executemany batches all inserts in one call instead of one
        # execute per row.
        conn.executemany(
            """
            INSERT INTO nextflow_traces (name, submit, duration, cpu_percent, peak_rss, workflow_id)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                (row["name"], row["submit"], row["duration"], row["%cpu"], row["peak_rss"], row["workflow_id"])
                for _, row in df.iterrows()
            ),
        )
        conn.commit()
    finally:
        # Close even when an insert fails so the connection is never leaked.
        conn.close()




def fetch_all_traces_and_print():
    """
    Prints the full contents of the `nextflow_traces` table to stdout.

    Output is one ' | '-separated header line (column names) followed by
    one line per row. Intended as a debugging aid.

    Raises:
    - sqlite3.Error: if the database or the `nextflow_traces` table is unavailable.
    """
    conn = sqlite3_connect(SQLITE3_DB_NAME)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM nextflow_traces")
        rows = cursor.fetchall()

        # Column names come from the cursor metadata of the last query.
        column_names = [description[0] for description in cursor.description]
        print(" | ".join(column_names))
        for row in rows:
            print(" | ".join(map(str, row)))
    finally:
        # Close even when the query fails so the connection is never leaked.
        conn.close()


# Example usage (the trace dataframe is supplied by the upload endpoint):
# process_trace_dataframe(df)
# fetch_all_traces_and_print()
1 change: 1 addition & 0 deletions src/utils/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ clint==0.5.1
loguru>=0.6.0
httpx>=0.24.0
ocrd>=2.67.0
pandas>=2.0.0
paramiko>=3.4.0
pika>=1.2.0
pydantic>=1.9.1
Expand Down