From 8a055f1962afb1049cb81722c2a88ca09b11fc40 Mon Sep 17 00:00:00 2001
From: ashors1
Date: Wed, 9 Jul 2025 13:16:08 -0700
Subject: [PATCH 1/6] make mcore expandable segments configurable

Signed-off-by: ashors1
---
 nemo_rl/distributed/worker_groups.py            | 12 +++++++++++-
 nemo_rl/models/policy/lm_policy.py              |  3 +++
 nemo_rl/models/policy/megatron_policy_worker.py |  7 -------
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/nemo_rl/distributed/worker_groups.py b/nemo_rl/distributed/worker_groups.py
index b008452f1c..3d8dee5421 100644
--- a/nemo_rl/distributed/worker_groups.py
+++ b/nemo_rl/distributed/worker_groups.py
@@ -318,6 +318,7 @@ def __init__(
         bundle_indices_list: Optional[list[tuple[int, list[int]]]] = None,
         sharding_annotations: Optional[NamedSharding] = None,
         env_vars: dict[str, str] = {},
+        use_expandable_segments: Optional[bool] = None,
     ):
         """Initialize a group of distributed Ray workers.

@@ -392,7 +393,10 @@ def __init__(

         # Create workers based on the bundle_indices_list
         self._create_workers_from_bundle_indices(
-            remote_worker_builder, bundle_indices_list, env_vars=env_vars
+            remote_worker_builder,
+            bundle_indices_list,
+            env_vars=env_vars,
+            use_expandable_segments=use_expandable_segments,
         )

     def get_dp_leader_worker_idx(self, dp_shard_idx: int) -> int:
@@ -409,6 +413,7 @@ def _create_workers_from_bundle_indices(
         remote_worker_builder: RayWorkerBuilder,
         bundle_indices_list: list[tuple[int, list[int]]],
         env_vars: dict[str, str] = {},
+        use_expandable_segments: Optional[bool] = None,
     ) -> None:
         """Create workers based on explicit bundle indices for tied worker groups.

@@ -507,6 +512,11 @@ def _create_workers_from_bundle_indices(
             runtime_env["env_vars"]["VIRTUAL_ENV"] = py_executable
             runtime_env["env_vars"]["UV_PROJECT_ENVIRONMENT"] = py_executable

+        if use_expandable_segments is not None:
+            runtime_env["env_vars"]["PYTORCH_CUDA_ALLOC_CONF"] = (
+                f"expandable_segments:{use_expandable_segments}"
+            )
+
         extra_options = {"runtime_env": runtime_env, "name": name}

         # start worker creation asynchronously
diff --git a/nemo_rl/models/policy/lm_policy.py b/nemo_rl/models/policy/lm_policy.py
index 22bfcd690b..19b5e217b5 100644
--- a/nemo_rl/models/policy/lm_policy.py
+++ b/nemo_rl/models/policy/lm_policy.py
@@ -120,6 +120,9 @@ def __init__(
             name_prefix=name_prefix,
             workers_per_node=workers_per_node,
             sharding_annotations=self.sharding_annotations,
+            use_expandable_segments=config["megatron_cfg"].get(
+                "use_expandable_segments", None
+            ),
         )

         if config["dynamic_batching"]["enabled"]:
diff --git a/nemo_rl/models/policy/megatron_policy_worker.py b/nemo_rl/models/policy/megatron_policy_worker.py
index 778cb4b857..e98989a363 100644
--- a/nemo_rl/models/policy/megatron_policy_worker.py
+++ b/nemo_rl/models/policy/megatron_policy_worker.py
@@ -681,13 +681,6 @@ def __init__(
             ]
         )

-    def configure_worker(self, num_gpus: int, bundle_indices: Optional[tuple] = None):
-        USE_EXPANDABLE_SEGMENTS = False  # Disabling this right now as it seems to cause vLLM refit issues with Ampere
-        if USE_EXPANDABLE_SEGMENTS:
-            return None, {"PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True"}, None
-        else:
-            return None, None, None
-
     def is_alive(self):
         return True
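Note for reviewers: PYTORCH_CUDA_ALLOC_CONF is PyTorch's standard CUDA caching-allocator knob, and Python's str(True)/str(False) produce exactly the "True"/"False" casing its parser accepts, which is why the f-string above works. A minimal standalone sketch of the injected logic; the helper name cuda_alloc_env is invented for illustration and does not exist in the repo:

    from typing import Optional

    def cuda_alloc_env(use_expandable_segments: Optional[bool]) -> dict[str, str]:
        # Mirrors the runtime_env override added in this patch: None means
        # "leave the allocator configuration untouched".
        if use_expandable_segments is None:
            return {}
        return {
            "PYTORCH_CUDA_ALLOC_CONF": f"expandable_segments:{use_expandable_segments}"
        }

    print(cuda_alloc_env(True))  # {'PYTORCH_CUDA_ALLOC_CONF': 'expandable_segments:True'}
    print(cuda_alloc_env(None))  # {}

Because the override goes into the Ray actor's runtime_env, it is in place before the worker process starts, rather than being set from inside an already-running worker (the configure_worker hook this patch deletes).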
From 412d08d0a1dbf4c1c3b0d2c0a0e0 Mon Sep 17 00:00:00 2001
From: ashors1
Date: Thu, 10 Jul 2025 13:41:13 -0700
Subject: [PATCH 2/6] handle env vars more generally

Signed-off-by: ashors1
---
 nemo_rl/distributed/worker_groups.py | 8 --------
 nemo_rl/models/policy/lm_policy.py   | 6 +++---
 2 files changed, 3 insertions(+), 11 deletions(-)

diff --git a/nemo_rl/distributed/worker_groups.py b/nemo_rl/distributed/worker_groups.py
index 3d8dee5421..6ef5a24d10 100644
--- a/nemo_rl/distributed/worker_groups.py
+++ b/nemo_rl/distributed/worker_groups.py
@@ -318,7 +318,6 @@ def __init__(
         bundle_indices_list: Optional[list[tuple[int, list[int]]]] = None,
         sharding_annotations: Optional[NamedSharding] = None,
         env_vars: dict[str, str] = {},
-        use_expandable_segments: Optional[bool] = None,
     ):
         """Initialize a group of distributed Ray workers.

@@ -396,7 +395,6 @@ def __init__(
             remote_worker_builder,
             bundle_indices_list,
             env_vars=env_vars,
-            use_expandable_segments=use_expandable_segments,
         )

     def get_dp_leader_worker_idx(self, dp_shard_idx: int) -> int:
@@ -413,7 +411,6 @@ def _create_workers_from_bundle_indices(
         remote_worker_builder: RayWorkerBuilder,
         bundle_indices_list: list[tuple[int, list[int]]],
         env_vars: dict[str, str] = {},
-        use_expandable_segments: Optional[bool] = None,
     ) -> None:
         """Create workers based on explicit bundle indices for tied worker groups.

@@ -512,11 +509,6 @@ def _create_workers_from_bundle_indices(
             runtime_env["env_vars"]["VIRTUAL_ENV"] = py_executable
             runtime_env["env_vars"]["UV_PROJECT_ENVIRONMENT"] = py_executable

-        if use_expandable_segments is not None:
-            runtime_env["env_vars"]["PYTORCH_CUDA_ALLOC_CONF"] = (
-                f"expandable_segments:{use_expandable_segments}"
-            )
-
         extra_options = {"runtime_env": runtime_env, "name": name}

         # start worker creation asynchronously
diff --git a/nemo_rl/models/policy/lm_policy.py b/nemo_rl/models/policy/lm_policy.py
index 19b5e217b5..78fbec391e 100644
--- a/nemo_rl/models/policy/lm_policy.py
+++ b/nemo_rl/models/policy/lm_policy.py
@@ -86,6 +86,8 @@ def __init__(
             tp_size = config["dtensor_cfg"]["tensor_parallel_size"]
             cp_size = config["dtensor_cfg"]["context_parallel_size"]

+        env_vars = config.get("env_vars", {})
+
         self.sharding_annotations = NamedSharding(
             layout=np.arange(cluster.world_size()).reshape(
                 pp_size,  # PP
@@ -120,9 +122,7 @@ def __init__(
             name_prefix=name_prefix,
             workers_per_node=workers_per_node,
             sharding_annotations=self.sharding_annotations,
-            use_expandable_segments=config["megatron_cfg"].get(
-                "use_expandable_segments", None
-            ),
+            env_vars=env_vars,
        )

         if config["dynamic_batching"]["enabled"]:
From 3b4223921e0065bdddc934603d64e410ca7f6921 Mon Sep 17 00:00:00 2001
From: ashors1
Date: Thu, 10 Jul 2025 14:32:12 -0700
Subject: [PATCH 3/6] move env_vars into backend config

Signed-off-by: ashors1
---
 nemo_rl/models/policy/lm_policy.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/nemo_rl/models/policy/lm_policy.py b/nemo_rl/models/policy/lm_policy.py
index 78fbec391e..5e82b61d72 100644
--- a/nemo_rl/models/policy/lm_policy.py
+++ b/nemo_rl/models/policy/lm_policy.py
@@ -75,6 +75,8 @@ def __init__(
             tp_size = config["megatron_cfg"]["tensor_model_parallel_size"]
             pp_size = config["megatron_cfg"]["pipeline_model_parallel_size"]
             cp_size = config["megatron_cfg"]["context_parallel_size"]
+
+            env_vars = config["megatron_cfg"].get("env_vars", {})
         else:
             assert config["dtensor_cfg"]["enabled"], (
                 "Please either set policy.megatron_cfg.enabled=true to use Megatron training backend "
@@ -86,7 +88,7 @@ def __init__(
             tp_size = config["dtensor_cfg"]["tensor_parallel_size"]
             cp_size = config["dtensor_cfg"]["context_parallel_size"]

-        env_vars = config.get("env_vars", {})
+        env_vars = config["dtensor_cfg"].get("env_vars", {})

         self.sharding_annotations = NamedSharding(
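Note for reviewers: after patches 2-3, each training backend owns its env_vars block, and an absent key simply yields no extra variables. A standalone sketch of the lookup; the config dict is hypothetical (only the key layout mirrors the patch, the values are examples):

    # Hypothetical config fragment for illustration.
    config = {
        "megatron_cfg": {
            "enabled": True,
            "env_vars": {"PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True"},
        },
        "dtensor_cfg": {"enabled": False},
    }

    if config["megatron_cfg"]["enabled"]:
        env_vars = config["megatron_cfg"].get("env_vars", {})  # backend-scoped lookup
    else:
        env_vars = config["dtensor_cfg"].get("env_vars", {})  # {} when the key is absent

    print(env_vars)  # {'PYTORCH_CUDA_ALLOC_CONF': 'expandable_segments:True'}

Scoping env_vars to the backend sub-config means the Megatron and DTensor paths can ship different allocator defaults without a shared top-level key.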
From 454e76e624161344ff178da044d280f55dce07f6 Mon Sep 17 00:00:00 2001
From: ashors1
Date: Thu, 10 Jul 2025 16:08:46 -0700
Subject: [PATCH 4/6] add unit tests

Signed-off-by: ashors1
---
 nemo_rl/distributed/worker_groups.py         |  4 +-
 tests/unit/distributed/test_worker_groups.py | 86 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)

diff --git a/nemo_rl/distributed/worker_groups.py b/nemo_rl/distributed/worker_groups.py
index 6ef5a24d10..b625be25dc 100644
--- a/nemo_rl/distributed/worker_groups.py
+++ b/nemo_rl/distributed/worker_groups.py
@@ -426,7 +426,9 @@ def _create_workers_from_bundle_indices(
         )

         # Update env_vars with the current environment variables
-        env_vars.update(dict(os.environ))
+        for k, v in os.environ.items():
+            if k not in env_vars:
+                env_vars[k] = v

         # Get the python environment for the actor
         actor_python_env = get_actor_python_env(
diff --git a/tests/unit/distributed/test_worker_groups.py b/tests/unit/distributed/test_worker_groups.py
index 12131fe4a4..0ec7bb714c 100644
--- a/tests/unit/distributed/test_worker_groups.py
+++ b/tests/unit/distributed/test_worker_groups.py
@@ -301,6 +301,92 @@ def test_environment_variables_setup(register_test_actor, virtual_cluster):
     worker_group.shutdown(force=True)


+def test_custom_environment_variables(register_test_actor, virtual_cluster):
+    """Test that custom environment variables passed through env_vars are correctly set in workers."""
+    actor_fqn = register_test_actor
+    builder = RayWorkerBuilder(actor_fqn)
+
+    # Define custom environment variables to pass to workers
+    custom_env_vars = {
+        "CUSTOM_VAR_1": "test_value_1",
+        "CUSTOM_VAR_2": "test_value_2",
+        "NEMO_TEST_ENV": "nemo_test_value",
+        "PYTHONPATH": "/custom/python/path",
+    }
+
+    # Create worker group with custom environment variables
+    worker_group = RayWorkerGroup(
+        cluster=virtual_cluster,
+        remote_worker_builder=builder,
+        workers_per_node=2,
+        env_vars=custom_env_vars.copy(),
+    )
+
+    assert len(worker_group.workers) == 2
+
+    # Check that all workers have the custom environment variables set
+    for i, worker in enumerate(worker_group.workers):
+        # Check each custom environment variable
+        for var_name, expected_value in custom_env_vars.items():
+            actual_value = ray.get(worker.get_env_var.remote(var_name))
+            assert actual_value == expected_value, (
+                f"Worker {i}: Expected {var_name}={expected_value}, got {actual_value}"
+            )
+
+        # Also verify that the standard distributed environment variables are still set
+        rank, ws, node_rank, local_rank = ray.get(
+            worker.get_rank_world_size_node_rank_local_rank.remote()
+        )
+        assert rank == str(i)
+        assert ws == "2"
+        assert node_rank == "0"
+        assert local_rank == str(i)
+
+    worker_group.shutdown(force=True)
+
+
+def test_custom_environment_variables_override_existing(
+    register_test_actor, virtual_cluster
+):
+    """Test that custom environment variables can override existing environment variables."""
+    actor_fqn = register_test_actor
+    builder = RayWorkerBuilder(actor_fqn)
+
+    # Set an environment variable in the current process
+    os.environ["DUMMY_PYTHONPATH"] = "/original/python/path"
+
+    # Define custom environment variables that override existing ones
+    custom_env_vars = {
+        "DUMMY_PYTHONPATH": "/overridden/python/path",
+        "CUSTOM_OVERRIDE": "overridden_value",
+    }
+
+    # Create worker group with custom environment variables
+    worker_group = RayWorkerGroup(
+        cluster=virtual_cluster,
+        remote_worker_builder=builder,
+        workers_per_node=1,
+        env_vars=custom_env_vars,
+    )
+
+    assert len(worker_group.workers) == 1
+    worker = worker_group.workers[0]
+
+    # Check that the custom environment variable overrides the original
+    pythonpath_value = ray.get(worker.get_env_var.remote("DUMMY_PYTHONPATH"))
+    assert pythonpath_value == "/overridden/python/path", (
+        f"Expected DUMMY_PYTHONPATH to be overridden, got {pythonpath_value}"
+    )
+
+    # Check that the new custom variable is set
+    custom_value = ray.get(worker.get_env_var.remote("CUSTOM_OVERRIDE"))
+    assert custom_value == "overridden_value", (
+        f"Expected CUSTOM_OVERRIDE=overridden_value, got {custom_value}"
+    )
+
+    worker_group.shutdown(force=True)
+
+
 def test_configure_worker_interaction(register_test_actor, virtual_cluster):
     actor_fqn = register_test_actor
     builder = RayWorkerBuilder(actor_fqn)
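Note for reviewers: the loop replacing `env_vars.update(dict(os.environ))` is what makes the new override test pass. The old one-liner let inherited environment variables clobber caller-supplied values; the loop establishes the opposite precedence. A standalone sketch of that precedence; the helper name merge_env is invented for illustration:

    import os

    def merge_env(explicit: dict[str, str]) -> dict[str, str]:
        # Inherit the parent environment, but never overwrite a key the
        # caller set explicitly -- the precedence the patched loop establishes.
        merged = dict(explicit)
        for k, v in os.environ.items():
            merged.setdefault(k, v)
        return merged

    os.environ["DUMMY_PYTHONPATH"] = "/original/python/path"
    merged = merge_env({"DUMMY_PYTHONPATH": "/overridden/python/path"})
    print(merged["DUMMY_PYTHONPATH"])  # /overridden/python/path, not the inherited value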
From 2d82d9b026fe61265ff50a36cfcdf6d0736c090b Mon Sep 17 00:00:00 2001
From: Yi-Fu Wu
Date: Thu, 10 Jul 2025 19:31:57 -0700
Subject: [PATCH 5/6] Fix megatron llama3.1-8b config

Signed-off-by: Yi-Fu Wu
---
 examples/configs/grpo_math_8B_megatron.yaml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/examples/configs/grpo_math_8B_megatron.yaml b/examples/configs/grpo_math_8B_megatron.yaml
index e3e6247f45..35306bef55 100644
--- a/examples/configs/grpo_math_8B_megatron.yaml
+++ b/examples/configs/grpo_math_8B_megatron.yaml
@@ -38,6 +38,8 @@ policy:
     pipeline_model_parallel_size: 1
     context_parallel_size: 1
     pipeline_dtype: ${policy.precision}
+    env_vars:
+      PYTORCH_CUDA_ALLOC_CONF: "expandable_segments:True"

     optimizer:
       optimizer: "adam"
@@ -67,7 +69,7 @@ policy:
     stop_strings: null
   vllm_cfg:
     tensor_parallel_size: 1
-    gpu_memory_utilization: 0.95
+    gpu_memory_utilization: 0.8
     max_model_len: ${policy.max_total_sequence_length}

 cluster:
From 0240de94a425dfea5ab9134e4391ae1c89cf62b4 Mon Sep 17 00:00:00 2001
From: Yi-Fu Wu
Date: Fri, 11 Jul 2025 10:41:40 -0700
Subject: [PATCH 6/6] Remove unused config and set pp=2 for default 8b config

This should run without OOM on A100 / H100

Signed-off-by: Yi-Fu Wu
---
 examples/configs/grpo_math_1B_megatron.yaml  | 1 -
 examples/configs/grpo_math_70B_megatron.yaml | 1 -
 examples/configs/grpo_math_8B_megatron.yaml  | 6 ++----
 3 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/examples/configs/grpo_math_1B_megatron.yaml b/examples/configs/grpo_math_1B_megatron.yaml
index e6dbc8f18e..7a8a651a54 100644
--- a/examples/configs/grpo_math_1B_megatron.yaml
+++ b/examples/configs/grpo_math_1B_megatron.yaml
@@ -40,7 +40,6 @@ policy:
   logprob_batch_size: 8
   max_total_sequence_length: 512
   precision: "bfloat16"
-  refit_buffer_size_gb: 4 # used for refitting inference engine, the unit is GB

   dtensor_cfg:
     enabled: false
diff --git a/examples/configs/grpo_math_70B_megatron.yaml b/examples/configs/grpo_math_70B_megatron.yaml
index 15a65c5ce6..a7ba2c8a52 100644
--- a/examples/configs/grpo_math_70B_megatron.yaml
+++ b/examples/configs/grpo_math_70B_megatron.yaml
@@ -17,7 +17,6 @@ policy:
   precision: "bfloat16"
   fsdp_offload_enabled: false
   activation_checkpointing_enabled: false
-  refit_buffer_size_gb: 4 # used for refitting inference engine, the unit is GB

   dtensor_cfg:
     enabled: false
diff --git a/examples/configs/grpo_math_8B_megatron.yaml b/examples/configs/grpo_math_8B_megatron.yaml
index 35306bef55..fc839c8239 100644
--- a/examples/configs/grpo_math_8B_megatron.yaml
+++ b/examples/configs/grpo_math_8B_megatron.yaml
@@ -21,7 +21,6 @@ policy:
   precision: "bfloat16"
   fsdp_offload_enabled: false
   activation_checkpointing_enabled: false
-  refit_buffer_size_gb: 4 # used for refitting inference engine, the unit is GB

   dtensor_cfg:
     enabled: false
@@ -35,11 +34,10 @@ policy:
     empty_unused_memory_level: 0
     converter_type: "LlamaForCausalLM"
     tensor_model_parallel_size: 1
-    pipeline_model_parallel_size: 1
+    # On H100, can run with pp=1 for better performance with expandable segments (which is enabled by default)
+    pipeline_model_parallel_size: 2
     context_parallel_size: 1
     pipeline_dtype: ${policy.precision}
-    env_vars:
-      PYTORCH_CUDA_ALLOC_CONF: "expandable_segments:True"

     optimizer:
       optimizer: "adam"
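Closing note for reviewers: the series routes PYTORCH_CUDA_ALLOC_CONF through Ray's runtime_env because PyTorch parses the variable when the caching allocator first runs, so it has to be present before the worker's first CUDA allocation; a worker has no reliable way to change it for itself once CUDA is initialized. A minimal standalone demonstration of that timing constraint (safe to run without a GPU):

    import os

    # Must be set before the process's first CUDA allocation; afterwards the
    # allocator configuration is fixed for the lifetime of the process.
    os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")

    import torch  # noqa: E402

    if torch.cuda.is_available():
        x = torch.zeros(1024, device="cuda")  # first allocation locks in the config
        print(torch.cuda.memory_allocated())  # ~4 KiB, served from an expandable segment
    else:
        print("No GPU detected; the allocator setting is simply unused.")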