Skip to content
Closed

Test #38

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/benchmark_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
compute_template: distributed.yaml

run:
timeout: 3600 # 1hr
timeout: 7200 # 2hr
prepare: sleep 0
script: python distributed/test_distributed.py
9 changes: 9 additions & 0 deletions benchmarks/distributed/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ available_node_types:
resources:
node: 1
max_workers: 999999
tiny_worker_node:
node_config:
InstanceType: t3.large
ImageId: ami-0a2363a9cff180a64
resources:
node: 1
tiny: 1
CPU: 5
max_workers: 999999

head_node_type: head_node

Expand Down
73 changes: 73 additions & 0 deletions benchmarks/distributed/test_actors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import ray
import ray.autoscaler.sdk

import json
import os
from time import sleep, perf_counter
from tqdm import tqdm, trange

TEST_NUM_NODES = 1000
MAX_ACTORS_IN_CLUSTER = 20000


def num_alive_nodes():
n = 0
for node in ray.nodes():
if node["Alive"]:
n += 1
return n


def scale_to(target):
while num_alive_nodes() != target:
ray.autoscaler.sdk.request_resources(bundles=[{"node": 1}] * target)
print(f"Current # nodes: {num_alive_nodes()}, target: {target}")
print("Waiting ...")
sleep(5)


def test_max_actors():
# TODO (Alex): Dynamically set this based on number of cores
cpus_per_actor = 0.24

@ray.remote(num_cpus=cpus_per_actor)
class Actor:
def foo(self):
pass

actors = [
Actor.options(num_cpus=0.24).remote()
for _ in trange(MAX_ACTORS_IN_CLUSTER, desc="Launching actors")
]

for actor in tqdm(actors, desc="Ensuring actors have started"):
assert ray.get(actor.foo.remote()) is None


ray.init(address="auto")

scale_to(TEST_NUM_NODES)
assert num_alive_nodes(
) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes())

cluster_resources = ray.cluster_resources()

available_resources = ray.available_resources()
assert available_resources == cluster_resources, (
str(available_resources) + " != " + str(cluster_resources))

actor_start = perf_counter()
test_max_actors()
actor_end = perf_counter()
actor_time = actor_end - actor_start

print(f"Actor time: {actor_time} ({MAX_ACTORS_IN_CLUSTER} actors)")

if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
results = {
"actor_time": actor_time,
"num_actors": MAX_ACTORS_IN_CLUSTER,
"success": "1"
}
json.dump(results, out_file)
32 changes: 15 additions & 17 deletions benchmarks/distributed/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from tqdm import tqdm, trange

TEST_NUM_NODES = 65
MAX_ACTORS_IN_CLUSTER = 10000
MAX_RUNNING_TASKS_IN_CLUSTER = 10000
MAX_PLACEMENT_GROUPS = 1000
MAX_NUM_NODES = 250
MAX_ACTORS_IN_CLUSTER = 50000
MAX_NUM_NODES = 1000


def num_alive_nodes():
Expand Down Expand Up @@ -144,7 +144,7 @@ def f3():

scale_to(TEST_NUM_NODES)
assert num_alive_nodes(
) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes())
) == TEST_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}"

cluster_resources = ray.cluster_resources()

Expand All @@ -153,24 +153,13 @@ def f3():
str(available_resources) + " != " + str(cluster_resources))
print("Done launching nodes")

actor_start = perf_counter()
test_max_actors()
actor_end = perf_counter()

sleep(1)
assert num_alive_nodes(
) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes())
assert available_resources == cluster_resources, (
str(available_resources) + " != " + str(cluster_resources))
print("Done testing actors")

task_start = perf_counter()
test_max_running_tasks()
task_end = perf_counter()

sleep(1)
assert num_alive_nodes(
) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes())
) == TEST_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}"
assert available_resources == cluster_resources, (
str(available_resources) + " != " + str(cluster_resources))
print("Done testing tasks")
Expand All @@ -181,7 +170,7 @@ def f3():

sleep(1)
assert num_alive_nodes(
) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes())
) == TEST_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}"
assert available_resources == cluster_resources, (
str(available_resources) + " != " + str(cluster_resources))
print("Done testing placement groups")
Expand All @@ -192,7 +181,16 @@ def f3():

sleep(1)
assert num_alive_nodes(
) == MAX_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes())
) == MAX_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}"

actor_start = perf_counter()
test_max_actors()
actor_end = perf_counter()

sleep(1)
assert num_alive_nodes(
) == MAX_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}"
print("Done testing actors")
print("Done.")

actor_time = actor_end - actor_start
Expand Down