[workflow] Fast workflow indexing #24767
Conversation
Sorry for the late review, and thanks for working on this! Overall, I'm fine with the protocol. My concern is that this protocol forces readers to be single-threaded as well, which doesn't seem to work in the current system. One option is to make reads work without fixing the half-finished updates, and only fix them when there is a write.
Let me try to implement another protocol using key creation time. I think it might address the multi-threading issues.
@iycheng ready for review again
```diff
 def cancel(workflow_id: str) -> None:
     try:
         workflow_manager = get_management_actor()
-        ray.get(workflow_manager.cancel_workflow.remote(workflow_id))
     except ValueError:
         wf_store = workflow_storage.get_workflow_storage(workflow_id)
         wf_store.save_workflow_meta(WorkflowMetaData(WorkflowStatus.CANCELED))
         # TODO(suquark): Here we update workflow status "offline", so it is likely
         # thread-safe because there is no workflow management actor updating the
         # workflow concurrently. But we should be careful if we are going to
         # update more workflow status offline in the future.
         wf_store.update_workflow_status(WorkflowStatus.CANCELED)
         return
+    ray.get(workflow_manager.cancel_workflow.remote(workflow_id))
```
Should we move ray.get back into the try block?
No, I think we put ray.get in the try block accidentally. ray.get cannot raise ValueError; only get_management_actor raises ValueError.
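A minimal sketch of the exception scoping being discussed (not the PR's real code; the in-memory stand-ins for the actor lookup and the remote call are assumptions for illustration): only the actor lookup can raise ValueError when the management actor is missing, so it alone belongs in the try block, and the call on the actor stays outside, where its errors cannot be mistaken for "actor not found".

```python
def get_management_actor(actors):
    # Stand-in for the real actor lookup (e.g. ray.get_actor), which
    # raises ValueError when the named actor does not exist.
    if "workflow_manager" not in actors:
        raise ValueError("management actor is not running")
    return actors["workflow_manager"]

def cancel(workflow_id, actors):
    try:
        manager = get_management_actor(actors)
    except ValueError:
        # Offline path: update the workflow status directly in storage.
        return ("offline", workflow_id)
    # Online path: intentionally outside the try block, so an error from
    # the cancel call itself is never treated as "actor not found".
    return ("online", manager(workflow_id))
```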
```python
        return WorkflowStatus(metadata["status"])
    return WorkflowStatus.NONE

def list_workflow(self) -> List[Tuple[str, WorkflowStatus]]:
```
I think we need an extra parameter here: list_workflow(self, status=None).
If status is set, we'll only check the dirty directory and the specified status directory. I'm OK with another PR to fix this.
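A hedged sketch of the suggested signature (the parameter name status=None comes from the comment above; the in-memory index and dirty set, and the use of plain strings for statuses, are assumptions for illustration): with a filter set, only the requested status directory needs to be scanned instead of every status directory, and dirty entries are skipped until repaired.

```python
from typing import Dict, List, Optional, Set, Tuple

def list_workflow(
    index: Dict[str, Set[str]],   # status name -> workflow ids under that directory
    dirty: Set[str],              # ids whose status update is still in progress
    status: Optional[str] = None,
) -> List[Tuple[str, str]]:
    # With a filter, only one status directory is consulted; without one,
    # every status directory must be scanned.
    statuses = [status] if status is not None else list(index)
    results = []
    for s in statuses:
        for wf_id in index.get(s, ()):
            if wf_id in dirty:
                continue  # status cannot be trusted until repaired
            results.append((wf_id, s))
    return sorted(results)
```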
```python
if s != prev_status:
    self._storage.delete(
        self._key_workflow_with_status(workflow_id, s)
    )
```
No need to update this PR, but do you think in the future we could put the status in the dirty flag and only delete that?
It seems there is a bug here: I think we should always set the flag. Another thing is that I think we need to push the status filter into the storage layer, so that we don't need to read the status of successful workflows, which is not useful for resume_all/list_all with a filter. I'm OK with this PR and having another one for this optimization.
@iycheng I think we did not set the dirty flag because in that branch we have already detected the dirty flag. Since workflow status updating is single-threaded, there is no need to create it again. (Creating it again also would not work in the concurrent case, because another, faster process could delete the newly created flag anyway; the order of creates and deletes from different processes can be arbitrary.)
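The single-writer update sequence being discussed can be sketched as follows (a minimal illustration, not the PR's actual code: the key tuples, the repairing flag, and the plain set used as the store are all assumptions). The dirty flag brackets the non-atomic steps, and the repair path reuses the flag that was already found instead of recreating it.

```python
from typing import Optional, Set, Tuple

Key = Tuple[str, ...]

def update_status(
    store: Set[Key],
    workflow_id: str,
    new_status: str,
    old_status: Optional[str],
    repairing: bool = False,
) -> None:
    if not repairing:
        # Mark the update as in progress. During a repair the flag already
        # exists (that is how the broken update was detected), so it is
        # deliberately not created again.
        store.add(("dirty", workflow_id))
    store.add(("status", new_status, workflow_id))          # create the new index key
    if old_status is not None:
        store.discard(("status", old_status, workflow_id))  # delete the old index key
    store.discard(("dirty", workflow_id))                   # update complete
```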
Got it, and thanks for the explanation.
@iycheng I just updated the PR and added support for the status filter. I also fixed a bug: in the original
Force-pushed from 4fca233 to 7171aa1
CI failures seem unrelated. I'll merge this PR.
Why are these changes needed?
This PR enables indexing of workflow status, so listing workflows and their statuses is much faster.

The indexing is done by creating keys under the corresponding status directories. For example, the RUNNING directory contains one key (named with the workflow id) for every workflow that is currently running.

One issue is that the cluster or workflow may crash while the status is being updated, leaving the status inconsistent: we have to create the new key, delete the old key, and update the workflow metadata, and these actions cannot be combined into a single atomic operation. We use a special directory to mark that a status update is underway. This makes it possible to detect unfinished status updates and fix them. (See examples in the newly added tests.)
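The detection-and-repair step described above can be sketched as follows. This is a minimal illustration, not the PR's actual code: the on-disk layout (status/&lt;STATUS&gt;/&lt;workflow_id&gt; index keys, a dirty/&lt;workflow_id&gt; flag, and a &lt;workflow_id&gt;/meta.json file) and the function name repair_dirty are assumptions for the example, and it treats the workflow metadata as the source of truth when reconciling index keys.

```python
import json
import os

def repair_dirty(root: str) -> None:
    """Reconcile index keys for every workflow whose status update was
    interrupted, i.e. whose flag still exists under the dirty directory.
    The layout and names here are assumptions for this sketch."""
    dirty_dir = os.path.join(root, "dirty")
    status_dir = os.path.join(root, "status")
    for wf_id in os.listdir(dirty_dir):
        # Treat the workflow metadata as the authoritative status.
        with open(os.path.join(root, wf_id, "meta.json")) as f:
            true_status = json.load(f)["status"]
        # Delete index keys that disagree with the metadata...
        for status in os.listdir(status_dir):
            key = os.path.join(status_dir, status, wf_id)
            if status != true_status and os.path.exists(key):
                os.remove(key)
        # ...ensure the correct index key exists...
        open(os.path.join(status_dir, true_status, wf_id), "w").close()
        # ...and clear the dirty flag only once the repair is complete.
        os.remove(os.path.join(dirty_dir, wf_id))
```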
Checks
I've run scripts/format.sh to lint the changes in this PR.