Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,27 @@ global___TaskResult = TaskResult
class TaskProgressUpdate(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

class _ProgressStatus:
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _ProgressStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TaskProgressUpdate._ProgressStatus.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
START: TaskProgressUpdate._ProgressStatus.ValueType # 0

class ProgressStatus(_ProgressStatus, metaclass=_ProgressStatusEnumTypeWrapper): ...
START: TaskProgressUpdate.ProgressStatus.ValueType # 0

JOB_ID_FIELD_NUMBER: builtins.int
CELERY_TASK_ID_FIELD_NUMBER: builtins.int
CELERY_TASK_TYPE_FIELD_NUMBER: builtins.int
STATUS_FIELD_NUMBER: builtins.int
PROGRESS_FIELD_NUMBER: builtins.int
MESSAGE_FIELD_NUMBER: builtins.int
job_id: builtins.str
celery_task_id: builtins.str
celery_task_type: builtins.str
status: global___TaskProgressUpdate.ProgressStatus.ValueType
progress: builtins.float
message: builtins.str
def __init__(
Expand All @@ -81,9 +94,12 @@ class TaskProgressUpdate(google.protobuf.message.Message):
job_id: builtins.str = ...,
celery_task_id: builtins.str = ...,
celery_task_type: builtins.str = ...,
status: global___TaskProgressUpdate.ProgressStatus.ValueType = ...,
progress: builtins.float = ...,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["progress_type", b"progress_type"]) -> typing_extensions.Literal["status", "progress"] | None: ...

global___TaskProgressUpdate = TaskProgressUpdate
21 changes: 20 additions & 1 deletion src/omotes_sdk/internal/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ def __init__(self, job_id: UUID, task: CeleryTask, broker_if: BrokerInterface):
self.task = task
self.broker_if = broker_if

def send_start(self) -> None:
"""Send a START progress update to the orchestrator."""
logger.debug(
"Sending START progress update for job %s (celery id %s)",
self.job_id,
self.task.request.id,
)
self.broker_if.send_message_to(
None,
WORKER.config.task_progress_queue_name,
TaskProgressUpdate(
job_id=str(self.job_id),
celery_task_id=self.task.request.id,
celery_task_type=WORKER_TASK_TYPE,
status=TaskProgressUpdate.START,
message="Started job at worker.",
).SerializeToString(),
)

def update_progress(self, fraction: float, message: str) -> None:
"""Send a progress update to the orchestrator.

Expand Down Expand Up @@ -243,7 +262,7 @@ def wrapped_worker_task(
"""
logger.info("Worker started new task %s with reference %s", job_id, job_reference)
task_util = TaskUtil(job_id, task, task.broker_if)
task_util.update_progress(0, "Job calculation started")
task_util.send_start()
output_esdl = WORKER_TASK_FUNCTION(input_esdl, params_dict, task_util.update_progress)

input_esh = pyesdl_from_string(input_esdl)
Expand Down
9 changes: 8 additions & 1 deletion task_messages_protocol/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ message TaskProgressUpdate {
string job_id = 1;
string celery_task_id = 2;
string celery_task_type = 5;
double progress = 3;
oneof progress_type {
ProgressStatus status = 6;
double progress = 7;
}
string message = 4;

enum ProgressStatus {
START = 0;
}
}