Skip to content

Commit 4e2d6c7

Browse files
committed
84: Change field name of numerical back to progress for numerical progress in TaskUpdateProgress messages and send START instead of 0.0 progress on worker start.
1 parent c3a5727 commit 4e2d6c7

File tree

4 files changed

+32
-13
lines changed

4 files changed

+32
-13
lines changed

src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ class TaskProgressUpdate(google.protobuf.message.Message):
8080
CELERY_TASK_ID_FIELD_NUMBER: builtins.int
8181
CELERY_TASK_TYPE_FIELD_NUMBER: builtins.int
8282
STATUS_FIELD_NUMBER: builtins.int
83-
NUMERICAL_FIELD_NUMBER: builtins.int
83+
PROGRESS_FIELD_NUMBER: builtins.int
8484
MESSAGE_FIELD_NUMBER: builtins.int
8585
job_id: builtins.str
8686
celery_task_id: builtins.str
8787
celery_task_type: builtins.str
8888
status: global___TaskProgressUpdate.ProgressStatus.ValueType
89-
numerical: builtins.float
89+
progress: builtins.float
9090
message: builtins.str
9191
def __init__(
9292
self,
@@ -95,11 +95,11 @@ class TaskProgressUpdate(google.protobuf.message.Message):
9595
celery_task_id: builtins.str = ...,
9696
celery_task_type: builtins.str = ...,
9797
status: global___TaskProgressUpdate.ProgressStatus.ValueType = ...,
98-
numerical: builtins.float = ...,
98+
progress: builtins.float = ...,
9999
message: builtins.str = ...,
100100
) -> None: ...
101-
def HasField(self, field_name: typing_extensions.Literal["numerical", b"numerical", "progress", b"progress", "status", b"status"]) -> builtins.bool: ...
102-
def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "numerical", b"numerical", "progress", b"progress", "status", b"status"]) -> None: ...
103-
def WhichOneof(self, oneof_group: typing_extensions.Literal["progress", b"progress"]) -> typing_extensions.Literal["status", "numerical"] | None: ...
101+
def HasField(self, field_name: typing_extensions.Literal["progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> builtins.bool: ...
102+
def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> None: ...
103+
def WhichOneof(self, oneof_group: typing_extensions.Literal["progress_type", b"progress_type"]) -> typing_extensions.Literal["status", "progress"] | None: ...
104104

105105
global___TaskProgressUpdate = TaskProgressUpdate

src/omotes_sdk/internal/worker/worker.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,25 @@ def __init__(self, job_id: UUID, task: CeleryTask, broker_if: BrokerInterface):
4545
self.task = task
4646
self.broker_if = broker_if
4747

48+
def send_start(self) -> None:
49+
"""Send a START progress update to the orchestrator."""
50+
logger.debug(
51+
"Sending START progress update for job %s (celery id %s)",
52+
self.job_id,
53+
self.task.request.id,
54+
)
55+
self.broker_if.send_message_to(
56+
None,
57+
WORKER.config.task_progress_queue_name,
58+
TaskProgressUpdate(
59+
job_id=str(self.job_id),
60+
celery_task_id=self.task.request.id,
61+
celery_task_type=WORKER_TASK_TYPE,
62+
status=TaskProgressUpdate.START,
63+
message="Started job at worker.",
64+
).SerializeToString(),
65+
)
66+
4867
def update_progress(self, fraction: float, message: str) -> None:
4968
"""Send a progress update to the orchestrator.
5069
@@ -243,7 +262,7 @@ def wrapped_worker_task(
243262
"""
244263
logger.info("Worker started new task %s with reference %s", job_id, job_reference)
245264
task_util = TaskUtil(job_id, task, task.broker_if)
246-
task_util.update_progress(0, "Job calculation started")
265+
task_util.send_start()
247266
output_esdl = WORKER_TASK_FUNCTION(input_esdl, params_dict, task_util.update_progress)
248267

249268
input_esh = pyesdl_from_string(input_esdl)

task_messages_protocol/task.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ message TaskProgressUpdate {
1818
string job_id = 1;
1919
string celery_task_id = 2;
2020
string celery_task_type = 5;
21-
oneof progress {
21+
oneof progress_type {
2222
ProgressStatus status = 6;
23-
double numerical = 7;
23+
double progress = 7;
2424
}
2525
string message = 4;
2626

0 commit comments

Comments
 (0)