Skip to content

Commit dd95ec2

Browse files
Merge pull request #85 from Project-OMOTES/84-fix-issue-where-worker-may-also-send-out-progressupdate-00-which-has-special-meaning-for-orchestrator-as-it-may-trigger-a-cancellation-on-the-job-due-to-delivery-limit
84: Create proposal to include specific START progress update from wo…
2 parents f0ea927 + 4e2d6c7 commit dd95ec2

File tree

4 files changed

+77
-15
lines changed

4 files changed

+77
-15
lines changed

src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py

Lines changed: 32 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,27 @@ global___TaskResult = TaskResult
6565
class TaskProgressUpdate(google.protobuf.message.Message):
6666
DESCRIPTOR: google.protobuf.descriptor.Descriptor
6767

68+
class _ProgressStatus:
69+
ValueType = typing.NewType("ValueType", builtins.int)
70+
V: typing_extensions.TypeAlias = ValueType
71+
72+
class _ProgressStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TaskProgressUpdate._ProgressStatus.ValueType], builtins.type):
73+
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
74+
START: TaskProgressUpdate._ProgressStatus.ValueType # 0
75+
76+
class ProgressStatus(_ProgressStatus, metaclass=_ProgressStatusEnumTypeWrapper): ...
77+
START: TaskProgressUpdate.ProgressStatus.ValueType # 0
78+
6879
JOB_ID_FIELD_NUMBER: builtins.int
6980
CELERY_TASK_ID_FIELD_NUMBER: builtins.int
7081
CELERY_TASK_TYPE_FIELD_NUMBER: builtins.int
82+
STATUS_FIELD_NUMBER: builtins.int
7183
PROGRESS_FIELD_NUMBER: builtins.int
7284
MESSAGE_FIELD_NUMBER: builtins.int
7385
job_id: builtins.str
7486
celery_task_id: builtins.str
7587
celery_task_type: builtins.str
88+
status: global___TaskProgressUpdate.ProgressStatus.ValueType
7689
progress: builtins.float
7790
message: builtins.str
7891
def __init__(
@@ -81,9 +94,12 @@ class TaskProgressUpdate(google.protobuf.message.Message):
8194
job_id: builtins.str = ...,
8295
celery_task_id: builtins.str = ...,
8396
celery_task_type: builtins.str = ...,
97+
status: global___TaskProgressUpdate.ProgressStatus.ValueType = ...,
8498
progress: builtins.float = ...,
8599
message: builtins.str = ...,
86100
) -> None: ...
87-
def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress"]) -> None: ...
101+
def HasField(self, field_name: typing_extensions.Literal["progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> builtins.bool: ...
102+
def ClearField(self, field_name: typing_extensions.Literal["celery_task_id", b"celery_task_id", "celery_task_type", b"celery_task_type", "job_id", b"job_id", "message", b"message", "progress", b"progress", "progress_type", b"progress_type", "status", b"status"]) -> None: ...
103+
def WhichOneof(self, oneof_group: typing_extensions.Literal["progress_type", b"progress_type"]) -> typing_extensions.Literal["status", "progress"] | None: ...
88104

89105
global___TaskProgressUpdate = TaskProgressUpdate

src/omotes_sdk/internal/worker/worker.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,25 @@ def __init__(self, job_id: UUID, task: CeleryTask, broker_if: BrokerInterface):
4545
self.task = task
4646
self.broker_if = broker_if
4747

48+
def send_start(self) -> None:
49+
"""Send a START progress update to the orchestrator."""
50+
logger.debug(
51+
"Sending START progress update for job %s (celery id %s)",
52+
self.job_id,
53+
self.task.request.id,
54+
)
55+
self.broker_if.send_message_to(
56+
None,
57+
WORKER.config.task_progress_queue_name,
58+
TaskProgressUpdate(
59+
job_id=str(self.job_id),
60+
celery_task_id=self.task.request.id,
61+
celery_task_type=WORKER_TASK_TYPE,
62+
status=TaskProgressUpdate.START,
63+
message="Started job at worker.",
64+
).SerializeToString(),
65+
)
66+
4867
def update_progress(self, fraction: float, message: str) -> None:
4968
"""Send a progress update to the orchestrator.
5069
@@ -243,7 +262,7 @@ def wrapped_worker_task(
243262
"""
244263
logger.info("Worker started new task %s with reference %s", job_id, job_reference)
245264
task_util = TaskUtil(job_id, task, task.broker_if)
246-
task_util.update_progress(0, "Job calculation started")
265+
task_util.send_start()
247266
output_esdl = WORKER_TASK_FUNCTION(input_esdl, params_dict, task_util.update_progress)
248267

249268
input_esh = pyesdl_from_string(input_esdl)

task_messages_protocol/task.proto

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ message TaskProgressUpdate {
1818
string job_id = 1;
1919
string celery_task_id = 2;
2020
string celery_task_type = 5;
21-
double progress = 3;
21+
oneof progress_type {
22+
ProgressStatus status = 6;
23+
double progress = 7;
24+
}
2225
string message = 4;
26+
27+
enum ProgressStatus {
28+
START = 0;
29+
}
2330
}

0 commit comments

Comments
 (0)