19 changes: 19 additions & 0 deletions QEfficient/cloud/finetune_experimental.py
@@ -23,6 +23,7 @@
from QEfficient.finetune.experimental.core.model import HFModel # noqa: F401
from QEfficient.finetune.experimental.core.optimizer import prepare_optimizer
from QEfficient.finetune.experimental.core.trainer import sft_trainer # noqa: F401
from QEfficient.finetune.experimental.core.utils.device_map_utils import get_device_map
from QEfficient.finetune.experimental.core.utils.peft_utils import convert_peft_config_to_lora_config
from QEfficient.finetune.experimental.core.utils.training_config_utils import prepare_training_config

@@ -111,6 +112,22 @@ def _create_model(self) -> Any:
model_type = model_config.pop("model_type")
model_name = model_config.pop("model_name")

# Get training config for PP settings
training_config = self.config.training
pp_degree = training_config.get("pp_degree", 1)
device = training_config.get("device", "qaic")

# Generate device_map for pipeline parallelism if pp_degree > 1
if pp_degree > 1:
device_map = get_device_map(
model_name=model_name,
device=device,
pp_degree=pp_degree,
)
# Pass device_map via model_config kwargs for model loading
model_config["device_map"] = device_map
logger.log_rank_zero(f"Pipeline Parallelism enabled: Using device_map for {pp_degree} stages")

# Filter out PEFT-related fields, these shouldn't be passed to model creation
excluded_keys = {"use_peft", "peft_config"}
model_config_kwargs = {k: v for k, v in model_config.items() if k not in excluded_keys}
@@ -194,6 +211,8 @@ def _create_trainer(
# Note: torch_dtype was already converted to fp16/bf16 flag in prepare_training_config
training_config.pop("deepspeed_config", None)
training_config.pop("torch_dtype", None)
# Remove PP-specific fields as they're handled via device_map in model loading
training_config.pop("pp_degree", None)

# Create trainer arguments instance
args = args_cls(**training_config)
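The hunks above build a `device_map` in `_create_model` and forward it through the `model_config` kwargs. A minimal standalone sketch of the same flow, assuming the `HFModel` wrapper (not shown in this diff) eventually passes these kwargs to `transformers`' `from_pretrained`:

```python
from QEfficient.finetune.experimental.core.utils.device_map_utils import get_device_map

# Mirrors the pp_degree > 1 branch in _create_model above; the model name and
# device values are taken from the sample config added in this PR.
device_map = get_device_map(
    model_name="meta-llama/Llama-3.2-1B",
    device="qaic",
    pp_degree=2,
)
# None when PP is disabled, "auto" when pp_degree equals the number of
# visible devices, otherwise an explicit {module_name: device_id} dict.
model_config = {"device_map": device_map}
```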
109 changes: 109 additions & 0 deletions QEfficient/finetune/experimental/configs/sample_pp_config.yaml
@@ -0,0 +1,109 @@
# -----------------------------------------------------------------------------
#
# Copyright (c) Qualcomm Technologies, Inc. and/or its subsidiaries.
# SPDX-License-Identifier: BSD-3-Clause
#
# -----------------------------------------------------------------------------
#
# Sample configuration for Pipeline Parallelism (PP) without DDP
# This config demonstrates how to enable PP support on a single node without distributed training
#
# To run with PP only (no DDP):
# python -m QEfficient.cloud.finetune_experimental configs/sample_pp_config.yaml
#

# TODO: Since the config is not updated properly through YAML, it gets overwritten (a fix for this is added in #795).
# Once #795 is merged, redundant params (params whose value matches the value in config_manager) can be removed from here.
# Dataset can also be kept in sync with

# Model configuration
model:
model_type: "hf" # Hugging Face model
auto_class_name: "AutoModelForCausalLM"
model_name: "meta-llama/Llama-3.2-1B" # Pretrained model name
use_cache: False
attn_implementation: "sdpa"
use_peft: True
peft_config:
lora_r: 8
lora_alpha: 16
lora_dropout: 0.1
target_modules: ["q_proj", "v_proj"]
task_type: "CAUSAL_LM"
peft_type: "LORA"
bias: "none" # Options: "none", "all", "lora_only"

# Dataset configuration
dataset:
tokenizer_name: "meta-llama/Llama-3.2-1B"
dataset_type: "sft_dataset"
dataset_name: "openai/gsm8k"
prompt_template: "Solve the following math problem step by step.\n\n### Question:\n{question}\n\n### Answer:\n"
config_name: "main"
train_split: "train"
test_split: "test"
max_seq_length: 512
completion_template: "{answer}"
dataloader_num_workers: 1
dataloader_pin_memory: True
dataloader_persistent_workers: False
group_by_length: True
# Training configuration
training:
type: "sft"
output_dir: "./training_results_pp"
overwrite_output_dir: false
seed: 42
device: "qaic" # Use 'cuda' for NVIDIA GPUs, 'qaic' for Qualcomm Cloud AI
do_eval: True
torch_dtype: "fp16"
eval_strategy: "epoch"
eval_steps: 100
per_device_train_batch_size: 1
per_device_eval_batch_size: 1
gradient_accumulation_steps: 4
num_train_epochs: 5
max_steps: -1
log_level: "info"
log_on_each_node: True
logging_strategy: "steps"
logging_steps: 10
save_strategy: "epoch"
save_steps: 100
save_total_limit: 5
metric_for_best_model: "eval_loss"
completion_only_loss: True

# Pipeline Parallelism Configuration (PP without DDP)
  pp_degree: 2  # Split the model into 2 pipeline stages; > 1 enables PP (matches training.pp_degree in config_manager)

# Gradient Checkpointing (optional, saves memory)
gradient_checkpointing: False
gradient_checkpointing_kwargs:
preserve_rng_state: True
use_reentrant: False

torch_compile: false
include_num_input_tokens_seen: True
average_tokens_across_devices: True

# Optimizer configuration
optimizers:
optimizer_name: "AdamW"
lr: 5e-5
weight_decay: 0.01

# Scheduler configuration
scheduler:
scheduler_name: "cosine"
warmup_steps: 100

# Callbacks
callbacks:
early_stopping:
early_stopping_patience: 3
early_stopping_threshold: 0.001
tensorboard: {}
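
Since `_create_model` reads the PP settings from the `training` section, a quick sanity check that this sample parses as expected (a hypothetical standalone snippet, not part of the PR):

```python
import yaml

# Hypothetical path; adjust to wherever the sample config lives in your checkout.
with open("QEfficient/finetune/experimental/configs/sample_pp_config.yaml") as f:
    config = yaml.safe_load(f)

training = config["training"]
assert training["pp_degree"] == 2    # > 1 enables pipeline parallelism
assert training["device"] == "qaic"  # device type used to build the device_map
```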


14 changes: 13 additions & 1 deletion QEfficient/finetune/experimental/core/config_manager.py
@@ -29,7 +29,7 @@ class OptimizerConfig:
"""Configuration for optimizers."""

optimizer_name: str = field(
default="adamw",
default="AdamW",
metadata={"help": "The name of the optimizer to use."},
)
lr: float = field(
@@ -455,6 +455,10 @@ class TrainingConfig:
default=False,
metadata={"help": "Whether to compute loss only on completion tokens."},
)
pp_degree: int = field(
default=1,
metadata={"help": "Pipeline parallelism degree (number of pipeline stages). Set > 1 to enable PP."},
)


@dataclass
@@ -744,6 +748,14 @@ def validate_config(self) -> None:
self._push(errors, training.get("logging_steps", 0) < 0, "training.logging_steps must be >= 0.")
self._push(errors, training.get("save_total_limit", 0) < 0, "training.save_total_limit must be >= 0.")

# Pipeline Parallelism (PP) config
pp_degree = training.get("pp_degree", 1)
self._push(
errors,
not isinstance(pp_degree, int) or pp_degree < 1,
"training.pp_degree must be a positive integer (default 1 = no PP; > 1 enables PP).",
)

# DDP config
ddp = training.get("ddp_config", {})
if isinstance(ddp, dict):
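The new check accepts any positive integer and rejects everything else. The same predicate, extracted for illustration outside the `ConfigManager` plumbing:

```python
# Equivalent of the condition passed to self._push above.
def pp_degree_is_valid(pp_degree) -> bool:
    return isinstance(pp_degree, int) and pp_degree >= 1

assert pp_degree_is_valid(1)        # default: PP disabled
assert pp_degree_is_valid(4)        # 4 pipeline stages
assert not pp_degree_is_valid(0)    # must be >= 1
assert not pp_degree_is_valid(2.0)  # floats are rejected
assert pp_degree_is_valid(True)     # caveat: bool is an int subclass in Python
```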
169 changes: 169 additions & 0 deletions QEfficient/finetune/experimental/core/utils/device_map_utils.py
@@ -0,0 +1,169 @@
# -----------------------------------------------------------------------------
#
# Copyright (c) Qualcomm Technologies, Inc. and/or its subsidiaries.
# SPDX-License-Identifier: BSD-3-Clause
#
# -----------------------------------------------------------------------------

"""
Utility functions for creating device maps for pipeline parallelism.
"""

from typing import Dict, Optional, Union

import numpy as np
import torch
from transformers import AutoConfig

from QEfficient.finetune.experimental.core.utils.dist_utils import get_local_rank
from QEfficient.utils._utils import get_num_layers_from_config


def get_device_map(
model_name: str,
device: str,
pp_degree: int = 1,
) -> Optional[Union[Dict[str, int], str]]:
"""
Returns device map for the given model based on PP and DDP configuration.

Args:
model_name: Name of the model to load configuration from.
device: Device type (e.g., 'cuda', 'qaic').
        pp_degree: Pipeline parallelism degree (number of pipeline stages). > 1 enables PP.

    Returns:
        A dict mapping module names to device IDs, the string "auto" when
        pp_degree equals the number of available devices, or None if PP is disabled.
"""
if pp_degree <= 1:
return None

torch_device = torch.device(device)
num_available_devices = getattr(torch, torch_device.type).device_count()

if pp_degree > num_available_devices:
raise ValueError(
f"pp_degree ({pp_degree}) cannot exceed the number of available {device} devices "
f"({num_available_devices}). Reduce pp_degree or use a node with more devices."
)
elif pp_degree == num_available_devices:
device_map = "auto"
else: # pp_degree < num_available_devices
device_map = custom_device_map(model_name, device, pp_degree)

return device_map


def custom_device_map(model_name: str, device: str, pp_degree: int) -> Dict[str, int]:
"""
Returns custom device map for model layers based on number of pipeline stages and process rank.

Args:
model_name: Name of the model to load configuration from.
device: Device type (e.g., 'cuda', 'qaic').
pp_degree: Pipeline parallelism degree (number of pipeline stages).

Returns:
Dict: A dictionary mapping layer names to device IDs.

Notes:
- This device map structure is verified for llama models primarily.
- For other architectures, you may need to adjust the layer naming conventions.
- Layers are distributed as evenly as possible: the first (num_layers % pp_degree)
stages receive one extra layer each.

Example:
        The example below shows a PP + DDP configuration; the same layout applies to a PP-only setup (a single DDP rank).
Configuration for meta-llama/Llama-3.2-1B
Total devices: 4 (2x PP x 2x DDP)

PP (Pipeline Parallelism): Each copy of the model is split into 2 stages
DDP (Distributed Data Parallel): 2 model copies run in parallel

|--------------------------------------------------------------------------|
| Process Rank | Assigned Device IDs | Model Component |
|--------------------------------------------------------------------------|
| Rank 0 | 0 | model.embed_tokens |
| | | model.lm_head |
| | | model.layers.0 - model.layers.7 |
|--------------------------------------------------------------------------|
| Rank 0 | 1 | model.norm |
| | | model.rotary_emb |
| | | model.layers.8 - model.layers.15 |
|--------------------------------------------------------------------------|
| Rank 1 | 2 | model.embed_tokens |
| | | model.lm_head |
| | | model.layers.0 - model.layers.7 |
|--------------------------------------------------------------------------|
| Rank 1 | 3 | model.norm |
| | | model.rotary_emb |
| | | model.layers.8 - model.layers.15 |
|--------------------------------------------------------------------------|
"""

model_config = AutoConfig.from_pretrained(model_name)
num_layers = get_num_layers_from_config(model_config)
local_rank = get_local_rank()

if num_layers < pp_degree:
raise ValueError(
f"Number of model layers ({num_layers}) must be >= pp_degree ({pp_degree}). "
f"Cannot split {num_layers} layers across {pp_degree} pipeline stages."
)

first_device = local_rank * pp_degree
last_device = local_rank * pp_degree + (pp_degree - 1)

    # When embeddings are tied, lm_head shares weights with embed_tokens and
    # must be placed on the same device; otherwise it goes on the last stage.
    if getattr(model_config, "tie_word_embeddings", False):
        lm_head_device = first_device
    else:
        lm_head_device = last_device

device_map = {
"model.embed_tokens": first_device,
"lm_head": lm_head_device,
"model.norm": last_device,
"model.rotary_emb": last_device,
}

# Distribute layers as evenly as possible across stages.
# The first (num_layers % pp_degree) stages get one extra layer each.
base_layers, remainder = divmod(num_layers, pp_degree)
layers_per_stage = np.array([base_layers + (1 if i < remainder else 0) for i in range(pp_degree)])

# Create device assignment per layer
pp_device_map = np.repeat(np.arange(pp_degree), layers_per_stage)

# Assign each layer to a device
for i in range(num_layers):
device_map[f"model.layers.{i}"] = int(pp_device_map[i] + local_rank * pp_degree)

return device_map
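
To make the split arithmetic concrete, here is the computation traced by hand for the docstring's scenario (Llama-3.2-1B has 16 decoder layers) with `pp_degree=2` and `local_rank=0`; the results match the rank-0 rows of the table above:

```python
import numpy as np

num_layers, pp_degree, local_rank = 16, 2, 0
base_layers, remainder = divmod(num_layers, pp_degree)  # 8, 0: an even split
layers_per_stage = np.array([base_layers + (1 if i < remainder else 0) for i in range(pp_degree)])
pp_device_map = np.repeat(np.arange(pp_degree), layers_per_stage)  # eight 0s, then eight 1s

assert int(pp_device_map[0] + local_rank * pp_degree) == 0   # model.layers.0  -> device 0
assert int(pp_device_map[7] + local_rank * pp_degree) == 0   # model.layers.7  -> device 0
assert int(pp_device_map[8] + local_rank * pp_degree) == 1   # model.layers.8  -> device 1
assert int(pp_device_map[15] + local_rank * pp_degree) == 1  # model.layers.15 -> device 1
```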


def validate_pp_config(
pp_degree: int,
device: str,
local_world_size: int = 1,
) -> None:
"""
Validate pipeline parallelism configuration.

Args:
pp_degree: Pipeline parallelism degree (number of pipeline stages). Must be > 1 to enable PP.
device: Device type (e.g., 'cuda', 'qaic').
local_world_size: Number of processes per node for DDP.

Raises:
AssertionError: If configuration is invalid.
"""
if pp_degree > 1:
# Validate device availability
torch_device = torch.device(device)
num_available_devices = getattr(torch, torch_device.type).device_count()

assert local_world_size * pp_degree <= num_available_devices, (
f"Number of devices required per node (LOCAL_WORLD_SIZE * pp_degree = "
f"{local_world_size} * {pp_degree} = {local_world_size * pp_degree}) "
f"should be <= locally available devices ({num_available_devices})."
)
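
A hypothetical call showing the device budget this assertion enforces for a PP + DDP launch (2 DDP ranks x 2 PP stages needs 4 devices per node):

```python
from QEfficient.finetune.experimental.core.utils.device_map_utils import validate_pp_config

# 2 DDP processes, each owning a 2-stage pipeline: requires at least 4 visible
# qaic devices on the node, otherwise the assertion above fires.
validate_pp_config(pp_degree=2, device="qaic", local_world_size=2)
```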