fix(infra): gate RTensor.remotize on DP-head to plug _storage leak#1297
garrett4wade merged 3 commits into inclusionAI:main
Conversation
Skip RTensor.remotize on non-DP-head TrainEngine ranks and return a None sentinel; their results are discarded by the controller's _collect_results filter anyway, so storing them in node-local _storage just leaks. DP-head ranks and non-TrainEngine workers (e.g. inference backends) continue remotizing as before. Closes inclusionAI#1296.
Code Review
This pull request implements conditional tensor remotization to prevent memory leaks on non-data-parallel (DP) head ranks. By restricting RTensor.remotize to DP heads or non-training engines, the change avoids accumulating unused tensors in local storage. Feedback indicates that the current logic incorrectly skips remotization when the process group is not yet initialized, which could cause issues during early execution phases. A suggestion was made to refine the conditional logic to be more conservative during initialization to ensure valid return values from all ranks.
```python
is_dp_head = (
    isinstance(engine, TrainEngine)
    and getattr(engine, "process_group_initialized", False)
    and engine.is_data_parallel_head()
)
if is_dp_head or not isinstance(engine, TrainEngine):
    state = get_state()
    result = RTensor.remotize(result, node_addr=state.node_addr)
    serialized_result = serialize_value(result)
else:
    # Non-DP-head: result is discarded by controller. Skip remotize
    # (no _storage growth) and return a sentinel.
    serialized_result = serialize_value(None)
```
The current logic for gating remotize is inverted for the case where the engine is not yet initialized. According to the PR description, the gate should be conservative: if the process group is not initialized, we should fall back to the original behavior (remotize).
In the current implementation, if process_group_initialized is False, the code skips remotization and returns None. This could break initialization calls or early status checks that expect a valid return value from all ranks. The suggested change ensures we only skip remotization for initialized non-DP heads.
```diff
-is_dp_head = (
-    isinstance(engine, TrainEngine)
-    and getattr(engine, "process_group_initialized", False)
-    and engine.is_data_parallel_head()
-)
-if is_dp_head or not isinstance(engine, TrainEngine):
+is_train = isinstance(engine, TrainEngine)
+is_init = getattr(engine, "process_group_initialized", False)
+if not is_train or not is_init or engine.is_data_parallel_head():
     state = get_state()
     result = RTensor.remotize(result, node_addr=state.node_addr)
     serialized_result = serialize_value(result)
 else:
     # Non-DP-head: result is discarded by controller. Skip remotize
     # (no _storage growth) and return a sentinel.
     serialized_result = serialize_value(None)
```
Good catch gemini :) applied in the new commit. The previous condition over-reached by folding the process_group_initialized check into the skip gate, which meant pre-init RPCs (e.g. setup-time get_version / connect_engine) on non-DP-head TrainEngine ranks were also returning None sentinels. The new gate only skips when we're certain the result will be discarded — is_train AND is_init AND NOT is_data_parallel_head() — and falls back to the original remotize path everywhere else.
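The corrected gate can be condensed into a single predicate. This is a sketch for illustration, with the engine state flattened into three booleans rather than the real `engine` object:

```python
def should_remotize(is_train: bool, is_init: bool, is_dp_head: bool) -> bool:
    # Skip remotize only when we are certain the controller will discard
    # the result: an initialized TrainEngine rank that is not the DP head.
    return not is_train or not is_init or is_dp_head

# Truth table: exactly one combination skips.
assert should_remotize(False, False, False)     # inference backend -> remotize
assert should_remotize(True, False, False)      # pre-init train rank -> remotize
assert should_remotize(True, True, True)        # DP head -> remotize
assert not should_remotize(True, True, False)   # initialized non-DP-head -> skip
```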
…RPCs The previous gate skipped remotize whenever process_group_initialized was False, which over-reaches: setup-time RPCs (e.g. get_version, connect_engine) need a real return on every rank, not a None sentinel. Restrict the skip to the case the leak fix actually targets — initialized TrainEngine on a non-DP-head rank — and let every other case (non-train engine, pre-init train engine, DP-head) take the original remotize path unchanged. Per gemini-code-assist review on inclusionAI#1297.
The previous condition read `getattr(engine, "process_group_initialized", False)`, but that attribute is set only by MegatronEngine. FSDPEngine and ArchonEngine track init state under `self._initialized` and expose it via the unified `initialized` property — which MegatronEngine also exposes. The previous check therefore evaluated False on every FSDP/Archon RPC, silently leaving the leak unfixed for those backends. Switch to the unified property so the gate activates on all three TrainEngine subclasses.
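The attribute mismatch described above can be sketched as follows. Class and attribute names mirror the commit message, but the bodies are illustrative stubs, not the real engine implementations:

```python
class TrainEngine:
    @property
    def initialized(self) -> bool:  # unified property, all subclasses
        raise NotImplementedError

class FSDPEngine(TrainEngine):
    def __init__(self):
        self._initialized = False  # FSDP/Archon track init state here

    @property
    def initialized(self) -> bool:
        return self._initialized

class MegatronEngine(TrainEngine):
    def __init__(self):
        self.process_group_initialized = False  # Megatron-only attribute

    @property
    def initialized(self) -> bool:
        return self.process_group_initialized

# The old gate probed the Megatron-only attribute, so on FSDP/Archon it
# always read False and the leak fix never activated:
engine = FSDPEngine()
engine._initialized = True
assert getattr(engine, "process_group_initialized", False) is False  # old gate
assert engine.initialized                                            # unified gate
```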
Description

Skip `RTensor.remotize` on non-DP-head `TrainEngine` ranks and return a `None` sentinel; their results are discarded by the controller's `_collect_results` filter anyway, so storing them in node-local `_storage` just leaks. DP-head ranks and non-`TrainEngine` workers (e.g. inference backends) continue remotizing as before.

Related Issue
Fixes #1296
Type of Change

Checklist

- `pre-commit run --all-files`
- `./docs/build_all.sh`
- `main`
- `/review-pr` command
- `/create-pr`

Breaking Change Details (if applicable):
N/A
Additional Context
See #1296 for the full diagnosis (per-step leak rates, MEMDIAG numbers, architectural mirror of #1209/#1282).
The gate is conservative — three conditions must all hold to skip remotize:
- `engine` is a `TrainEngine` (inference backends are unaffected),
- the engine is initialized (otherwise `is_data_parallel_head()` would crash),
- the rank is not a data-parallel head.

Otherwise the original remotize path runs unchanged.
Validated on a Qwen2.5-VL-32B `megatron:d4p2t4` run that previously leaked ~411 MB / step per non-DP-head rank (73 GB at step 168). Post-fix RSS is flat across the same workload.
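A quick back-of-envelope check of those numbers (values taken from this PR; the gap versus the reported 73 GB plausibly comes from baseline RSS and per-step variance):

```python
# ~411 MB leaked per step per non-DP-head rank, observed over 168 steps.
leak_per_step_gb = 411 / 1024
steps = 168
accumulated_gb = leak_per_step_gb * steps
print(round(accumulated_gb, 1))  # 67.4 GB of pure leak by step 168
```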