Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def _node_type_from_group_spec(

resources = _get_ray_resources_from_group_spec(group_spec, is_head)

return {
node_type = {
"min_workers": min_workers,
"max_workers": max_workers,
# `node_config` is a legacy field required for compatibility.
Expand All @@ -228,6 +228,12 @@ def _node_type_from_group_spec(
"resources": resources,
}

idle_timeout_s = group_spec.get(IDLE_SECONDS_KEY)
if idle_timeout_s is not None:
node_type["idle_timeout_s"] = float(idle_timeout_s)

return node_type


def _get_ray_resources_from_group_spec(
group_spec: Dict[str, Any], is_head: bool
Expand Down
1 change: 1 addition & 0 deletions python/ray/autoscaler/ray-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@
},
"min_workers": {"type": "integer"},
"max_workers": {"type": "integer"},
"idle_timeout_s": {"type": "number", "nullable": true},
"resources": {
"type": "object",
"patternProperties": {
Expand Down
3 changes: 3 additions & 0 deletions python/ray/autoscaler/v2/instance_manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class NodeTypeConfig:
min_worker_nodes: int
# The maximum number of worker nodes that can be launched for this node type.
max_worker_nodes: int
# Idle timeout seconds for worker nodes of this node type.
idle_timeout_s: Optional[float] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we enforce it as integer with a cast when we add this? I see it being int as part of the schema

Or we could make this a float in the schema too. No preference over this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change it to a number type in the schema and then add a cast to float when we call idle_timeout_s = group_spec.get(IDLE_SECONDS_KEY), since I implemented it as an int in the RayCluster CRD for consistency with the other field: https://github.com/ray-project/kuberay/blob/925effe34022c72c41691c0b79d8d3051d4a1b77/ray-operator/apis/ray/v1/raycluster_types.go#L94

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the tests again and implemented this change in: 1bd8afb

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks for the great work!

# The total resources on the node.
resources: Dict[str, float] = field(default_factory=dict)
# The labels on the node.
Expand Down Expand Up @@ -346,6 +348,7 @@ def get_node_type_configs(self) -> Dict[NodeType, NodeTypeConfig]:
name=node_type,
min_worker_nodes=node_config.get("min_workers", 0),
max_worker_nodes=max_workers_nodes,
idle_timeout_s=node_config.get("idle_timeout_s", None),
resources=node_config.get("resources", {}),
labels=node_config.get("labels", {}),
launch_config_hash=launch_config_hash,
Expand Down
6 changes: 5 additions & 1 deletion python/ray/autoscaler/v2/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,11 @@ def _enforce_idle_termination(
continue

idle_timeout_s = ctx.get_idle_timeout_s()
# Override the scheduler idle_timeout_s if set for this node_type.
node_type = node.node_type
if node_type in node_type_configs:
if node_type_configs[node_type].idle_timeout_s is not None:
idle_timeout_s = node_type_configs[node_type].idle_timeout_s
if idle_timeout_s is None:
# No idle timeout is set, skip the idle termination.
continue
Expand All @@ -1606,7 +1611,6 @@ def _enforce_idle_termination(

# Honor the min_worker_nodes setting for the node type.
min_count = 0
node_type = node.node_type
if node_type in node_type_configs:
min_count = node_type_configs[node_type].min_worker_nodes
if (
Expand Down
76 changes: 76 additions & 0 deletions python/ray/autoscaler/v2/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,82 @@ def test_idle_termination_with_min_worker(min_workers):
assert len(to_terminate) == 0


@pytest.mark.parametrize("node_type_idle_timeout_s", [1, 2, 10])
def test_idle_termination_with_node_type_idle_timeout(node_type_idle_timeout_s):
    """
    Check that a node type's own idle_timeout_s decides idle termination.

    Two running instances of one node type are scheduled: one busy and one
    that has been idle for `idle_time_s` seconds. The request-level
    idle_timeout_s is set far above that idle time, so the idle node is only
    expected in the termination list when the per-node-type timeout is at or
    below the idle time.
    """
    idle_time_s = 5
    node_type_name = "type_cpu_with_idle_timeout"
    launch_hash = "hash1"

    # NOTE(review): the config `name` ("type_cpu") differs from the dict key
    # used for lookup; kept exactly as in the original test.
    node_type_configs = {
        node_type_name: NodeTypeConfig(
            name="type_cpu",
            resources={"CPU": 1},
            min_worker_nodes=0,
            max_worker_nodes=5,
            idle_timeout_s=node_type_idle_timeout_s,
            launch_config_hash=launch_hash,
        ),
    }

    # Instance whose only CPU is in use: reports zero idle duration.
    busy_instance = make_autoscaler_instance(
        im_instance=Instance(
            instance_type=node_type_name,
            status=Instance.RAY_RUNNING,
            launch_config_hash=launch_hash,
            instance_id="i-1",
            node_id="r-1",
        ),
        ray_node=NodeState(
            node_id=b"r-1",
            ray_node_type_name=node_type_name,
            available_resources={"CPU": 0},
            total_resources={"CPU": 1},
            idle_duration_ms=0,
            status=NodeStatus.RUNNING,
        ),
        cloud_instance_id="c-1",
    )

    # Instance with its CPU free, idle for `idle_time_s` seconds.
    idle_instance = make_autoscaler_instance(
        im_instance=Instance(
            instance_id="i-2",
            instance_type=node_type_name,
            status=Instance.RAY_RUNNING,
            launch_config_hash=launch_hash,
            node_id="r-2",
        ),
        ray_node=NodeState(
            ray_node_type_name=node_type_name,
            node_id=b"r-2",
            available_resources={"CPU": 1},
            total_resources={"CPU": 1},
            idle_duration_ms=idle_time_s * 1000,
            status=NodeStatus.IDLE,
        ),
        cloud_instance_id="c-2",
    )

    request = sched_request(
        node_type_configs=node_type_configs,
        instances=[busy_instance, idle_instance],
        # The autoscaler-wide idle_timeout_s is deliberately larger than both
        # node_type_idle_timeout_s and idle_time_s.
        idle_timeout_s=idle_time_s * 1000,
        cluster_resource_constraints=[],
    )

    scheduler = ResourceDemandScheduler(event_logger)
    reply = scheduler.schedule(request)
    _, to_terminate = _launch_and_terminate(reply)

    if node_type_idle_timeout_s > idle_time_s:
        # Per-node-type timeout has not elapsed yet: nothing is terminated.
        assert len(to_terminate) == 0
    else:
        assert len(to_terminate) == 1
        assert to_terminate == [("i-2", "r-2", TerminationRequest.Cause.IDLE)]


def test_gang_scheduling():
"""
Test that gang scheduling works.
Expand Down