diff --git a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py b/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py index d800b29..4c44bff 100644 --- a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py +++ b/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: task.proto -# Protobuf Python Version: 4.25.2 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -14,17 +14,37 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ntask.proto\"\xdb\x01\n\nTaskResult\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x65lery_task_id\x18\x02 \x01(\t\x12\x18\n\x10\x63\x65lery_task_type\x18\x06 \x01(\t\x12+\n\x0bresult_type\x18\x03 \x01(\x0e\x32\x16.TaskResult.ResultType\x12\x18\n\x0boutput_esdl\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x0c\n\x04logs\x18\x05 \x01(\t\"&\n\nResultType\x12\r\n\tSUCCEEDED\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x42\x0e\n\x0c_output_esdl\"y\n\x12TaskProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x65lery_task_id\x18\x02 \x01(\t\x12\x18\n\x10\x63\x65lery_task_type\x18\x05 \x01(\t\x12\x10\n\x08progress\x18\x03 \x01(\x01\x12\x0f\n\x07message\x18\x04 \x01(\tb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ntask.proto\"\xdb\x01\n\nTaskResult\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x65lery_task_id\x18\x02 \x01(\t\x12\x18\n\x10\x63\x65lery_task_type\x18\x06 \x01(\t\x12+\n\x0bresult_type\x18\x03 \x01(\x0e\x32\x16.TaskResult.ResultType\x12\x18\n\x0boutput_esdl\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x0c\n\x04logs\x18\x05 \x01(\t\"&\n\nResultType\x12\r\n\tSUCCEEDED\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x42\x0e\n\x0c_output_esdl\"\xdf\x01\n\x12TaskProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x16\n\x0e\x63\x65lery_task_id\x18\x02 \x01(\t\x12\x18\n\x10\x63\x65lery_task_type\x18\x05 \x01(\t\x12\x34\n\x06status\x18\x06 \x01(\x0e\x32\".TaskProgressUpdate.ProgressStatusH\x00\x12\x12\n\x08progress\x18\x07 \x01(\x01H\x00\x12\x0f\n\x07message\x18\x04 \x01(\t\"\x1b\n\x0eProgressStatus\x12\t\n\x05START\x10\x00\x42\x0f\n\rprogress_typeb\x06proto3') + + + +_TASKRESULT = DESCRIPTOR.message_types_by_name['TaskResult'] +_TASKPROGRESSUPDATE = DESCRIPTOR.message_types_by_name['TaskProgressUpdate'] +_TASKRESULT_RESULTTYPE = _TASKRESULT.enum_types_by_name['ResultType'] +_TASKPROGRESSUPDATE_PROGRESSSTATUS = _TASKPROGRESSUPDATE.enum_types_by_name['ProgressStatus'] +TaskResult = _reflection.GeneratedProtocolMessageType('TaskResult', (_message.Message,), { + 'DESCRIPTOR' : _TASKRESULT, + '__module__' : 'task_pb2' + # @@protoc_insertion_point(class_scope:TaskResult) + }) +_sym_db.RegisterMessage(TaskResult) + +TaskProgressUpdate = _reflection.GeneratedProtocolMessageType('TaskProgressUpdate', (_message.Message,), { + 'DESCRIPTOR' : _TASKPROGRESSUPDATE, + '__module__' : 'task_pb2' + # @@protoc_insertion_point(class_scope:TaskProgressUpdate) + }) +_sym_db.RegisterMessage(TaskProgressUpdate) -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'task_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None - _globals['_TASKRESULT']._serialized_start=15 - _globals['_TASKRESULT']._serialized_end=234 - _globals['_TASKRESULT_RESULTTYPE']._serialized_start=180 - _globals['_TASKRESULT_RESULTTYPE']._serialized_end=218 - _globals['_TASKPROGRESSUPDATE']._serialized_start=236 - _globals['_TASKPROGRESSUPDATE']._serialized_end=357 + _TASKRESULT._serialized_start=15 + _TASKRESULT._serialized_end=234 + _TASKRESULT_RESULTTYPE._serialized_start=180 + _TASKRESULT_RESULTTYPE._serialized_end=218 + _TASKPROGRESSUPDATE._serialized_start=237 + _TASKPROGRESSUPDATE._serialized_end=460 + _TASKPROGRESSUPDATE_PROGRESSSTATUS._serialized_start=416 + _TASKPROGRESSUPDATE_PROGRESSSTATUS._serialized_end=443 # @@protoc_insertion_point(module_scope) diff --git a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi b/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi index 5315199..ed63150 100644 --- a/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi +++ b/src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi @@ -65,14 +65,27 @@ global___TaskResult = TaskResult class TaskProgressUpdate(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor + class _ProgressStatus: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _ProgressStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TaskProgressUpdate._ProgressStatus.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + START: TaskProgressUpdate._ProgressStatus.ValueType # 0 + + class ProgressStatus(_ProgressStatus, metaclass=_ProgressStatusEnumTypeWrapper): ... + START: TaskProgressUpdate.ProgressStatus.ValueType # 0 + JOB_ID_FIELD_NUMBER: builtins.int CELERY_TASK_ID_FIELD_NUMBER: builtins.int CELERY_TASK_TYPE_FIELD_NUMBER: builtins.int + STATUS_FIELD_NUMBER: builtins.int PROGRESS_FIELD_NUMBER: builtins.int MESSAGE_FIELD_NUMBER: builtins.int job_id: builtins.str celery_task_id: builtins.str celery_task_type: builtins.str + status: global___TaskProgressUpdate.ProgressStatus.ValueType progress: builtins.float message: builtins.str def __init__( @@ -81,9 +94,12 @@ class TaskProgressUpdate(google.protobuf.message.Message): job_id: builtins.str = ..., celery_task_id: builtins.str = ..., celery_task_type: builtins.str = ..., + status: global___TaskProgressUpdate.ProgressStatus.ValueType = ..., progress: builtins.float = ..., message: builtins.str = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["progress_type", b"progress_type"]) -> typing_extensions.Literal["status", "progress"] | None: ... global___TaskProgressUpdate = TaskProgressUpdate diff --git a/src/omotes_sdk/internal/worker/worker.py b/src/omotes_sdk/internal/worker/worker.py index 81cc47b..3bc6fca 100644 --- a/src/omotes_sdk/internal/worker/worker.py +++ b/src/omotes_sdk/internal/worker/worker.py @@ -45,6 +45,25 @@ def __init__(self, job_id: UUID, task: CeleryTask, broker_if: BrokerInterface): self.task = task self.broker_if = broker_if + def send_start(self) -> None: + """Send a START progress update to the orchestrator.""" + logger.debug( + "Sending START progress update for job %s (celery id %s)", + self.job_id, + self.task.request.id, + ) + self.broker_if.send_message_to( + None, + WORKER.config.task_progress_queue_name, + TaskProgressUpdate( + job_id=str(self.job_id), + celery_task_id=self.task.request.id, + celery_task_type=WORKER_TASK_TYPE, + status=TaskProgressUpdate.START, + message="Started job at worker.", + ).SerializeToString(), + ) + def update_progress(self, fraction: float, message: str) -> None: """Send a progress update to the orchestrator. @@ -243,7 +262,7 @@ def wrapped_worker_task( """ logger.info("Worker started new task %s with reference %s", job_id, job_reference) task_util = TaskUtil(job_id, task, task.broker_if) - task_util.update_progress(0, "Job calculation started") + task_util.send_start() output_esdl = WORKER_TASK_FUNCTION(input_esdl, params_dict, task_util.update_progress) input_esh = pyesdl_from_string(input_esdl) diff --git a/task_messages_protocol/task.proto b/task_messages_protocol/task.proto index 082807f..fcb2ede 100644 --- a/task_messages_protocol/task.proto +++ b/task_messages_protocol/task.proto @@ -18,6 +18,13 @@ message TaskProgressUpdate { string job_id = 1; string celery_task_id = 2; string celery_task_type = 5; - double progress = 3; + oneof progress_type { + ProgressStatus status = 6; + double progress = 7; + } string message = 4; + + enum ProgressStatus { + START = 0; + } }