Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions ci/linux/gen_protocol.sh

This file was deleted.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [

dependencies = [
"aio-pika ~= 9.4.2",
"omotes-sdk-protocol ~= 0.1.6",
"omotes-sdk-protocol ~= 0.1.13",
"pyesdl ~= 24.2",
"pamqp ~= 3.3.0",
"celery ~= 5.3.6",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional

from omotes_sdk_protocol.job_pb2 import EsdlMessage as EsdlMessagePb


class MessageSeverity(Enum):
"""Message severity options."""

DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"


@dataclass
class EsdlMessage:
"""Esdl feedback message, optionally related to a specific object (asset)."""

technical_message: str
"""Technical message."""
severity: MessageSeverity
"""Message severity."""
esdl_object_id: Optional[str] = None
"""Optional esdl object id, None implies a general energy system message."""

def to_protobuf_message(self) -> EsdlMessagePb:
"""Generate a protobuf message from this class.

:return: Protobuf message representation.
"""
return EsdlMessagePb(
technical_message=self.technical_message,
severity=EsdlMessagePb.Severity.Value(self.severity.value),
esdl_object_id=self.esdl_object_id,
)

This file was deleted.

This file was deleted.

74 changes: 48 additions & 26 deletions src/omotes_sdk/internal/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import socket
import sys
from typing import Callable, Dict, List, Any, Optional
from typing import Callable, Dict, List, Any, Optional, Tuple
from uuid import UUID

import streamcapture
Expand All @@ -13,9 +13,10 @@
from esdl import EnergySystem
from esdl.esdl_handler import EnergySystemHandler

from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import EsdlMessage
from omotes_sdk.internal.worker.configs import WorkerConfig
from omotes_sdk.internal.common.broker_interface import BrokerInterface
from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import (
from omotes_sdk_protocol.internal.task_pb2 import (
TaskResult,
TaskProgressUpdate,
)
Expand Down Expand Up @@ -99,11 +100,13 @@ class WorkerTask(CeleryTask):
broker_if: BrokerInterface

output_esdl: Optional[str]
esdl_messages: List[EsdlMessage]

def __init__(self) -> None:
"""Create the worker task."""
super().__init__()
self.output_esdl = None
self.esdl_messages = []

def before_start(self, task_id: str, args: List[Any], kwargs: Dict[str, Any]) -> None:
"""Runs before task start.
Expand All @@ -128,7 +131,7 @@ def after_return(
kwargs: Dict[str, Any],
einfo: str,
) -> None:
"""Runs after task start.
"""Runs after task finished.

:param status: Task status.
:param retval: Task return value.
Expand All @@ -143,13 +146,18 @@ def after_return(
logs = self.logs.getvalue().decode()
self.logs.close()

# to protobuf esdl messages:
esdl_messages_pb = [
esdl_message.to_protobuf_message() for esdl_message in self.esdl_messages
]

job_id: UUID = args[0]
job_reference: str = args[1]

result_message = None
if status == "SUCCESS":
if status == "FAILURE" or not self.output_esdl:
logger.info(
"Job %s (celery task id %s) with reference %s was successful.",
"Job %s (celery task id %s) with reference %s failed.",
job_id,
self.request.id,
job_reference,
Expand All @@ -158,14 +166,14 @@ def after_return(
job_id=str(job_id),
celery_task_id=self.request.id,
celery_task_type=WORKER_TASK_TYPE,
result_type=TaskResult.ResultType.SUCCEEDED,
output_esdl=self.output_esdl,
result_type=TaskResult.ResultType.ERROR,
output_esdl="",
logs=logs,
esdl_messages=esdl_messages_pb,
)

elif status == "FAILURE":
elif status == "SUCCESS":
logger.info(
"Job %s (celery task id %s) with reference %s failed.",
"Job %s (celery task id %s) with reference %s was successful.",
job_id,
self.request.id,
job_reference,
Expand All @@ -174,9 +182,10 @@ def after_return(
job_id=str(job_id),
celery_task_id=self.request.id,
celery_task_type=WORKER_TASK_TYPE,
result_type=TaskResult.ResultType.ERROR,
output_esdl="",
result_type=TaskResult.ResultType.SUCCEEDED,
output_esdl=self.output_esdl,
logs=logs,
esdl_messages=esdl_messages_pb,
)
else:
logger.error(
Expand Down Expand Up @@ -263,21 +272,28 @@ def wrapped_worker_task(
logger.info("Worker started new task %s with reference %s", job_id, job_reference)
task_util = TaskUtil(job_id, task, task.broker_if)
task_util.send_start()
output_esdl = WORKER_TASK_FUNCTION(input_esdl, params_dict, task_util.update_progress)

input_esh = pyesdl_from_string(input_esdl)
input_energy_system: EnergySystem = input_esh.energy_system
if job_reference is None:
new_name = f"{input_energy_system.name}_{WORKER_TASK_TYPE}"
elif job_reference == "":
new_name = f"{input_energy_system.name}"
output_esdl, esdl_messages = WORKER_TASK_FUNCTION(
input_esdl, params_dict, task_util.update_progress
)

if output_esdl:
input_esh = pyesdl_from_string(input_esdl)
input_energy_system: EnergySystem = input_esh.energy_system
if job_reference is None:
new_name = f"{input_energy_system.name}_{WORKER_TASK_TYPE}"
elif job_reference == "":
new_name = f"{input_energy_system.name}"
else:
new_name = f"{input_energy_system.name}_{job_reference}"

output_esh = pyesdl_from_string(output_esdl)
output_energy_system: EnergySystem = output_esh.energy_system
output_energy_system.name = new_name
task.output_esdl = output_esh.to_string()
else:
new_name = f"{input_energy_system.name}_{job_reference}"
task.output_esdl = None

output_esh = pyesdl_from_string(output_esdl)
output_energy_system: EnergySystem = output_esh.energy_system
output_energy_system.name = new_name
task.output_esdl = output_esh.to_string()
task.esdl_messages = esdl_messages

task_util.update_progress(1.0, "Calculation finished.")

Expand Down Expand Up @@ -345,7 +361,13 @@ def start(self) -> None:


UpdateProgressHandler = Callable[[float, str], None]
WorkerTaskF = Callable[[str, ProtobufDict, UpdateProgressHandler], str]
WorkerTaskF = Callable[
[str, ProtobufDict, UpdateProgressHandler],
Tuple[
Optional[str],
List[EsdlMessage],
],
]

WORKER: Worker = None # type: ignore [assignment] # noqa
WORKER_TASK_FUNCTION: WorkerTaskF = None # type: ignore [assignment] # noqa
Expand Down
Loading