-
Notifications
You must be signed in to change notification settings - Fork 113
Migrate scala control messages to protobuf #2950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…xera into shengquan-rpc-refactor
8d5f5eb to
9922c15
Compare
Yicong-Huang
approved these changes
Oct 30, 2024
Contributor
Yicong-Huang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Had some tests locally and left some comments in code.
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto
Outdated
Show resolved
Hide resolved
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto
Show resolved
Hide resolved
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlreturns.proto
Outdated
Show resolved
Hide resolved
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlreturns.proto
Outdated
Show resolved
Hide resolved
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/common/actormessage.proto
Show resolved
Hide resolved
core/amber/src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc/controlreturns.proto
Show resolved
Hide resolved
core/amber/src/main/python/core/architecture/handlers/actorcommand/backpressure_handler.py
Show resolved
Hide resolved
Contributor
Yicong-Huang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Had some tests locally and left some comments in code.
...src/main/scala/edu/uci/ics/amber/engine/architecture/messaginglayer/WorkerTimerService.scala
Outdated
Show resolved
Hide resolved
core/amber/src/main/python/core/architecture/handlers/control/control_handler_base.py
Show resolved
Hide resolved
...i/ics/amber/engine/architecture/controller/promisehandlers/TakeGlobalCheckpointHandler.scala
Show resolved
Hide resolved
core/amber/src/main/python/core/architecture/handlers/control/no_op_handler.py
Show resolved
Hide resolved
core/amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py
Show resolved
Hide resolved
core/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
Outdated
Show resolved
Hide resolved
core/amber/src/main/python/core/runnables/test_console_message.py
Outdated
Show resolved
Hide resolved
…xera into shengquan-rpc-refactor
shengquan-ni
added a commit
that referenced
this pull request
Nov 1, 2024
This bug is caused by a wrong import of the `RUNNING` state introduced in #2950. Co-authored-by: Xinyuan Lin <xinyual3@uci.edu>
PurelyBlank
pushed a commit
that referenced
this pull request
Dec 4, 2024
As discussed in #2950, we plan to remove obsolete RPCs and reconfiguration-related RPCs in this PR. We will bring reconfiguration back after #2950. Removed: 1. QueryCurrentInputTuple. 2. ShutdownDPThread. Disabled, will be added later: 1. Reconfiguration. 2. UpdateExecutor. 3. UpdateMultipleExecutors.
PurelyBlank
pushed a commit
that referenced
this pull request
Dec 4, 2024
This PR migrates all Scala control messages and their corresponding handlers to use gRPC.
---
### Main Changes (For both Scala and Python Engine)
1. **Rewrote `AsyncRPCServer` and `AsyncRPCClient`**
- Adjusted both to fit into the generated gRPC interface, while keeping RPCs sent and received through actor messages.
2. **Centralized RPC Definitions**
- All RPC messages and method definitions are now located in:
```
src/main/protobuf/edu/uci/ics/amber/engine/architecture/rpc
```
---
### Steps to Create New RPCs
1. **Create a request proto message** in `controlcommands.proto` with all necessary inputs.
2. **Create a response proto message** in `controlreturns.proto`.
3. **Define the RPC**:
- For **controller-handled RPCs**, add the definition to `controllerservice.proto`.
- For **worker-handled RPCs**, add it to `workerservice.proto`.
4. **Generate Scala code** by running `protocGenerate` in the SBT shell.
5. **Generate Python code** by running `./scripts/python-proto-gen.sh` from `core` folder.
6. **Implement the RPC handler**:
- Scala Engine: in either `ControllerAsyncRPCHandlerInitializer` or `WorkerAsyncRPCHandlerInitializer`, and move it to the `promisehandlers` folder for better organization.
- Python Engine: create a new handler under `python/core/architecture/handlers/control` and override corresponding `async` method for the new rpc defined in worker service.
---
### Changes in Sending RPCs (Scala Engine)
Example: Using the `StartWorkflow` RPC, which is handled by the controller.
- **Before this PR:**
```scala
send(StartWorkflow(), CONTROLLER).map { ret =>
// handle the return value
}
```
- **After this PR:**
```scala
controllerInterface.startWorkflow(EmptyRequest(), mkContext(CONTROLLER)).map { resp =>
// handle the response
}
```
Co-authored-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
PurelyBlank
pushed a commit
that referenced
this pull request
Dec 4, 2024
This bug is caused by a wrong import of the `RUNNING` state introduced in #2950. Co-authored-by: Xinyuan Lin <xinyual3@uci.edu>
shengquan-ni
added a commit
that referenced
this pull request
Jun 6, 2025
This PR fixes a bug where the frontend incorrectly shows an operator as still running, even though the backend execution has completed. <img width="80%" alt="截屏2025-06-05 下午3 44 23" src="https://github.com/user-attachments/assets/63865886-1b24-4f36-8ebf-155c2748f98e" /> ### Timeline for this bug This issue was originally identified and fixed in #2411. However, it re-emerged after we migrated our RPC layer to gRPC in #2950. A minor mistake during the migration caused the originally chained future to be overridden by an `EmptyResponse` as the return value. ### Root Cause The core issue is that the `controllerInitiatedQueryStats` call returns immediately instead of waiting for the worker's response. Previously, we relied on this call to collect worker stats after execution, and used the updated stats to infer workflow completion. Because the call now returns immediately, the stats are never updated. Additionally, our current design infers region completion from port completion and workflow completion from region completion. This decouples workflow execution state from the actual state of the workers. As a result, a workflow may be marked as complete even though the frontend still shows an operator as running, when in fact, the worker has already finished execution. ### Example Scenario Here’s how the issue manifests when the last operator finishes: 1. A port finishes, triggering a handler that marks the port as completed. 2. A `QueryStats` control message is sent to the worker that reported port completion (response pending). 3. We check if all ports in the region are complete—if so, the region is marked complete. 4. Separately, we receive a worker execution completion event and send another `QueryStats` message (response also pending). 5. Since all ports are marked complete, the workflow is marked complete and the entire execution is terminated. 6. Both pending `QueryStats` messages are lost because the worker is already shut down. ### The Fix The solution is straightforward: correctly chain and return the future so that the control message awaits the worker’s response before proceeding. ### How the fix is verified The bug is not always reproducible because sometimes the execution is killed after updating the states. So I looked at the WebSocket event sequence. In the correct behavior, setting the workflow state to `complete` should be after all the stats updates. Before the fix: <img width="100%" alt="截屏2025-06-05 下午3 50 34" src="https://github.com/user-attachments/assets/da996255-743d-4cda-8a07-85bd9a9d1d3d" /> After the fix: <img width="825" alt="截屏2025-06-05 下午3 52 05" src="https://github.com/user-attachments/assets/6b64660b-547f-4f19-8769-16a7f0676031" />
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR migrates all Scala control messages and their corresponding handlers to use gRPC.
Main Changes (For both Scala and Python Engine)
Rewrote
AsyncRPCServerandAsyncRPCClientCentralized RPC Definitions
Steps to Create New RPCs
controlcommands.protowith all necessary inputs.controlreturns.proto.controllerservice.proto.workerservice.proto.protocGeneratein the SBT shell../scripts/python-proto-gen.shfromcorefolder.ControllerAsyncRPCHandlerInitializerorWorkerAsyncRPCHandlerInitializer, and move it to thepromisehandlersfolder for better organization.python/core/architecture/handlers/controland override correspondingasyncmethod for the new rpc defined in worker service.Changes in Sending RPCs (Scala Engine)
Example: Using the
StartWorkflowRPC, which is handled by the controller.Before this PR:
After this PR: