[Serve][Deployment Graph][Perf] Add minimal executor DAGNode #24754
edoakes merged 21 commits into ray-project:master
Conversation
Tests passed for me on Python 3.7; I think the CI tests are Python 3.6 only. I will take a closer look tomorrow.
edoakes
left a comment
@jiaodong the high-level approach makes sense to me (having separate runtime implementations for execution), but I'm a bit confused by the scope of what these node types are doing. It seems like there should only be a very minimal path that we can take in execution (essentially only calling function nodes or deployment method nodes) -- what do we need the deployment executor node for?
```python
from ray.serve.pipeline.deployment_node import DeploymentNode
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode
from ray.serve.deployment_executor_node import DeploymentExecutorNode
from ray.serve.deployment_method_executor_node import (
    DeploymentMethodExecutorNode,
)
from ray.serve.deployment_function_executor_node import (
    DeploymentFunctionExecutorNode,
)
```
It seems bad to me that we need to register these in the core DAG code... if an external party wanted to add a DAG implementation, would they also need to do this? Is there a way we can define an interface to register these instead of hard-coding them? cc @ericl if you have thoughts
```python
    - Meanwhile, __init__, _copy_impl and _execute_impl are on the critical
      path of execution for every request.
```
why is _copy_impl on the critical path for execution?
Also, as an aside, these should not be underscore-prefixed if they're part of the public interface that must be implemented to make a functional DAG node.
This is because upon executing the DAG, we need to recursively find and replace each node with its corresponding _execute_impl output value. Then in https://sourcegraph.com/github.com/ray-project/ray/-/blob/python/ray/experimental/dag/dag_node.py?L174 we return a new instance with the replaced args/kwargs.
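The "replace children, then return a new instance" step described here can be sketched roughly as follows (a minimal illustration with made-up names, not Ray's actual DAGNode implementation):

```python
# Minimal sketch of "return a new instance with replaced args/kwargs".
# NodeSketch is a made-up stand-in for Ray's DAGNode; not the real API.

class NodeSketch:
    def __init__(self, args):
        self._args = args  # may contain other NodeSketch children

    def _copy_impl(self, new_args):
        # Build a fresh node whose children have been replaced by
        # their already-computed execution results.
        return NodeSketch(new_args)

child = NodeSketch([])
parent = NodeSketch([child, "plain-value"])

# Pretend the child executed and produced "child-output"; the parent
# is then re-created with that value substituted in.
replaced = parent._copy_impl(["child-output", "plain-value"])
assert replaced._args == ["child-output", "plain-value"]
```

This re-creation on every execution is exactly the per-request overhead the PR is trying to minimize.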
The first part of what you said makes sense, but the second part I don't quite understand. Shouldn't we only need to call _execute_impl in the execution path? Why do we need to "return a new instance with replaced args/kwargs"?
```python
def _execute_impl(self, *args, **kwargs):
    """Does not call into anything or produce a new value, as by the time
    this function gets called, all child nodes are already resolved to
    ObjectRefs.
    """
    return self._deployment_handle
```
I'm confused about what _execute_impl does... should this just be calling the deployment handle? I thought the contract of _execute_impl was that it always returns an ObjectRef?
Actually, is it even valid for this to be executed directly? I think we should only have deployment method nodes at time of execution, right?
We need to have a node that captures class/deployment creation. For the Ray Core DAG, this means creating a new actor; for Serve's DeploymentNode, we defer the work to the subsequent Deployment objects that get deployed in serve.run().
So in the minimal executor DAG version of DeploymentNode, it should do literally nothing but return the handle, but it's also necessary because we need to support users passing a DeploymentNode into another function or class constructor.
A simpler way to understand this function is something like:

```python
@serve.deployment
class Combine:
    def __init__(self, model):
        self.model = model
```

where `model` here is a DeploymentNode, and we swap it with a DeploymentExecutorNode that, upon execution, returns the handle to `model` for the init args.
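The swap described above can be sketched like this (illustrative stand-in classes, not the actual Serve implementation):

```python
# Sketch: at execution time the node standing in for `model` resolves
# to a handle, which is what Combine's __init__ actually receives.
# DeploymentExecutorSketch is a made-up name, not Serve's real class.

class DeploymentExecutorSketch:
    def __init__(self, handle):
        self._handle = handle

    def _execute_impl(self):
        # "Does nothing" but return the handle, so it can be passed
        # into another deployment's constructor as a resolved value.
        return self._handle

class Combine:
    def __init__(self, model):
        self.model = model

handle = object()  # stand-in for a ServeHandle
node = DeploymentExecutorSketch(handle)

# During DAG execution, the node is resolved before Combine is built.
combine = Combine(node._execute_impl())
assert combine.model is handle
```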
Why is this "upon execution"? Shouldn't this just happen in the initial transformation step where we go from DeploymentNode -> Handle? Not understanding why the intermediate transformation to DeploymentExecutorNode is necessary.
My mental model of how DAG execution works is to start from the output node, recursively call the _execute_impl of its dependencies, and replace the dependencies with the result of the call.
For example, a MethodNode first calls its parent DeploymentNode's _execute_impl, which returns the handle; the MethodNode's parent_node then becomes the handle object, so the method node's impl can be something like self.parent_node.method_name.remote().
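That mental model can be sketched as a minimal recursive evaluator (all names here are illustrative; this is not Ray's actual DAGNode code):

```python
# Minimal sketch of the recursive DAG execution model described above:
# resolve dependencies bottom-up, then call this node's _execute_impl
# with the resolved values.

class SimpleNode:
    def __init__(self, args):
        self._args = args  # may contain other SimpleNodes

    def _execute_impl(self, *resolved_args):
        raise NotImplementedError

def execute(node):
    if not isinstance(node, SimpleNode):
        return node  # plain values pass through unchanged
    # Recursively resolve children first, then execute this node.
    resolved = [execute(a) for a in node._args]
    return node._execute_impl(*resolved)

class AddNode(SimpleNode):
    def _execute_impl(self, x, y):
        return x + y

inner = AddNode((1, 2))       # resolves to 3
outer = AddNode((inner, 3))   # resolves to 3 + 3
assert execute(outer) == 6
```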
```python
        new_kwargs,
    )

    def _execute_impl(self, *args, **kwargs):
```

```python
        new_kwargs: Dict[str, Any],
        new_options: Dict[str, Any],
        new_other_args_to_resolve: Dict[str, Any],
    ):
```

```python
        self._deployment_handle = deployment_handle
        super().__init__(dag_args, dag_kwargs, {}, {})

    def _copy_impl(
```
what is _copy_impl used for?
```python
method_body = getattr(
    self._deployment_node_replaced_by_handle, self._deployment_method_name
)
```
why not put this in the constructor?
Updated -- the value of self._deployment_node_replaced_by_handle is not a deployment handle until execution time, so getting the deployment method handle in the constructor won't work. From the benchmark it makes a negligible perf difference.
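The timing constraint described here (getattr must wait until the parent has resolved to a handle) can be illustrated with a small sketch; FakeHandle and MethodExecutorSketch are made-up names, not Serve's real classes:

```python
# Sketch of why the method lookup is deferred to execution time:
# at construction time the parent is still a DAG node, not a handle,
# so the method doesn't exist yet.

class FakeHandle:
    def predict(self, x):
        return x * 2

class MethodExecutorSketch:
    def __init__(self, method_name):
        # Only the method name can be recorded at construction time.
        self._method_name = method_name
        self._resolved_handle = None

    def resolve(self, handle):
        # Called at execution time, once the parent node produced a handle.
        self._resolved_handle = handle

    def execute(self, *args):
        # getattr only works now that the handle is resolved.
        method_body = getattr(self._resolved_handle, self._method_name)
        return method_body(*args)

node = MethodExecutorSketch("predict")
node.resolve(FakeHandle())  # execution time: parent resolved to a handle
assert node.execute(21) == 42
```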
```python
def __getattr__(self, method_name: str):
    return DeploymentMethodExecutorNode(
        method_name,
        (),
        {},
        other_args_to_resolve={
            PARENT_CLASS_NODE_KEY: self,
        },
    )
```
Is this valid? I don't think we should ever be calling getattr on this at execution time, right? It should already be replaced with a deployment method node.
This method is actually only called at our own transformation time, because we need to interpret deployment.method nodes; at execution time the only methods that matter are _execute_impl and _copy_impl.
I do think this PR reveals a bigger issue: we don't have an architecture/design doc for how the Ray DAG works -- what _execute_impl, _copy_impl, and _other_args_to_resolve are all about. This makes it hard for people to contribute to and review the relevant code.
simon-mo
left a comment
The new code looks fine; more questions about deleting the old code.
```
@@ -67,6 +69,19 @@ def build(ray_dag_root_node: DAGNode) -> List[Deployment]:
        lambda node: transform_ray_dag_to_serve_dag(node, node_name_generator)
```
Now I'm wondering: can we go directly from ClassNode/FunctionNode to these executor nodes? The transform process can spit out:
- A list of deployment objects
- A minimal DAG for execution

So we can delete all the existing Deployment{, Function, Method}Node implementations.
Yes, this is exactly my thinking. I think we only need to do the following:
- Collect the list of deployments that need to be deployed (w/ args, options).
- Transform ClassNode -> ServeHandle.
- Transform {ClassMethodNode, FunctionNode} -> DAGExecutorNode.
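The simplified single-pass transform proposed above could look roughly like this (a hypothetical sketch; all the `*Sketch` class names are made up, not the Ray Serve API):

```python
# Hypothetical sketch of a single transform pass that both collects
# the deployments to deploy and maps each DAG node type to its
# minimal execution-time form. All names are illustrative.

class ClassNodeSketch:
    def __init__(self, name):
        self.name = name

class MethodNodeSketch:
    def __init__(self, parent, method):
        self.parent, self.method = parent, method

class HandleSketch:
    def __init__(self, name):
        self.name = name

class ExecutorNodeSketch:
    def __init__(self, handle, method):
        self.handle, self.method = handle, method

def transform(node, deployments):
    if isinstance(node, ClassNodeSketch):
        # 1) record the deployment that must be deployed
        deployments.append(node.name)
        # 2) ClassNode -> handle
        return HandleSketch(node.name)
    if isinstance(node, MethodNodeSketch):
        # 3) MethodNode -> minimal executor node over the handle
        handle = transform(node.parent, deployments)
        return ExecutorNodeSketch(handle, node.method)
    return node

deployments = []
dag = MethodNodeSketch(ClassNodeSketch("Model"), "predict")
out = transform(dag, deployments)
assert deployments == ["Model"]
assert isinstance(out, ExecutorNodeSketch) and out.method == "predict"
```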
We eventually should be able to. In April I spent about a week trying it, but it has a bit more magic regarding how we deal with deployment and DAG handles as deployment args in our 1.12 sprint... so this PR is more about adding the final ideal state with the perf wins; then we can work backwards to simplify what happened in the first pass.
The ideal case is to split up the responsibilities of the first pass from ray_dag -> serve_dag. We need to use the DAG traversal mechanism to do both 1) map Ray DAGNode types to Serve DAGNode types and 2) generate the list of Deployment objects. This part is currently messy as it mixes both things in one pass, under one node type, so slowing down execution is the obvious user-facing issue, but a clear abstraction is the other missing piece. Ideally I think it should be structured as:
We have 1-3 working now; this PR adds 4) to optimize user-facing perf. But 2) and 3) have tech debt that we can split into separate steps to remove unnecessary code.
Per offline conversation, this PR can be merged as long as tests pass and the small nits are resolved.
```python
@pytest.mark.asyncio
@pytest.mark.skip(reason="async handle not enabled yet")
```
I thought async handle needs to be implemented for performance?
Yes, but not in this PR yet.
```
- def test_deploment_options_func_class_with_class_method():
+ def test_deploment_options_func_class_with_class_method(serve_instance):
```
This shouldn't be required if the minimal executor node uses a lazy handle.
Oh, there was one commit where I removed the lazy handle, but I later realized we still need it to run the DAG without a Serve instance. Let me change this back.
simon-mo
left a comment
LGTM.
Please make sure to follow up with
- Async handle changes
- Cleanup changes
I'm ok with merging this for now; let's sync up about the end state we want to work towards -- I think things are much more complex than they need to be right now.
Sounds good, let me put up an internal onboarding-style doc for the DAG stuff in general, and we can see with more clarity how we map Ray nodes to deployment graph nodes.
Why are these changes needed?
closes #24475
The current deployment graph has big perf issues compared with using a plain deployment handle, mostly because of the overhead of the DAGNode traversal mechanism. We need this mechanism to power the DAG API, especially deeply nested objects in args where we rely on pickling; but meanwhile each execution ends up re-creating and replacing every DAGNode instance involved, which incurs overhead. Some overhead is inevitable due to pickling and executing DAGNode Python code, but it could be quite minimal. As I profiled earlier, pickling itself is quite fast for our benchmarks, at the magnitude of microseconds.
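As a rough sanity check of the microseconds claim, one can time round-tripping a small nested args structure through pickle (the data below is an illustrative stand-in, not the actual benchmark):

```python
import pickle
import timeit

# Illustrative stand-in for nested DAG args; not the actual benchmark data.
args = {"model": "resnet", "weights": list(range(100)), "nested": {"a": (1, 2)}}

n = 10_000
seconds = timeit.timeit(lambda: pickle.loads(pickle.dumps(args)), number=n)
per_call_us = seconds / n * 1e6  # microseconds per round trip

# Typically single-digit microseconds on modern hardware; assert a loose bound.
assert per_call_us < 10_000
```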
Meanwhile, the elephant in the room is that DeploymentNode and its relatives do far more work in their constructors than necessary, slowing everything down. So the fix is as simple as
The current ray dag -> serve dag transform mixes in a lot of stuff related to deployment generation and init args; in the longer term we should remove it, but our correctness depends on it, so I'd rather leave that as a separate PR.
Current 10-node chain with deployment graph .bind()

Using raw deployment handle without DAG overhead

After this PR:
Related issue number
Checks
scripts/format.sh to lint the changes in this PR.