Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
34c7452
feat: implement and apply new Logger class (wip)
antidodo Aug 7, 2025
b02afe1
test: add unit tests for DataApiClient and MessageBrokerAPI
antidodo Aug 7, 2025
57df1bc
refactor: replace list with queue for log management in Logger class
antidodo Aug 11, 2025
f471ddb
test: add unit tests for FlameLogger and MessageBrokerAPI
antidodo Aug 11, 2025
9e358a0
feat: improve error handling in API clients and logging
antidodo Aug 11, 2025
90ac846
refactor: update logging methods to use POAPI instead of POClient
antidodo Aug 28, 2025
cbfb60a
fix: enhance logging and error handling in POClient and utils
antidodo Aug 28, 2025
a625b4b
fix: enhance logging and error handling in POClient and utils
antidodo Aug 28, 2025
71d57f6
fix: increase timeout for log streaming in POClient
antidodo Aug 28, 2025
c301c6a
fix: change log streaming request to use JSON format
antidodo Aug 29, 2025
bb31b23
fix: change log streaming request method from PUT to POST
antidodo Sep 18, 2025
0dcdee5
fix: correct log type handling in log submission
antidodo Sep 22, 2025
47d5251
fix: implement log queueing for suppressing log output
antidodo Sep 22, 2025
aaf5505
fix: update log submission method to include type hints and increase …
antidodo Sep 23, 2025
0467480
fix: update log submission method to include type hints and increase …
antidodo Sep 23, 2025
8e4e8a1
fix: add logging for queue status during log submission
antidodo Sep 23, 2025
a037b65
fix: enhance logging for log queue processing and update DataAPI init…
antidodo Sep 23, 2025
0aab278
fix: remove unused node_id from POAPI initialization and log streamin…
antidodo Sep 23, 2025
24a5382
fix: add check for empty log queue before sending to POAPI
antidodo Sep 23, 2025
c69a9eb
fix: streamline log sending process by removing empty queue check and…
antidodo Sep 24, 2025
cf4dda3
fix: refactor log sending process by removing unused methods and upda…
antidodo Sep 24, 2025
8e78a09
fix: statically add logger to message init
Nightknight3000 Sep 25, 2025
4d3b92d
fix: add flame_logger to Message initialization and handle connection…
antidodo Sep 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 120 additions & 114 deletions flamesdk/flame_core.py

Large diffs are not rendered by default.

44 changes: 32 additions & 12 deletions flamesdk/resources/client_apis/clients/data_api_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from typing import Any, Optional, Union
from typing import Optional, Union
import asyncio
from httpx import AsyncClient
from httpx import AsyncClient, HTTPStatusError
import re
from flamesdk.resources.utils.logging import FlameLogger


class DataApiClient:
def __init__(self, project_id: str, nginx_name: str, data_source_token: str, keycloak_token: str) -> None:
def __init__(self, project_id: str, nginx_name: str, data_source_token: str, keycloak_token: str, flame_logger: FlameLogger) -> None:
self.nginx_name = nginx_name
self.flame_logger = flame_logger
self.client = AsyncClient(base_url=f"http://{nginx_name}/kong",
headers={"apikey": data_source_token,
"Content-Type": "application/json"},
Expand All @@ -26,38 +29,55 @@ def refresh_token(self, keycloak_token: str):

async def _retrieve_available_sources(self) -> list[dict[str, str]]:
response = await self.hub_client.get(f"/kong/datastore/{self.project_id}")
response.raise_for_status()
try:
response.raise_for_status()
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Failed to retrieve available data sources for project {self.project_id}:"
f" {repr(e)}")

return [{'name': source['name']} for source in response.json()['data']]

def get_available_sources(self):
return self.available_sources

def get_data(self, s3_keys: Optional[list[str]] = None, fhir_queries: Optional[list[str]] = None) \
-> list[Union[dict[str, Union[dict, str]], str]]:
def get_data(self,
s3_keys: Optional[list[str]] = None,
fhir_queries: Optional[list[str]] = None) -> list[Union[dict[str, Union[dict, str]], str]]:
dataset_sources = []
for source in self.available_sources:
datasets = {}
if fhir_queries is not None:
for fhir_query in fhir_queries: # premise: retrieves data for each fhir_query from each data source
response = asyncio.run(self.client.get(f"{source['name']}/fhir/{fhir_query}",
headers=[('Connection', 'close')]))
response.raise_for_status()
try:
response.raise_for_status()
except HTTPStatusError as e:
self.flame_logger.new_log(f"Failed to retrieve fhir data for query {fhir_query} "
f"from source {source['name']}: {repr(e)}", log_type='warning')
continue
datasets[fhir_query] = response.json()
else:
response_names = asyncio.run(self._get_s3_dataset_names(source['name']))
for res_name in response_names: # premise: only retrieves data corresponding to s3_keys from each data source
if (s3_keys is None) or (res_name in s3_keys):
response = asyncio.run(self.client.get(f"{source['name']}/s3/{res_name}",
headers=[('Connection', 'close')]))
response.raise_for_status()
try:
response.raise_for_status()
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Failed to retrieve s3 data for key {res_name} "
f"from source {source['name']}: {repr(e)}")
datasets[res_name] = response.content
dataset_sources.append(datasets)
return dataset_sources

async def _get_s3_dataset_names(self, source_name: str) -> list[str]:
response = await self.client.get(f"{source_name}/s3", headers=[('Connection', 'close')])
response.raise_for_status()

try:
response.raise_for_status()
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Failed to retrieve S3 dataset names from source {source_name}: {repr(e)}")
responses = re.findall(r'<Key>(.*?)</Key>', str(response.text))
return responses

Expand All @@ -72,8 +92,8 @@ def get_data_source_client(self, data_id: str) -> AsyncClient:
if sources["id"] == data_id:
path = sources["paths"][0]
if path is None:
raise ValueError(f"Data source with id {data_id} not found")
client = AsyncClient(base_url=f"{path}",)
self.flame_logger.raise_error(f"Data source with id {data_id} not found")
client = AsyncClient(base_url=f"{path}")
return client


83 changes: 47 additions & 36 deletions flamesdk/resources/client_apis/clients/message_broker_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
import asyncio
import datetime
from typing import Optional, Literal
from httpx import AsyncClient, HTTPError
from httpx import AsyncClient, HTTPStatusError

from flamesdk.resources.node_config import NodeConfig
from flamesdk.resources.utils.logging import flame_log
from flamesdk.resources.utils.logging import FlameLogger


class Message:
def __init__(self,
message: dict,
config: NodeConfig,
outgoing: bool,
flame_logger: FlameLogger,
message_number: Optional[int] = None,
category: Optional[str] = None,
recipients: Optional[list[str]] = None) -> None:
Expand All @@ -26,25 +27,25 @@ def __init__(self,
:param category: the message category
:param recipients: the list of recipients
"""
self.flame_logger = flame_logger
if outgoing:
if "meta" in message.keys():
raise ValueError("Cannot use field 'meta' in message body. "
"This field is reserved for meta data used by the message broker.")
self.flame_logger.raise_error("Cannot use field 'meta' in message body. "
"This field is reserved for meta data used by the message broker.")
elif type(message_number) != int:
raise ValueError(f"Specified outgoing message, but did not specify integer value for message_number "
f"(received: {type(message_number)}).")
self.flame_logger.raise_error(f"Specified outgoing message, but did not specify integer value for "
f"message_number (received: {type(message_number)}).")
elif type(category) != str:
raise ValueError("Specified outgoing message, but did not specify string value for category "
f"(received: {type(category)}).")

self.flame_logger.raise_error(f"Specified outgoing message, but did not specify string value for "
f"category (received: {type(category)}).")
elif (type(recipients) != list) or (any([type(recipient) != str for recipient in recipients])):
if hasattr(recipients, '__iter__'):
raise ValueError(f"Specified outgoing message, but did not specify list of strings value for "
f"recipients (received: {type(recipients)} containing "
f"{set([type(recipient) for recipient in recipients])}).")
self.flame_logger.raise_error(f"Specified outgoing message, but did not specify list of strings "
f"value for recipients (received: {type(recipients)} containing "
f"{set([type(recipient) for recipient in recipients])}).")
else:
raise ValueError(f"Specified outgoing message, but did not specify list of strings value for "
f"recipients (received: {type(recipients)}).")
self.flame_logger.raise_error(f"Specified outgoing message, but did not specify list of strings "
f"value for recipients (received: {type(recipients)}).")
self.recipients = recipients

self.body = message
Expand Down Expand Up @@ -92,14 +93,15 @@ def _update_meta_data(self,


class MessageBrokerClient:
def __init__(self, config: NodeConfig, silent: bool = False) -> None:
def __init__(self, config: NodeConfig, flame_logger: FlameLogger) -> None:
self.nodeConfig = config
self.flame_logger = flame_logger
self._message_broker = AsyncClient(
base_url=f"http://{self.nodeConfig.nginx_name}/message-broker",
headers={"Authorization": f"Bearer {config.keycloak_token}", "Accept": "application/json"},
follow_redirects=True
)
asyncio.run(self._connect(silent=silent))
asyncio.run(self._connect())
self.list_of_incoming_messages: list[Message] = []
self.list_of_outgoing_messages: list[Message] = []
self.message_number = 0
Expand All @@ -117,15 +119,20 @@ def refresh_token(self, keycloak_token: str):
async def get_self_config(self, analysis_id: str) -> dict[str, str]:
response = await self._message_broker.get(f'/analyses/{analysis_id}/participants/self',
headers=[('Connection', 'close')])
response.raise_for_status()
try:
response.raise_for_status()
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Failed to retrieve self configuration for analysis {analysis_id}: "
f"{repr(e)}")
return response.json()

async def get_partner_nodes(self, self_node_id: str, analysis_id: str) -> list[dict[str, str]]:
response = await self._message_broker.get(f'/analyses/{analysis_id}/participants',
headers=[('Connection', 'close')])

response.raise_for_status()

try:
response.raise_for_status()
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Failed to retrieve partner nodes for analysis {analysis_id} : {repr(e)}")
response = [node_conf for node_conf in response.json() if node_conf['nodeId'] != self_node_id]
return response

Expand All @@ -135,28 +142,28 @@ async def test_connection(self) -> bool:
try:
response.raise_for_status()
return True
except HTTPError:
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Failed to connect to message broker: {repr(e)}")
return False

async def _connect(self, silent: bool = False) -> None:
async def _connect(self) -> None:
response = await self._message_broker.post(
f'/analyses/{os.getenv("ANALYSIS_ID")}/messages/subscriptions',
json={'webhookUrl': f'http://{self.nodeConfig.nginx_name}/analysis/webhook'}
)
try:
response.raise_for_status()
except HTTPError as e:
flame_log("Failed to subscribe to message broker", silent)
flame_log(repr(e), silent)
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Failed to subscribe to message broker: {repr(e)}")
response = await self._message_broker.get(f'/analyses/{os.getenv("ANALYSIS_ID")}/participants/self',
headers=[('Connection', 'close')])
try:
response.raise_for_status()
except HTTPError as e:
flame_log("Successfully subscribed to message broker, but failed to retrieve participants", silent)
flame_log(repr(e), silent)
except HTTPStatusError as e:
self.flame_logger.raise_error(f"Successfully subscribed to message broker, "
f"but failed to retrieve participants: {repr(e)}")

async def send_message(self, message: Message, silent: bool = False) -> None:
async def send_message(self, message: Message) -> None:
self.message_number += 1
body = {
"recipients": message.recipients,
Expand All @@ -168,18 +175,18 @@ async def send_message(self, message: Message, silent: bool = False) -> None:
headers=[('Connection', 'close'),
("Content-Type", "application/json")])
if message.body["meta"]["sender"] == self.nodeConfig.node_id:
flame_log(f"send message: {body}", silent)
self.flame_logger.new_log(f"send message: {body}", log_type='info')

self.list_of_outgoing_messages.append(message)

def receive_message(self, body: dict, silent: bool = False) -> None:
def receive_message(self, body: dict) -> None:
needs_acknowledgment = body["meta"]["akn_id"] is None
message = Message(message=body, config=self.nodeConfig, outgoing=False)
message = Message(message=body, config=self.nodeConfig, outgoing=False, flame_logger=self.flame_logger )
self.list_of_incoming_messages.append(message)

if needs_acknowledgment:
flame_log("acknowledging ready check" if body["meta"]["category"] == "ready_check" else "incoming message",
silent)
self.flame_logger.new_log("acknowledging ready check" if body["meta"]["category"] == "ready_check" else "incoming message",
log_type='info')
asyncio.run(self.acknowledge_message(message))

def delete_message_by_id(self, message_id: str, type: Literal["outgoing", "incoming"]) -> int:
Expand All @@ -196,14 +203,18 @@ def delete_message_by_id(self, message_id: str, type: Literal["outgoing", "incom
self.list_of_outgoing_messages.remove(message)
number_of_deleted_messages += 1
if number_of_deleted_messages == 0:
raise ValueError(f"Could not find message with id={message_id} in outgoing messages.")
self.flame_logger.new_log(f"Could not find message with id={message_id} in outgoing messages.",
log_type='warning')
return 0
if type == "incoming":
for message in self.list_of_outgoing_messages:
if message.body["meta"]["id"] == message_id:
self.list_of_outgoing_messages.remove(message)
number_of_deleted_messages += 1
if number_of_deleted_messages == 0:
raise ValueError(f"Could not find message with id={message_id} in outgoing messages.")
self.flame_logger.new_log(f"Could not find message with id={message_id} in incoming messages.",
log_type='warning')
return 0
return number_of_deleted_messages

async def await_message(self,
Expand Down
40 changes: 40 additions & 0 deletions flamesdk/resources/client_apis/clients/po_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import Optional, Union
import asyncio
from httpx import Client, HTTPError

from flamesdk.resources.utils.logging import FlameLogger

class POClient:
    """Synchronous HTTP client that streams analysis log records to the
    Pod Orchestration (PO) service behind the node's nginx proxy.

    NOTE(review): failures here are reported via ``print`` rather than
    ``FlameLogger`` — the logger delivers its output through this client,
    so logging a delivery failure could recurse. Confirm before changing.
    """

    def __init__(self, nginx_name: str, keycloak_token: str, flame_logger: FlameLogger) -> None:
        """
        :param nginx_name: hostname of the nginx reverse proxy fronting the PO service
        :param keycloak_token: bearer token used to authenticate against the PO API
        :param flame_logger: shared logger instance (kept for parity with the other clients)
        """
        self.nginx_name = nginx_name
        self.client = self._build_client(keycloak_token)
        self.flame_logger = flame_logger

    def refresh_token(self, keycloak_token: str) -> None:
        """Swap in a new HTTP client carrying a refreshed keycloak token."""
        # Fix: close the old client before replacing it — previously it was
        # dropped without being closed, leaking its connection pool.
        self.client.close()
        self.client = self._build_client(keycloak_token)

    def _build_client(self, keycloak_token: str) -> Client:
        """Construct an httpx Client pointed at the PO endpoint with auth headers set."""
        return Client(base_url=f"http://{self.nginx_name}/po",
                      headers={"Authorization": f"Bearer {keycloak_token}",
                               "accept": "application/json"},
                      follow_redirects=True)

    def stream_logs(self, log: str, log_type: str, analysis_id: str, status: str) -> None:
        """Send one log record to the PO service.

        Best effort: any HTTP or unexpected error is reported on stdout and
        swallowed, so a PO outage never takes down the analysis itself.

        :param log: the log message text
        :param log_type: severity/category label for the record
        :param analysis_id: id of the analysis the record belongs to
        :param status: current analysis status to report alongside the log
        """
        log_dict = {
            "log": log,
            "log_type": log_type,
            "analysis_id": analysis_id,
            "status": status
        }
        response = self.client.post("/stream_logs",
                                    json=log_dict,
                                    headers={"Content-Type": "application/json"})
        try:
            response.raise_for_status()
        except HTTPError as e:
            # Reported via print (not FlameLogger) to avoid a potential
            # logging -> stream_logs recursion; see class note.
            print("HTTP Error in po api:", repr(e))
        except Exception as e:
            print("Unforeseen Error:", repr(e))
Loading