colossalai/checkpoint_io/hybrid_parallel_checkpoint_io.py (29 changes: 27 additions & 2 deletions)
@@ -1,4 +1,5 @@
 import copy
+from functools import reduce
 import logging
 import os
 from pathlib import Path
@@ -313,9 +314,13 @@ def load_sharded_model(self, model: ModelWrapper, checkpoint_index_file: Path, s
         # Keep a record of loaded files so that file will not be repeatedly loaded.
         loaded_file = set()
 
+        missing_keys = []
+        missing_file_keys = []
+
         def _load(name: str):
             if name not in weight_map:
-                raise ValueError(f"{name} is not stored in checkpoint, please check your checkpointing configuration!")
+                missing_file_keys.append(name)
+                return
             filename = weight_map[name]
 
             # If this param/buffer has been loaded before, directly return.
@@ -324,7 +329,6 @@ def _load(name: str):
 
             file_path = os.path.join(ckpt_root_path, filename)
             state_dict = load_shard_state_dict(Path(file_path), use_safetensors)
-            missing_keys = []
 
             load_state_dict_into_model(
                 model, state_dict, missing_keys=missing_keys, strict=strict, load_sub_module=True
@@ -357,6 +361,27 @@ def _load(name: str):
         if self.verbose and self.coordinator.is_master():
             logging.info(f"The model has been successfully loaded from sharded checkpoint: {ckpt_root_path}.")
 
+        if len(missing_keys) == 0:
+            raise RuntimeError(
+                "No weight is loaded into the model. Please check the checkpoint files and the model structure."
+            )
+
+        remain_keys = reduce(lambda a, b: a & b, map(set, missing_keys))
+        remain_keys = remain_keys.union(set(missing_file_keys))
+        if len(remain_keys) > 0:
+            if strict:
+                error_msgs = [
+                    "Missing key(s) in state_dict: {}. ".format(", ".join('"{}"'.format(k) for k in remain_keys))
+                ]
+                raise RuntimeError(
+                    "Error(s) in loading state_dict for {}:\n\t{}".format(
+                        self.__class__.__name__, "\n\t".join(error_msgs)
+                    )
+                )
+            else:
+                if self.coordinator.is_master():
+                    logging.info(f"The following keys are not loaded from checkpoint: {remain_keys}")
+
     def save_sharded_optimizer(
         self,
         optimizer: OptimizerWrapper,
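
Note: the per-file lists in `missing_keys` are intersected because a key reported missing while loading one shard file may still be provided by another shard; only keys absent from every loaded file, plus keys that never appeared in the index's `weight_map` at all (collected in `missing_file_keys`), are truly unloaded. A minimal sketch of that aggregation, using hypothetical key names:

```python
from functools import reduce

# One sub-list per checkpoint shard: the keys that shard did NOT provide
# (as collected by load_state_dict_into_model for each file).
missing_keys = [
    ["embed.weight", "head.bias"],  # shard 1 lacked both
    ["head.bias"],                  # shard 2 provided embed.weight
    ["embed.weight", "head.bias"],  # shard 3 lacked both
]
# Keys whose names never appeared in the index's weight_map at all.
missing_file_keys = ["rotary.inv_freq"]

# A key is only truly missing if *every* shard failed to provide it,
# hence the set intersection across the per-file lists.
remain_keys = reduce(lambda a, b: a & b, map(set, missing_keys))
remain_keys = remain_keys.union(set(missing_file_keys))

print(sorted(remain_keys))  # ['head.bias', 'rotary.inv_freq']
```

With `strict=False`, these remaining keys are only logged on the master rank; with `strict=True`, loading fails with the `RuntimeError` shown in the diff above.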