diff --git a/benchmarks/benchmark_tests.yaml b/benchmarks/benchmark_tests.yaml index 016f54c32f4c..014cbc53dd8f 100644 --- a/benchmarks/benchmark_tests.yaml +++ b/benchmarks/benchmark_tests.yaml @@ -36,6 +36,6 @@ compute_template: distributed.yaml run: - timeout: 3600 # 1hr + timeout: 7200 # 2hr prepare: sleep 0 script: python distributed/test_distributed.py diff --git a/benchmarks/distributed/config.yaml b/benchmarks/distributed/config.yaml index 8ccff5c4ee28..88cbdbed91b6 100644 --- a/benchmarks/distributed/config.yaml +++ b/benchmarks/distributed/config.yaml @@ -38,6 +38,15 @@ available_node_types: resources: node: 1 max_workers: 999999 + tiny_worker_node: + node_config: + InstanceType: t3.large + ImageId: ami-0a2363a9cff180a64 + resources: + node: 1 + tiny: 1 + CPU: 5 + max_workers: 999999 head_node_type: head_node diff --git a/benchmarks/distributed/test_actors.py b/benchmarks/distributed/test_actors.py new file mode 100644 index 000000000000..e1ae3e32112d --- /dev/null +++ b/benchmarks/distributed/test_actors.py @@ -0,0 +1,73 @@ +import ray +import ray.autoscaler.sdk + +import json +import os +from time import sleep, perf_counter +from tqdm import tqdm, trange + +TEST_NUM_NODES = 1000 +MAX_ACTORS_IN_CLUSTER = 20000 + + +def num_alive_nodes(): + n = 0 + for node in ray.nodes(): + if node["Alive"]: + n += 1 + return n + + +def scale_to(target): + while num_alive_nodes() != target: + ray.autoscaler.sdk.request_resources(bundles=[{"node": 1}] * target) + print(f"Current # nodes: {num_alive_nodes()}, target: {target}") + print("Waiting ...") + sleep(5) + + +def test_max_actors(): + # TODO (Alex): Dynamically set this based on number of cores + cpus_per_actor = 0.24 + + @ray.remote(num_cpus=cpus_per_actor) + class Actor: + def foo(self): + pass + + actors = [ + Actor.options(num_cpus=0.24).remote() + for _ in trange(MAX_ACTORS_IN_CLUSTER, desc="Launching actors") + ] + + for actor in tqdm(actors, desc="Ensuring actors have started"): + assert ray.get(actor.foo.remote()) is None + + +ray.init(address="auto") + +scale_to(TEST_NUM_NODES) +assert num_alive_nodes( +) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes()) + +cluster_resources = ray.cluster_resources() + +available_resources = ray.available_resources() +assert available_resources == cluster_resources, ( + str(available_resources) + " != " + str(cluster_resources)) + +actor_start = perf_counter() +test_max_actors() +actor_end = perf_counter() +actor_time = actor_end - actor_start + +print(f"Actor time: {actor_time} ({MAX_ACTORS_IN_CLUSTER} actors)") + +if "TEST_OUTPUT_JSON" in os.environ: + out_file = open(os.environ["TEST_OUTPUT_JSON"], "w") + results = { + "actor_time": actor_time, + "num_actors": MAX_ACTORS_IN_CLUSTER, + "success": "1" + } + json.dump(results, out_file) diff --git a/benchmarks/distributed/test_distributed.py b/benchmarks/distributed/test_distributed.py index 4fd539175804..7ea5f45327a0 100644 --- a/benchmarks/distributed/test_distributed.py +++ b/benchmarks/distributed/test_distributed.py @@ -9,10 +9,10 @@ from tqdm import tqdm, trange TEST_NUM_NODES = 65 -MAX_ACTORS_IN_CLUSTER = 10000 MAX_RUNNING_TASKS_IN_CLUSTER = 10000 MAX_PLACEMENT_GROUPS = 1000 -MAX_NUM_NODES = 250 +MAX_ACTORS_IN_CLUSTER = 50000 +MAX_NUM_NODES = 1000 def num_alive_nodes(): @@ -144,7 +144,7 @@ def f3(): scale_to(TEST_NUM_NODES) assert num_alive_nodes( -) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes()) +) == TEST_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}" cluster_resources = ray.cluster_resources() @@ -153,24 +153,13 @@ def f3(): str(available_resources) + " != " + str(cluster_resources)) print("Done launching nodes") -actor_start = perf_counter() -test_max_actors() -actor_end = perf_counter() - -sleep(1) -assert num_alive_nodes( -) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes()) -assert available_resources == cluster_resources, ( - str(available_resources) + " != " + str(cluster_resources)) -print("Done testing actors") - task_start = perf_counter() test_max_running_tasks() task_end = perf_counter() sleep(1) assert num_alive_nodes( -) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes()) +) == TEST_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}" assert available_resources == cluster_resources, ( str(available_resources) + " != " + str(cluster_resources)) print("Done testing tasks") @@ -181,7 +170,7 @@ def f3(): sleep(1) assert num_alive_nodes( -) == TEST_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes()) +) == TEST_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}" assert available_resources == cluster_resources, ( str(available_resources) + " != " + str(cluster_resources)) print("Done testing placement groups") @@ -192,7 +181,16 @@ def f3(): sleep(1) assert num_alive_nodes( -) == MAX_NUM_NODES, "Wrong number of nodes in cluster " + len(ray.nodes()) +) == MAX_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}" + +actor_start = perf_counter() +test_max_actors() +actor_end = perf_counter() + +sleep(1) +assert num_alive_nodes( +) == MAX_NUM_NODES, f"Wrong number of nodes in cluster {len(ray.nodes())}" +print("Done testing actors") print("Done.") actor_time = actor_end - actor_start