Skip to content

Conversation

@crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented Apr 8, 2022

@crusaderky crusaderky requested a review from fjetter April 8, 2022 12:58
@crusaderky crusaderky self-assigned this Apr 8, 2022
@github-actions
Copy link
Contributor

github-actions bot commented Apr 8, 2022

Unit Test Results

       16 files  ±  0         16 suites  ±0   7h 12m 23s ⏱️ - 26m 48s
  2 734 tests +  4    2 653 ✔️ +  5       80 💤  - 1  0  - 1  1 🔥 +1 
21 758 runs  +33  20 685 ✔️ +40  1 072 💤  - 7  0  - 1  1 🔥 +1 

For more details on these errors, see this check.

Results for commit 34274b7. ± Comparison against base commit bd3f47e.

♻️ This comment has been updated with latest results.

@mrocklin
Copy link
Member

@sjperkins can I ask you to review this?

@sjperkins sjperkins self-requested a review April 12, 2022 10:35
Copy link
Member

@sjperkins sjperkins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR adds logging of state machine events to the Worker. Modified StateMachineEvents are added to a new Worker.stimulus_log attribute. StateMachineEvents can be converted to dictionaries and partly reconstructed from them. This is to support replay from logs discussed for e.g. here #5736 (comment).

I think StateMachineEvent.log could be renamed to something more descriptive

def __init_subclass__(cls):
StateMachineEvent._classes[cls.__name__] = cls

def log(self, *, handled: float) -> StateMachineEvent:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be named something more descriptive. How about one of the following?

  • logabble_event?
  • to_loggable_event?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

stimulus_id: str
#: timestamp of when the event was handled by the worker
# TODO switch to @dataclass(slots=True) and uncomment (requires Python >=3.10)
# handled: float | None = field(init=False, default=None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is the reason for the new method containing the self.handled = None assignment

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. clarified in comment

self.handled = handled
return self

def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dictionary conversion seems necessary because stimulus_log: StateMachineEvent has been added to Worker and thus must be supported by Worker._to_dict


prev_handled = story[0].handled
for ev in story[1:]:
assert ev.handled > prev_handled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we always assume that this invariant holds?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a very tiny chance of getting two events in the same nanosecond. changed to >=.

def log(self, *, handled: float) -> StateMachineEvent:
out = copy(self)
out.handled = handled
out.value = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the execution result is discarded because of the potentially large size of the result, and possibly the complexity of serialising/deserialising the result?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not discarding it would cause worker.stimulus_log to become effecitvely a copy of worker.data, except that it never loses any data!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed!


def _after_from_dict(self) -> None:
self.value = None
self.type = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the execution result type is discarded here because it's merely a string representation at this point and one would have to deal with serialising/unserialising types.

In any case, I think reconstructing the result of execution is non-trivial. How does this impact replayability of events on the Worker (out of interest?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these fields that are being discarded on a serialization round-trip should be inconsequential for the purpose of rebuilding the state.

@crusaderky
Copy link
Collaborator Author

All review comments have been addressed

@mrocklin mrocklin merged commit 6a3cbd3 into dask:main Apr 13, 2022
@mrocklin
Copy link
Member

Thank you for the work @crusaderky and for the review @sjperkins

@crusaderky crusaderky deleted the WMSM/log_events branch April 13, 2022 12:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Migrate ensure_computing transitions to new WorkerState event mechanism

3 participants