diff --git a/ci/linux/gen_protocol.sh b/ci/linux/gen_protocol.sh deleted file mode 100755 index ca8b797..0000000 --- a/ci/linux/gen_protocol.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash - -if [[ "$OSTYPE" != "win32" && "$OSTYPE" != "msys" ]]; then - echo "Activating .venv first." - . .venv/bin/activate -fi - -rm -Rf src/omotes_sdk/internal/orchestrator_worker_events/messages/ -mkdir -p src/omotes_sdk/internal/orchestrator_worker_events/messages/ -protoc -I task_messages_protocol/ --python_out src/omotes_sdk/internal/orchestrator_worker_events/messages/ ./task_messages_protocol/task.proto -protoc -I task_messages_protocol/ --mypy_out src/omotes_sdk/internal/orchestrator_worker_events/messages/ ./task_messages_protocol/task.proto -touch src/omotes_sdk/internal/orchestrator_worker_events/messages/__init__.py diff --git a/pyproject.toml b/pyproject.toml index b501dde..bf92994 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ dependencies = [ "aio-pika ~= 9.4.2", - "omotes-sdk-protocol ~= 0.1.6", + "omotes-sdk-protocol ~= 0.1.13", "pyesdl ~= 24.2", "pamqp ~= 3.3.0", "celery ~= 5.3.6", diff --git a/src/omotes_sdk/internal/orchestrator_worker_events/esdl_messages.py b/src/omotes_sdk/internal/orchestrator_worker_events/esdl_messages.py new file mode 100644 index 0000000..f9507cf --- /dev/null +++ b/src/omotes_sdk/internal/orchestrator_worker_events/esdl_messages.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +from omotes_sdk_protocol.job_pb2 import EsdlMessage as EsdlMessagePb + + +class MessageSeverity(Enum): + """Message severity options.""" + + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + + +@dataclass +class EsdlMessage: + """Esdl feedback message, optionally related to a specific object (asset).""" + + technical_message: str + """Technical message.""" + severity: MessageSeverity + """Message severity.""" + esdl_object_id: Optional[str] = None + """Optional esdl object id, None implies a general energy system message.""" + + def to_protobuf_message(self) -> EsdlMessagePb: + """Generate a protobuf message from this class. + + :return: Protobuf message representation. + """ + return EsdlMessagePb( + technical_message=self.technical_message, + severity=EsdlMessagePb.Severity.Value(self.severity.value), + esdl_object_id=self.esdl_object_id, + ) diff --git a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py b/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py deleted file mode 100644 index 4c44bff..0000000 --- a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: task.proto -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ntask.proto\"\xdb\x01\n\nTaskResult\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x65lery_task_id\x18\x02 \x01(\t\x12\x18\n\x10\x63\x65lery_task_type\x18\x06 \x01(\t\x12+\n\x0bresult_type\x18\x03 \x01(\x0e\x32\x16.TaskResult.ResultType\x12\x18\n\x0boutput_esdl\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x0c\n\x04logs\x18\x05 \x01(\t\"&\n\nResultType\x12\r\n\tSUCCEEDED\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x42\x0e\n\x0c_output_esdl\"\xdf\x01\n\x12TaskProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x65lery_task_id\x18\x02 \x01(\t\x12\x18\n\x10\x63\x65lery_task_type\x18\x05 \x01(\t\x12\x34\n\x06status\x18\x06 \x01(\x0e\x32\".TaskProgressUpdate.ProgressStatusH\x00\x12\x12\n\x08progress\x18\x07 \x01(\x01H\x00\x12\x0f\n\x07message\x18\x04 \x01(\t\"\x1b\n\x0eProgressStatus\x12\t\n\x05START\x10\x00\x42\x0f\n\rprogress_typeb\x06proto3') - - - -_TASKRESULT = DESCRIPTOR.message_types_by_name['TaskResult'] -_TASKPROGRESSUPDATE = DESCRIPTOR.message_types_by_name['TaskProgressUpdate'] -_TASKRESULT_RESULTTYPE = _TASKRESULT.enum_types_by_name['ResultType'] -_TASKPROGRESSUPDATE_PROGRESSSTATUS = _TASKPROGRESSUPDATE.enum_types_by_name['ProgressStatus'] -TaskResult = _reflection.GeneratedProtocolMessageType('TaskResult', (_message.Message,), { - 'DESCRIPTOR' : _TASKRESULT, - '__module__' : 'task_pb2' - # @@protoc_insertion_point(class_scope:TaskResult) - }) -_sym_db.RegisterMessage(TaskResult) - -TaskProgressUpdate = _reflection.GeneratedProtocolMessageType('TaskProgressUpdate', (_message.Message,), { - 'DESCRIPTOR' : _TASKPROGRESSUPDATE, - '__module__' : 'task_pb2' - # @@protoc_insertion_point(class_scope:TaskProgressUpdate) - }) -_sym_db.RegisterMessage(TaskProgressUpdate) - -if _descriptor._USE_C_DESCRIPTORS == False: - - DESCRIPTOR._options = None - _TASKRESULT._serialized_start=15 - _TASKRESULT._serialized_end=234 - _TASKRESULT_RESULTTYPE._serialized_start=180 - _TASKRESULT_RESULTTYPE._serialized_end=218 - _TASKPROGRESSUPDATE._serialized_start=237 - _TASKPROGRESSUPDATE._serialized_end=460 - _TASKPROGRESSUPDATE_PROGRESSSTATUS._serialized_start=416 - _TASKPROGRESSUPDATE_PROGRESSSTATUS._serialized_end=443 -# @@protoc_insertion_point(module_scope) diff --git a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi b/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi deleted file mode 100644 index ed63150..0000000 --- a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi +++ /dev/null @@ -1,105 +0,0 @@ -""" -@generated by mypy-protobuf. Do not edit manually! -isort:skip_file -""" -import builtins -import google.protobuf.descriptor -import google.protobuf.internal.enum_type_wrapper -import google.protobuf.message -import sys -import typing - -if sys.version_info >= (3, 10): - import typing as typing_extensions -else: - import typing_extensions - -DESCRIPTOR: google.protobuf.descriptor.FileDescriptor - -@typing_extensions.final -class TaskResult(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - class _ResultType: - ValueType = typing.NewType("ValueType", builtins.int) - V: typing_extensions.TypeAlias = ValueType - - class _ResultTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TaskResult._ResultType.ValueType], builtins.type): - DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - SUCCEEDED: TaskResult._ResultType.ValueType # 0 - ERROR: TaskResult._ResultType.ValueType # 1 - - class ResultType(_ResultType, metaclass=_ResultTypeEnumTypeWrapper): ... - SUCCEEDED: TaskResult.ResultType.ValueType # 0 - ERROR: TaskResult.ResultType.ValueType # 1 - - JOB_ID_FIELD_NUMBER: builtins.int - CELERY_TASK_ID_FIELD_NUMBER: builtins.int - CELERY_TASK_TYPE_FIELD_NUMBER: builtins.int - RESULT_TYPE_FIELD_NUMBER: builtins.int - OUTPUT_ESDL_FIELD_NUMBER: builtins.int - LOGS_FIELD_NUMBER: builtins.int - job_id: builtins.str - celery_task_id: builtins.str - celery_task_type: builtins.str - result_type: global___TaskResult.ResultType.ValueType - output_esdl: builtins.str - logs: builtins.str - def __init__( - self, - *, - job_id: builtins.str = ..., - celery_task_id: builtins.str = ..., - celery_task_type: builtins.str = ..., - result_type: global___TaskResult.ResultType.ValueType = ..., - output_esdl: builtins.str | None = ..., - logs: builtins.str = ..., - ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["_output_esdl", b"_output_esdl", "output_esdl", b"output_esdl"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["_output_esdl", b"_output_esdl", "celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "logs", b"logs", "output_esdl", b"output_esdl", "result_type", b"result_type"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["_output_esdl", b"_output_esdl"]) -> typing_extensions.Literal["output_esdl"] | None: ... - -global___TaskResult = TaskResult - -@typing_extensions.final -class TaskProgressUpdate(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - - class _ProgressStatus: - ValueType = typing.NewType("ValueType", builtins.int) - V: typing_extensions.TypeAlias = ValueType - - class _ProgressStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TaskProgressUpdate._ProgressStatus.ValueType], builtins.type): - DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - START: TaskProgressUpdate._ProgressStatus.ValueType # 0 - - class ProgressStatus(_ProgressStatus, metaclass=_ProgressStatusEnumTypeWrapper): ... - START: TaskProgressUpdate.ProgressStatus.ValueType # 0 - - JOB_ID_FIELD_NUMBER: builtins.int - CELERY_TASK_ID_FIELD_NUMBER: builtins.int - CELERY_TASK_TYPE_FIELD_NUMBER: builtins.int - STATUS_FIELD_NUMBER: builtins.int - PROGRESS_FIELD_NUMBER: builtins.int - MESSAGE_FIELD_NUMBER: builtins.int - job_id: builtins.str - celery_task_id: builtins.str - celery_task_type: builtins.str - status: global___TaskProgressUpdate.ProgressStatus.ValueType - progress: builtins.float - message: builtins.str - def __init__( - self, - *, - job_id: builtins.str = ..., - celery_task_id: builtins.str = ..., - celery_task_type: builtins.str = ..., - status: global___TaskProgressUpdate.ProgressStatus.ValueType = ..., - progress: builtins.float = ..., - message: builtins.str = ..., - ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["progress_type", b"progress_type"]) -> typing_extensions.Literal["status", "progress"] | None: ... - -global___TaskProgressUpdate = TaskProgressUpdate diff --git a/src/omotes_sdk/internal/worker/worker.py b/src/omotes_sdk/internal/worker/worker.py index 3bc6fca..322b5ea 100644 --- a/src/omotes_sdk/internal/worker/worker.py +++ b/src/omotes_sdk/internal/worker/worker.py @@ -2,7 +2,7 @@ import logging import socket import sys -from typing import Callable, Dict, List, Any, Optional +from typing import Callable, Dict, List, Any, Optional, Tuple from uuid import UUID import streamcapture @@ -13,9 +13,10 @@ from esdl import EnergySystem from esdl.esdl_handler import EnergySystemHandler +from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import EsdlMessage from omotes_sdk.internal.worker.configs import WorkerConfig from omotes_sdk.internal.common.broker_interface import BrokerInterface -from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import ( +from omotes_sdk_protocol.internal.task_pb2 import ( TaskResult, TaskProgressUpdate, ) @@ -99,11 +100,13 @@ class WorkerTask(CeleryTask): broker_if: BrokerInterface output_esdl: Optional[str] + esdl_messages: List[EsdlMessage] def __init__(self) -> None: """Create the worker task.""" super().__init__() self.output_esdl = None + self.esdl_messages = [] def before_start(self, task_id: str, args: List[Any], kwargs: Dict[str, Any]) -> None: """Runs before task start. @@ -128,7 +131,7 @@ def after_return( kwargs: Dict[str, Any], einfo: str, ) -> None: - """Runs after task start. + """Runs after task finished. :param status: Task status. :param retval: Task return value. @@ -143,13 +146,18 @@ def after_return( logs = self.logs.getvalue().decode() self.logs.close() + # to protobuf esdl messages: + esdl_messages_pb = [ + esdl_message.to_protobuf_message() for esdl_message in self.esdl_messages + ] + job_id: UUID = args[0] job_reference: str = args[1] result_message = None - if status == "SUCCESS": + if status == "FAILURE" or not self.output_esdl: logger.info( - "Job %s (celery task id %s) with reference %s was successful.", + "Job %s (celery task id %s) with reference %s failed.", job_id, self.request.id, job_reference, @@ -158,14 +166,14 @@ def after_return( job_id=str(job_id), celery_task_id=self.request.id, celery_task_type=WORKER_TASK_TYPE, - result_type=TaskResult.ResultType.SUCCEEDED, - output_esdl=self.output_esdl, + result_type=TaskResult.ResultType.ERROR, + output_esdl="", logs=logs, + esdl_messages=esdl_messages_pb, ) - - elif status == "FAILURE": + elif status == "SUCCESS": logger.info( - "Job %s (celery task id %s) with reference %s failed.", + "Job %s (celery task id %s) with reference %s was successful.", job_id, self.request.id, job_reference, @@ -174,9 +182,10 @@ def after_return( job_id=str(job_id), celery_task_id=self.request.id, celery_task_type=WORKER_TASK_TYPE, - result_type=TaskResult.ResultType.ERROR, - output_esdl="", + result_type=TaskResult.ResultType.SUCCEEDED, + output_esdl=self.output_esdl, logs=logs, + esdl_messages=esdl_messages_pb, ) else: logger.error( @@ -263,21 +272,28 @@ def wrapped_worker_task( logger.info("Worker started new task %s with reference %s", job_id, job_reference) task_util = TaskUtil(job_id, task, task.broker_if) task_util.send_start() - output_esdl = WORKER_TASK_FUNCTION(input_esdl, params_dict, task_util.update_progress) - - input_esh = pyesdl_from_string(input_esdl) - input_energy_system: EnergySystem = input_esh.energy_system - if job_reference is None: - new_name = f"{input_energy_system.name}_{WORKER_TASK_TYPE}" - elif job_reference == "": - new_name = f"{input_energy_system.name}" + output_esdl, esdl_messages = WORKER_TASK_FUNCTION( + input_esdl, params_dict, task_util.update_progress + ) + + if output_esdl: + input_esh = pyesdl_from_string(input_esdl) + input_energy_system: EnergySystem = input_esh.energy_system + if job_reference is None: + new_name = f"{input_energy_system.name}_{WORKER_TASK_TYPE}" + elif job_reference == "": + new_name = f"{input_energy_system.name}" + else: + new_name = f"{input_energy_system.name}_{job_reference}" + + output_esh = pyesdl_from_string(output_esdl) + output_energy_system: EnergySystem = output_esh.energy_system + output_energy_system.name = new_name + task.output_esdl = output_esh.to_string() else: - new_name = f"{input_energy_system.name}_{job_reference}" + task.output_esdl = None - output_esh = pyesdl_from_string(output_esdl) - output_energy_system: EnergySystem = output_esh.energy_system - output_energy_system.name = new_name - task.output_esdl = output_esh.to_string() + task.esdl_messages = esdl_messages task_util.update_progress(1.0, "Calculation finished.") @@ -345,7 +361,13 @@ def start(self) -> None: UpdateProgressHandler = Callable[[float, str], None] -WorkerTaskF = Callable[[str, ProtobufDict, UpdateProgressHandler], str] +WorkerTaskF = Callable[ + [str, ProtobufDict, UpdateProgressHandler], + Tuple[ + Optional[str], + List[EsdlMessage], + ], +] WORKER: Worker = None # type: ignore [assignment] # noqa WORKER_TASK_FUNCTION: WorkerTaskF = None # type: ignore [assignment] # noqa diff --git a/task_messages_protocol/task.proto b/task_messages_protocol/task.proto deleted file mode 100644 index fcb2ede..0000000 --- a/task_messages_protocol/task.proto +++ /dev/null @@ -1,30 +0,0 @@ -syntax = "proto3"; - -message TaskResult { - string job_id = 1; - string celery_task_id = 2; - string celery_task_type = 6; - ResultType result_type = 3; - optional string output_esdl = 4; - string logs = 5; - - enum ResultType { - SUCCEEDED = 0; - ERROR = 1; - } -} - -message TaskProgressUpdate { - string job_id = 1; - string celery_task_id = 2; - string celery_task_type = 5; - oneof progress_type { - ProgressStatus status = 6; - double progress = 7; - } - string message = 4; - - enum ProgressStatus { - START = 0; - } -} diff --git a/src/omotes_sdk/internal/orchestrator_worker_events/messages/__init__.py b/unit_test/internal/orchestrator_worker_events/__init__.py similarity index 100% rename from src/omotes_sdk/internal/orchestrator_worker_events/messages/__init__.py rename to unit_test/internal/orchestrator_worker_events/__init__.py diff --git a/unit_test/internal/orchestrator_worker_events/test_esdl_messages.py b/unit_test/internal/orchestrator_worker_events/test_esdl_messages.py new file mode 100644 index 0000000..6bf9987 --- /dev/null +++ b/unit_test/internal/orchestrator_worker_events/test_esdl_messages.py @@ -0,0 +1,51 @@ +import unittest + +from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import ( + EsdlMessage, + MessageSeverity, +) +from omotes_sdk_protocol.job_pb2 import EsdlMessage as EsdlMessagePb + + +class TestEsdlMessages(unittest.TestCase): + def test__esdl_message__to_protobuf_message(self) -> None: + # Arrange + esdl_message = EsdlMessage( + technical_message="message 1", + severity=MessageSeverity.ERROR, + esdl_object_id="uuid", + ) + + # Act + esdl_message_pb = esdl_message.to_protobuf_message() + + # Assert + assert esdl_message_pb.technical_message == "message 1" + assert esdl_message_pb.severity == EsdlMessagePb.Severity.ERROR + assert esdl_message_pb.esdl_object_id == "uuid" + + def test__esdl_message__to_protobuf_message_no_esdl_object_id(self) -> None: + # Arrange + esdl_message_esdl_object_id_none = EsdlMessage( + technical_message="message 2", + severity=MessageSeverity.WARNING, + esdl_object_id=None, + ) + esdl_message_esdl_object_id_omitted = EsdlMessage( + technical_message="message 3", + severity=MessageSeverity.INFO, + ) + + # Act + esdl_message_esdl_object_id_none_pb = esdl_message_esdl_object_id_none.to_protobuf_message() + esdl_message_esdl_object_id_omitted_pb = ( + esdl_message_esdl_object_id_omitted.to_protobuf_message() + ) + + # Assert + assert esdl_message_esdl_object_id_none_pb.technical_message == "message 2" + assert esdl_message_esdl_object_id_none_pb.severity == EsdlMessagePb.Severity.WARNING + assert esdl_message_esdl_object_id_none_pb.esdl_object_id == "" + assert esdl_message_esdl_object_id_omitted_pb.technical_message == "message 3" + assert esdl_message_esdl_object_id_omitted_pb.severity == EsdlMessagePb.Severity.INFO + assert esdl_message_esdl_object_id_omitted_pb.esdl_object_id == ""