From 2e7f28ea45bbfec1adb43cb63550984bce7ba4de Mon Sep 17 00:00:00 2001 From: Yunnglin Date: Mon, 13 Apr 2026 17:22:53 +0800 Subject: [PATCH 1/3] fix tensor collect --- Dockerfile | 2 +- src/twinkle/server/model/backends/common.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index ee04ae68..e29e4d17 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,7 +37,7 @@ RUN pip install flash-linear-attention -U --no-cache-dir RUN pip install numpy==2.2 --no-cache-dir # Install tinker, ray, and other deps -RUN pip install --no-cache-dir tinker==0.14.0 "ray[serve]" transformers peft accelerate -U +RUN pip install --no-cache-dir tinker==0.16.1 "ray[serve]" transformers peft accelerate -U # Clone and install twinkle, checkout to latest v-tag RUN git clone https://github.com/modelscope/twinkle.git diff --git a/src/twinkle/server/model/backends/common.py b/src/twinkle/server/model/backends/common.py index 2cc1e091..607794c3 100644 --- a/src/twinkle/server/model/backends/common.py +++ b/src/twinkle/server/model/backends/common.py @@ -164,12 +164,11 @@ def _ensure_dpo_metric(self, adapter_name: str, beta: float): def _tinker_build_output(self, inputs, outputs): """Extract logits/logps from model outputs and build per-datum output list.""" - logits = outputs.get('logits') - if logits is not None: - logits = self._normalize_tensor_output(logits) - logps = outputs.get('logps', None) - if logps is not None: - logps = self._normalize_tensor_output(logps) + logits = self._normalize_tensor_output(outputs.get('logits')) + logps = self._normalize_tensor_output(outputs.get('logps')) + if logits is None and logps is None: + # non-last PP stage: no outputs produced, collector will discard this + return [] return self._get_forward_output(inputs, logits, logps) @staticmethod @@ -177,9 +176,9 @@ def _normalize_tensor_output(value): """Normalize various output formats (tensor, list of tensors, nested lists, floats) to a single tensor. 
Handles: + - None or empty list: returns None - torch.Tensor: detach and move to cpu - list of torch.Tensor: cat along dim=0 - - nested lists: recursively flatten and cat - list of floats/int: convert to tensor """ if value is None: @@ -189,6 +188,10 @@ def _normalize_tensor_output(value): return value.detach().cpu() if isinstance(value, list): + if not value: # empty list (e.g. non-last PP stage): treat as missing + return None + if isinstance(value[0], torch.Tensor): + return torch.cat(value, dim=0).detach().cpu() return torch.as_tensor(value, dtype=torch.float32).detach().cpu() if isinstance(value, (int, float)): From ae4b13668fab1d72da927ca6ef2bd365d48e5a37 Mon Sep 17 00:00:00 2001 From: Yunnglin Date: Wed, 15 Apr 2026 19:29:41 +0800 Subject: [PATCH 2/3] update doc --- cookbook/client/tinker/modelscope/dpo.py | 207 ++++++++++++++++++ .../tinker/modelscope/self_cognition.py | 2 +- .../client/tinker/self_host/self_cognition.py | 2 +- cookbook/client/twinkle/modelscope/dpo.py | 204 +++++++++++++++++ .../Usage Guide/Server and Client/Overview.md | 15 +- .../Tinker-Compatible-Client.md | 184 +++++----------- .../Server and Client/Twinkle-Client.md | 94 ++++---- .../Usage Guide/Train-as-a-Service.md | 2 +- ...71\345\256\242\346\210\267\347\253\257.md" | 184 +++++----------- ...le\345\256\242\346\210\267\347\253\257.md" | 96 ++++---- .../\346\246\202\350\277\260.md" | 15 +- ...55\347\273\203\346\234\215\345\212\241.md" | 2 +- 12 files changed, 650 insertions(+), 357 deletions(-) create mode 100644 cookbook/client/tinker/modelscope/dpo.py create mode 100644 cookbook/client/twinkle/modelscope/dpo.py diff --git a/cookbook/client/tinker/modelscope/dpo.py b/cookbook/client/tinker/modelscope/dpo.py new file mode 100644 index 00000000..f19000b1 --- /dev/null +++ b/cookbook/client/tinker/modelscope/dpo.py @@ -0,0 +1,207 @@ +# Tinker-Compatible Client - DPO (Direct Preference Optimization) Training with LoRA +# +# This script demonstrates how to fine-tune a language model 
using DPO +# through the Tinker-compatible client API. +# +# Training flow per step: +# 1. forward with 'cross_entropy' + disable_lora=True +# → forward-only pass through the base model (LoRA disabled); no backward +# is run, so the reference pass accumulates no LoRA gradients. +# 2. Attach returned per-token ref logps to each datum's loss_fn_inputs. +# 3. forward_backward with 'importance_sampling' +# → server detects ref_logps and switches to DPOLoss + DPOMetric. +# 4. optim_step → update LoRA, DPO metrics returned automatically. +# +# The server must be running first (see server.py and server_config.yaml). + +import os +import numpy as np +import torch +from tqdm import tqdm +from typing import Any, Dict, List + +import swanlab + +from tinker import types +from twinkle import init_tinker_client, get_logger +from twinkle.dataset import Dataset, DatasetMeta, LazyDataset +from twinkle.dataloader import DataLoader +from twinkle.preprocessor import EmojiDPOProcessor +from twinkle.server.common import input_feature_to_datum + +logger = get_logger() + +# Initialize the Tinker client before importing ServiceClient +init_tinker_client() + +from tinker import ServiceClient # noqa: E402 (must follow init_tinker_client) + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- +base_model = 'Qwen/Qwen3.6-35B-A3B' +base_url = 'http://www.modelscope.cn/twinkle' +api_key = 'EMPTY_API_KEY' +dataset_id = 'ms://hjh0119/shareAI-Llama3-DPO-zh-en-emoji' + +batch_size = 4 +learning_rate = 1e-4 +dpo_beta = 0.1 +sft_weight = 1.0 +max_length = 2048 +lora_rank = 8 +system_prompt = 'You are a helpful assistant.'
+use_swanlab = True + + +# --------------------------------------------------------------------------- +# Dataset helpers (reused from twinkle/self_host/dpo.py) +# --------------------------------------------------------------------------- + +def create_dpo_dataset(): + """Create DPO dataset with positive/negative format.""" + dataset = LazyDataset(DatasetMeta(dataset_id, data_slice=range(5000))) + dataset.set_template('Qwen3_5Template', model_id=f'ms://{base_model}', max_length=max_length) + dataset.map( + EmojiDPOProcessor, + init_args={'system': system_prompt}, + ) + # EmojiDPOProcessor returns {'positive': InputFeature, 'negative': InputFeature, ...} + # encode handles this format automatically + dataset.encode() + return dataset + + +def prepare_dpo_batch(batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Reorganise batch into DP-safe interleaved format [pos_1, neg_1, pos_2, neg_2, ...]. + + Args: + batch: List of rows, each with 'positive' and 'negative' InputFeatures. + + Returns: + Interleaved list so each DP worker slice contains complete pairs. 
+ """ + result = [] + for row in batch: + base_fields = {k: v for k, v in row.items() if k not in ('positive', 'negative')} + pos_sample = {**base_fields, **row['positive']} + neg_sample = {**base_fields, **row['negative']} + result.append(pos_sample) + result.append(neg_sample) + return result + + +# --------------------------------------------------------------------------- +# Training +# --------------------------------------------------------------------------- + +def train(): + # Step 0: Initialize SwanLab if enabled + if use_swanlab: + swanlab.login(api_key=os.environ['SWANLAB_API_KEY']) + swanlab.init( + project='twinkle-dpo', + experiment_name='dpo-lora-training', + config={ + 'base_model': base_model, + 'batch_size': batch_size, + 'learning_rate': learning_rate, + 'dpo_beta': dpo_beta, + 'sft_weight': sft_weight, + 'max_length': max_length, + 'lora_rank': lora_rank, + }, + ) + logger.info('SwanLab initialized') + + # Step 1: Prepare dataset & dataloader + logger.info('Loading DPO dataset...') + dataset = create_dpo_dataset() + dataloader = DataLoader(dataset=dataset, batch_size=batch_size) + logger.info(f'Dataset ready: {len(dataloader)} steps per epoch') + + # Step 2: Connect to server and create LoRA training client + service_client = ServiceClient(base_url=base_url, api_key=api_key) + training_client = service_client.create_lora_training_client( + base_model=base_model, + rank=lora_rank, + ) + logger.info(f'LoRA training client created (rank={lora_rank})') + logger.info(f'Starting DPO training: beta={dpo_beta}, lr={learning_rate}') + + # Step 3: Training loop + for step, batch in tqdm(enumerate(dataloader), total=len(dataloader)): + # Normalise numpy / torch tensors to plain Python lists for serialisation + for row in batch: + for key in list(row.keys()): + if isinstance(row[key], np.ndarray): + row[key] = row[key].tolist() + elif isinstance(row[key], torch.Tensor): + row[key] = row[key].cpu().numpy().tolist() + + # Build interleaved [pos, neg, pos, 
neg, ...] batch + dpo_batch = prepare_dpo_batch(batch) + + # Convert each InputFeature dict to a Tinker Datum + input_datums = [input_feature_to_datum(row) for row in dpo_batch] + + # ----------------------------------------------------------------- + # A. Reference forward pass (base model, disable_lora=True) + # Forward-only call: no backward is run, so the reference pass + # accumulates no LoRA gradients. + # ----------------------------------------------------------------- + ref_result = training_client.forward( + input_datums, + 'cross_entropy', + loss_fn_config={'disable_lora': True}, + ).result() + + # ----------------------------------------------------------------- + # B. Attach per-token ref logps to each datum's loss_fn_inputs + # ----------------------------------------------------------------- + for datum, ref_out in zip(input_datums, ref_result.loss_fn_outputs): + ref_logprobs_np = np.array(ref_out['logprobs'].tolist(), dtype=np.float32) + datum.loss_fn_inputs['ref_logps'] = types.TensorData.from_numpy(ref_logprobs_np) + + # ----------------------------------------------------------------- + # C. DPO forward_backward + # Server detects ref_logps → sets DPOLoss + DPOMetric automatically. + # Optional DPO hyper-params can be forwarded via loss_fn_config. + # (e.g. dpo_beta, dpo_sft_weight; dpo_loss_type is not supported in Tinker mode) + # ----------------------------------------------------------------- + fwdbwd_result = training_client.forward_backward( + input_datums, + 'importance_sampling', + loss_fn_config={ + 'dpo_beta': dpo_beta, + 'dpo_sft_weight': sft_weight, + }, + ).result() + + # ----------------------------------------------------------------- + # D. Optimizer step — DPOMetric is calculated automatically on the + # server and returned inside optim_result.metrics.
+ # ----------------------------------------------------------------- + optim_result = training_client.optim_step( + types.AdamParams(learning_rate=learning_rate) + ).result() + + logger.info(f'[Step {step}] metrics={optim_result.metrics}') + + # Log metrics to SwanLab + if use_swanlab and optim_result.metrics: + swanlab.log(optim_result.metrics, step=step) + + # Step 4: Save checkpoint + save_result = training_client.save_state('dpo-lora-final').result() + logger.info(f'Saved checkpoint: {save_result.path}') + + # Step 5: (Optional) Upload to ModelScope Hub + # YOUR_USER_NAME = 'your_username' + # hub_model_id = f'{YOUR_USER_NAME}/twinkle-tinker-dpo-lora' + # training_client.publish_checkpoint_from_tinker_path(save_result.path).result() + # logger.info(f'Uploaded checkpoint to hub: {hub_model_id}') + + +if __name__ == '__main__': + train() diff --git a/cookbook/client/tinker/modelscope/self_cognition.py b/cookbook/client/tinker/modelscope/self_cognition.py index 2347c7fc..4f5e0d4f 100644 --- a/cookbook/client/tinker/modelscope/self_cognition.py +++ b/cookbook/client/tinker/modelscope/self_cognition.py @@ -107,7 +107,7 @@ def eval(): ] ) - input_feature = template.encode(trajectory, add_generation_prompt=True) + input_feature = template.batch_encode([trajectory], add_generation_prompt=True)[0] input_ids = input_feature['input_ids'].tolist() diff --git a/cookbook/client/tinker/self_host/self_cognition.py b/cookbook/client/tinker/self_host/self_cognition.py index 691662e6..6d33b6c8 100644 --- a/cookbook/client/tinker/self_host/self_cognition.py +++ b/cookbook/client/tinker/self_host/self_cognition.py @@ -109,7 +109,7 @@ def eval(): ] ) - input_feature = template.encode(trajectory, add_generation_prompt=True) + input_feature = template.batch_encode([trajectory], add_generation_prompt=True)[0] input_ids = input_feature['input_ids'].tolist() diff --git a/cookbook/client/twinkle/modelscope/dpo.py b/cookbook/client/twinkle/modelscope/dpo.py new file mode 100644 index 
00000000..17a69965 --- /dev/null +++ b/cookbook/client/twinkle/modelscope/dpo.py @@ -0,0 +1,204 @@ +# Twinkle Client - DPO (Direct Preference Optimization) Training with LoRA +# +# This script demonstrates how to fine-tune a language model using DPO +# through the Twinkle client-server architecture. +# The server must be running first (see server.py and server_config.yaml). + +# Step 1: Load environment variables from a .env file (e.g., API tokens) +import dotenv +import os +from typing import Any, Dict, List + +dotenv.load_dotenv('.env') +import numpy as np +import torch +from peft import LoraConfig + +from twinkle import get_logger +from twinkle.dataset import Dataset, DatasetMeta +from twinkle_client import init_twinkle_client +from twinkle.dataloader import DataLoader +from twinkle_client.model import MultiLoraTransformersModel +from twinkle.preprocessor import EmojiDPOProcessor + +logger = get_logger() + +# Configuration (direct values, not from env) +base_model = 'Qwen/Qwen3.6-35B-A3B' +base_url = 'http://www.modelscope.cn/twinkle' +dataset_id = 'ms://hjh0119/shareAI-Llama3-DPO-zh-en-emoji' + +batch_size = 4 +gradient_accumulation_steps = 2 +learning_rate = 1e-4 +dpo_beta = 0.1 +sft_weight = 1.0 +loss_type = 'sigmoid' +max_length = 2048 +adapter_name = 'default' +system_prompt = 'You are a helpful assistant.' + +# Step 2: Initialize the Twinkle client to communicate with the remote server. +# - base_url: the address of the running Twinkle server +# - api_key: authentication token (loaded from environment variable) +client = init_twinkle_client(base_url=base_url, api_key=os.environ.get('MODELSCOPE_TOKEN')) + +# Step 3: Query the server for existing training runs and their checkpoints. +# This is useful for resuming a previous training session. 
+runs = client.list_training_runs() + +resume_path = None +for run in runs: + logger.info(run.model_dump_json(indent=2)) + # List all saved checkpoints for this training run + checkpoints = client.list_checkpoints(run.training_run_id) + + for checkpoint in checkpoints: + logger.info(checkpoint.model_dump_json(indent=2)) + # Uncomment the line below to resume from a specific checkpoint: + # resume_path = checkpoint.twinkle_path + + +def create_dpo_dataset(): + """Create DPO dataset with positive/negative format.""" + dataset = Dataset(DatasetMeta(dataset_id, data_slice=range(100))) + dataset.set_template('Qwen3_5Template', model_id=f'ms://{base_model}', max_length=max_length) + dataset.map( + EmojiDPOProcessor, + init_args={ + 'system': system_prompt, + } + ) + # DPO preprocessor returns {'positive': [...], 'negative': [...]} + # batch_encode handles this format automatically + dataset.encode() + return dataset + + +def prepare_dpo_batch(batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Prepare DPO batch: reorganize batch for training with DP-safe interleaving. + + Args: + batch: List of rows, each with 'positive' and 'negative' InputFeatures + and other fields (question, etc.) + + Returns: + List interleaved as [pos_1, neg_1, pos_2, neg_2, ...] to ensure each DP + worker gets complete positive/negative pairs after slicing. + Each item contains all original fields plus the InputFeature fields. 
+ """ + result = [] + + for row in batch: + # Get base fields (excluding positive/negative) + base_fields = {k: v for k, v in row.items() if k not in ('positive', 'negative')} + + # Positive sample: merge base fields with positive InputFeature + pos_sample = {**base_fields, **row['positive']} + # Negative sample: merge base fields with negative InputFeature + neg_sample = {**base_fields, **row['negative']} + + # Interleave: [pos, neg] per pair for DP-safe slicing + result.append(pos_sample) + result.append(neg_sample) + + return result + + +def train(): + # Step 4: Prepare the dataset + + # Load the DPO dataset from ModelScope + dataset = create_dpo_dataset() + + # Wrap the dataset into a DataLoader that yields batches + dataloader = DataLoader(dataset=dataset, batch_size=batch_size) + + # Step 5: Configure the model + + # Create a multi-LoRA Transformers model pointing to the base model on ModelScope + model = MultiLoraTransformersModel(model_id=f'ms://{base_model}') + + # Define LoRA configuration: apply low-rank adapters to all linear layers + lora_config = LoraConfig( + target_modules='all-linear', + r=8, + lora_alpha=32, + lora_dropout=0.05, + ) + + # Attach the LoRA adapter named 'default' to the model. + # gradient_accumulation_steps means gradients are accumulated over micro-batches + # before an optimizer step, effectively increasing the batch size. 
+ model.add_adapter_to_model(adapter_name, lora_config, gradient_accumulation_steps=gradient_accumulation_steps) + + # Set the same chat template used during data preprocessing + model.set_template('Qwen3_5Template') + + # Set the input processor (pads sequences on the right side) + model.set_processor('InputProcessor', padding_side='right') + + # Use DPO loss for preference optimization + model.set_loss('DPOLoss', beta=dpo_beta, loss_type=loss_type, reference_free=False, sft_weight=sft_weight) + + # Add DPO metric for logging + model.add_metric('DPOMetric', beta=dpo_beta) + + # Use Adam optimizer with a learning rate of 1e-4 + model.set_optimizer('Adam', lr=learning_rate) + + # Step 6: Optionally resume from a previous checkpoint + if resume_path: + logger.info(f'Resuming training from {resume_path}') + model.load(resume_path, load_optimizer=True) + + # Step 7: Run the training loop + logger.info(model.get_train_configs().model_dump()) + + optim_step = 0 + max_steps = len(dataloader) + logger.info(f'Starting LoRA DPO training: loss_type={loss_type}, beta={dpo_beta}, lr={learning_rate}') + logger.info(f'Using base model (disable_lora=True) as reference model') + + for batch in dataloader: + # batch is List[Dict] with 'positive' and 'negative' keys + # Convert numpy/torch tensors to lists for serialization + for row in batch: + for key in row: + if isinstance(row[key], np.ndarray): + row[key] = row[key].tolist() + elif isinstance(row[key], torch.Tensor): + row[key] = row[key].cpu().numpy().tolist() + + dpo_batch = prepare_dpo_batch(batch) + + # Get reference outputs using base model (without LoRA adapter) + # disable_lora=True tells the model to skip LoRA and use base weights + ref_outputs = model.forward_only(inputs=dpo_batch, disable_lora=True) + model.forward_backward(inputs=dpo_batch, ref_outputs=ref_outputs.result) + model.clip_grad_and_step() + + optim_step += 1 + + # Logging + if optim_step % gradient_accumulation_steps == 0: + metrics = 
model.calculate_metric(is_training=True) + logger.info(f'[Step {optim_step // gradient_accumulation_steps}/{max_steps}] {metrics}') + + # Step 8: Save the trained checkpoint + twinkle_path = model.save(name='dpo-lora-final', save_optimizer=True) + logger.info(f'Saved checkpoint: {twinkle_path}') + + # Step 9: Upload the checkpoint to ModelScope Hub + # YOUR_USER_NAME = "your_username" + # hub_model_id = f'{YOUR_USER_NAME}/twinkle-dpo-lora' + # model.upload_to_hub( + # checkpoint_dir=twinkle_path, + # hub_model_id=hub_model_id, + # async_upload=False + # ) + # logger.info(f"Uploaded checkpoint to hub: {hub_model_id}") + + +if __name__ == '__main__': + train() diff --git a/docs/source_en/Usage Guide/Server and Client/Overview.md b/docs/source_en/Usage Guide/Server and Client/Overview.md index 133cedff..2f1bf2bf 100644 --- a/docs/source_en/Usage Guide/Server and Client/Overview.md +++ b/docs/source_en/Usage Guide/Server and Client/Overview.md @@ -49,26 +49,35 @@ Complete runnable examples are located in the `cookbook/client/` directory: cookbook/client/ ├── server/ # Server startup configuration │ ├── transformer/ # Transformers backend -│ │ ├── server.py # Startup script +│ │ ├── run.sh # Startup script +│ │ ├── server.py # Server entry point │ │ └── server_config.yaml # Configuration file │ └── megatron/ # Megatron backend +│ ├── run.sh │ ├── server.py │ ├── server_config.yaml │ └── server_config_4b.yaml ├── twinkle/ # Twinkle Client examples │ ├── self_host/ # Self-hosted Server -│ │ ├── grpo.py # GRPO training client +│ │ ├── dpo.py # DPO training client +│ │ ├── multi_modal.py # Multi-modal training client │ │ ├── sample.py # Inference sampling client -│ │ └── self_congnition.py # Self-cognition training client +│ │ ├── self_congnition.py # Self-cognition training client +│ │ └── short_math_grpo.py # GRPO math training client │ └── modelscope/ # ModelScope managed service +│ ├── dpo.py +│ ├── multi_modal.py │ └── self_congnition.py └── tinker/ # Tinker Client 
examples ├── self_host/ # Self-hosted Server + │ ├── dpo.py # DPO training client │ ├── lora.py # LoRA training client + │ ├── multi_modal.py # Multi-modal training client │ ├── sample.py # Inference sampling client │ ├── self_cognition.py # Self-cognition training client │ └── short_math_grpo.py # GRPO math training client └── modelscope/ # ModelScope managed service + ├── dpo.py ├── sample.py ├── self_cognition.py └── short_math_grpo.py diff --git a/docs/source_en/Usage Guide/Server and Client/Tinker-Compatible-Client.md b/docs/source_en/Usage Guide/Server and Client/Tinker-Compatible-Client.md index 77738bb7..a530174e 100644 --- a/docs/source_en/Usage Guide/Server and Client/Tinker-Compatible-Client.md +++ b/docs/source_en/Usage Guide/Server and Client/Tinker-Compatible-Client.md @@ -34,110 +34,11 @@ After initialization, simply import `from tinker import ServiceClient` to connec ## Complete Training Example -```python -import os -import numpy as np -import dotenv -dotenv.load_dotenv('.env') - -# Step 1: Initialize Tinker client before importing ServiceClient -from twinkle import init_tinker_client -init_tinker_client() - -from tinker import types, ServiceClient -from modelscope import AutoTokenizer - -service_client = ServiceClient( - base_url='http://localhost:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') # Recommended: set to ModelScope Token -) - -# Step 2: Query existing training runs (optional) -rest_client = service_client.create_rest_client() -response = rest_client.list_training_runs(limit=50).result() -print(f"Found {len(response.training_runs)} training runs") - -# Step 3: Create training client -base_model = "Qwen/Qwen3-4B" - -# Create new training session -training_client = service_client.create_lora_training_client( - base_model=base_model -) - -# Or resume from checkpoint -# resume_path = "twinkle://run_id/weights/checkpoint_name" -# training_client = service_client.create_training_client_from_state_with_optimizer(path=resume_path) - -# Step 
4: Prepare training data -examples = [ - {"input": "banana split", "output": "anana-bay plit-say"}, - {"input": "quantum physics", "output": "uantum-qay ysics-phay"}, - {"input": "donut shop", "output": "onut-day op-shay"}, -] - -tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) - -def process_example(example: dict, tokenizer) -> types.Datum: - """Convert raw sample to Datum format required by Tinker API. - - Datum contains: - - model_input: Input token IDs - - loss_fn_inputs: Target tokens and per-token weights (0=ignore, 1=compute loss) - """ - prompt = f"English: {example['input']}\nPig Latin:" - - # Prompt part: weight=0, does not participate in loss computation - prompt_tokens = tokenizer.encode(prompt, add_special_tokens=True) - prompt_weights = [0] * len(prompt_tokens) - - # Completion part: weight=1, participates in loss computation - completion_tokens = tokenizer.encode(f" {example['output']}\n\n", add_special_tokens=False) - completion_weights = [1] * len(completion_tokens) - - # Concatenate and construct next-token prediction format - tokens = prompt_tokens + completion_tokens - weights = prompt_weights + completion_weights - - input_tokens = tokens[:-1] - target_tokens = tokens[1:] - weights = weights[1:] - - return types.Datum( - model_input=types.ModelInput.from_ints(tokens=input_tokens), - loss_fn_inputs=dict(weights=weights, target_tokens=target_tokens) - ) - -processed_examples = [process_example(ex, tokenizer) for ex in examples] - -# Step 5: Training loop -for epoch in range(2): - for batch in range(5): - # Send training data to Server: forward + backward propagation - fwdbwd_future = training_client.forward_backward(processed_examples, "cross_entropy") - # Optimizer update - optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) - - # Wait for results - fwdbwd_result = fwdbwd_future.result() - optim_result = optim_future.result() - - # Calculate weighted average log-loss - logprobs = 
np.concatenate([o['logprobs'].tolist() for o in fwdbwd_result.loss_fn_outputs]) - weights = np.concatenate([e.loss_fn_inputs['weights'].tolist() for e in processed_examples]) - print(f"Epoch {epoch}, Batch {batch}: Loss = {-np.dot(logprobs, weights) / weights.sum():.4f}") - - # Save checkpoint every epoch - save_result = training_client.save_state(f"lora-epoch-{epoch}").result() - print(f"Saved checkpoint to {save_result.path}") -``` - -## Using Twinkle Dataset Components - -Tinker compatible mode can also leverage Twinkle's dataset components to simplify data preparation instead of manually constructing `Datum`: +> **Note**: `DataLoader` and `Dataset` in Tinker compatible mode only support local `twinkle` imports; `twinkle_client` is not supported. ```python import os +import numpy as np from tqdm import tqdm from tinker import types from twinkle import init_tinker_client @@ -146,40 +47,55 @@ from twinkle.dataset import Dataset, DatasetMeta from twinkle.preprocessor import SelfCognitionProcessor from twinkle.server.common import input_feature_to_datum -# Initialize Tinker client before importing ServiceClient +# Step 1: Initialize Tinker client before importing ServiceClient init_tinker_client() from tinker import ServiceClient -base_model = "Qwen/Qwen3.5-4B" +base_model = 'Qwen/Qwen3.5-4B' +base_url = 'http://localhost:8000' +api_key = 'EMPTY_API_KEY' -# Use Twinkle's Dataset component to load and preprocess data +# Step 2: Prepare dataset dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(500))) dataset.set_template('Qwen3_5Template', model_id=f'ms://{base_model}', max_length=256) dataset.map(SelfCognitionProcessor('twinkle model', 'ModelScope Team'), load_from_cache_file=False) dataset.encode(batched=True, load_from_cache_file=False) dataloader = DataLoader(dataset=dataset, batch_size=8) -# Initialize client -service_client = ServiceClient( - base_url='http://localhost:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') # 
Recommended: set to ModelScope Token -) +# Step 3: Initialize training client +service_client = ServiceClient(base_url=base_url, api_key=api_key) + +# Create LoRA training client (rank=16 specifies the LoRA adapter rank) training_client = service_client.create_lora_training_client(base_model=base_model, rank=16) -# Training loop: Use input_feature_to_datum to convert data format +# Step 4: Training loop for epoch in range(3): + print(f'Epoch {epoch}') for step, batch in tqdm(enumerate(dataloader)): - # Convert Twinkle's InputFeature to Tinker's Datum + # Convert Twinkle's InputFeature to Tinker's Datum format input_datum = [input_feature_to_datum(input_feature) for input_feature in batch] - fwdbwd_future = training_client.forward_backward(input_datum, "cross_entropy") + # Send data to Server: forward + backward propagation + fwdbwd_future = training_client.forward_backward(input_datum, 'cross_entropy') + + # Optimizer step: update model weights with Adam optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) + # Wait for both operations to complete fwdbwd_result = fwdbwd_future.result() optim_result = optim_future.result() - training_client.save_state(f"twinkle-lora-{epoch}").result() + # Compute weighted average log-loss per token for monitoring + logprobs = np.concatenate([output['logprobs'].tolist() for output in fwdbwd_result.loss_fn_outputs]) + weights = np.concatenate([example.loss_fn_inputs['weights'].tolist() for example in input_datum]) + print(f'Loss per token: {-np.dot(logprobs, weights) / weights.sum():.4f}') + print(f'Training Metrics: {optim_result}') + + # Save a checkpoint after each epoch + save_future = training_client.save_state(f'twinkle-lora-{epoch}') + save_result = save_future.result() + print(f'Saved checkpoint to {save_result.path}') ``` ## Inference Sampling @@ -216,49 +132,55 @@ You can also load saved checkpoints for inference: ```python import os from tinker import types -from modelscope import AutoTokenizer 
from twinkle import init_tinker_client +from twinkle.data_format import Message, Trajectory +from twinkle.template import Template # Initialize Tinker client before importing ServiceClient init_tinker_client() from tinker import ServiceClient -base_model = "Qwen/Qwen3.5-4B" +base_model = 'Qwen/Qwen3.5-4B' +base_url = 'http://localhost:8000' +api_key = 'EMPTY_API_KEY' -service_client = ServiceClient( - base_url='http://localhost:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') # Recommended: set to ModelScope Token -) +service_client = ServiceClient(base_url=base_url, api_key=api_key) # Create sampling client from saved checkpoint sampling_client = service_client.create_sampling_client( - model_path="twinkle://run_id/weights/checkpoint_name", # twinkle:// path of the checkpoint + model_path='twinkle://run_id/weights/checkpoint_name', # twinkle:// path of the checkpoint base_model=base_model ) -# Prepare inference input -tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) +# Use Twinkle's Template to build multi-turn dialogue input +template = Template(model_id=f'ms://{base_model}') -# Construct multi-turn dialogue input -inputs = [ - {'role': 'system', 'content': 'You are a helpful assistant.'}, - {'role': 'user', 'content': 'what is your name?'} -] -input_ids = tokenizer.apply_chat_template(inputs, tokenize=True, add_generation_prompt=True) +trajectory = Trajectory( + messages=[ + Message(role='system', content='You are a helpful assistant'), + Message(role='user', content='What is your name?'), + ] +) + +input_feature = template.batch_encode([trajectory], add_generation_prompt=True)[0] +input_ids = input_feature['input_ids'].tolist() prompt = types.ModelInput.from_ints(input_ids) params = types.SamplingParams( max_tokens=50, # Maximum number of tokens to generate temperature=0.2, # Low temperature, more focused answers - stop=["\n"] # Stop when encountering newline ) # Generate multiple completions -result = 
sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8).result() +print('Sampling...') +future = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8) +result = future.result() +# Decode and print each response +print('Responses:') for i, seq in enumerate(result.sequences): - print(f"{i}: {tokenizer.decode(seq.tokens)}") + print(f'{i}: {repr(template.decode(seq.tokens))}') ``` ### Publishing Checkpoint to ModelScope Hub diff --git a/docs/source_en/Usage Guide/Server and Client/Twinkle-Client.md b/docs/source_en/Usage Guide/Server and Client/Twinkle-Client.md index 85980986..1b3a0f6c 100644 --- a/docs/source_en/Usage Guide/Server and Client/Twinkle-Client.md +++ b/docs/source_en/Usage Guide/Server and Client/Twinkle-Client.md @@ -47,8 +47,9 @@ from twinkle.dataset import Dataset from twinkle.model import MultiLoraTransformersModel # Remote training code (after migration) -from twinkle_client.dataloader import DataLoader -from twinkle_client.dataset import Dataset +# DataLoader and Dataset can be imported from either local twinkle or remote twinkle_client +from twinkle.dataloader import DataLoader # or: from twinkle_client.dataloader import DataLoader +from twinkle.dataset import Dataset # or: from twinkle_client.dataset import Dataset from twinkle_client.model import MultiLoraTransformersModel ``` @@ -57,32 +58,38 @@ Training loops, data processing, and other logic do not need any modifications. 
## Complete Training Example (Transformers Backend) ```python -import os import dotenv dotenv.load_dotenv('.env') from peft import LoraConfig from twinkle import get_logger from twinkle.dataset import DatasetMeta +from twinkle_client import init_twinkle_client -# Import from twinkle_client instead of twinkle to enable remote calls -from twinkle_client.dataloader import DataLoader -from twinkle_client.dataset import Dataset +# DataLoader and Dataset can be imported from either local twinkle or remote twinkle_client +from twinkle.dataloader import DataLoader +from twinkle.dataset import Dataset from twinkle_client.model import MultiLoraTransformersModel -from twinkle_client import init_twinkle_client logger = get_logger() +base_model = 'Qwen/Qwen3.5-4B' +base_url = 'http://localhost:8000' +api_key = 'EMPTY_API_KEY' + # Step 1: Initialize client -client = init_twinkle_client( - base_url='http://127.0.0.1:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') -) +client = init_twinkle_client(base_url=base_url, api_key=api_key) + +# List available models on the server +print('Available models:') +for item in client.get_server_capabilities().supported_models: + print('- ' + item.model_name) # Step 2: Query existing training runs (optional, for resuming training) runs = client.list_training_runs() resume_path = None for run in runs: + logger.info(run.model_dump_json(indent=2)) checkpoints = client.list_checkpoints(run.training_run_id) for checkpoint in checkpoints: logger.info(checkpoint.model_dump_json(indent=2)) @@ -90,10 +97,11 @@ for run in runs: # resume_path = checkpoint.twinkle_path # Step 3: Prepare dataset -dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition')) +# data_slice limits the number of samples loaded +dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(500))) # Set chat template to match model's input format -dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B', max_length=512) 
+dataset.set_template('Qwen3_5Template', model_id=f'ms://{base_model}', max_length=512) # Data preprocessing: Replace placeholders with custom names dataset.map('SelfCognitionProcessor', @@ -103,13 +111,14 @@ dataset.map('SelfCognitionProcessor', dataset.encode(batched=True) # Create DataLoader -dataloader = DataLoader(dataset=dataset, batch_size=8) +dataloader = DataLoader(dataset=dataset, batch_size=4) # Step 4: Configure model -model = MultiLoraTransformersModel(model_id='ms://Qwen/Qwen3.5-4B') +model = MultiLoraTransformersModel(model_id=f'ms://{base_model}') -# Configure LoRA +# Configure LoRA: apply low-rank adapters to all linear layers lora_config = LoraConfig(target_modules='all-linear') +# gradient_accumulation_steps=2: accumulate gradients over 2 micro-batches before each optimizer step model.add_adapter_to_model('default', lora_config, gradient_accumulation_steps=2) # Set template, processor, loss function @@ -117,9 +126,11 @@ model.set_template('Qwen3_5Template') model.set_processor('InputProcessor', padding_side='right') model.set_loss('CrossEntropyLoss') -# Set optimizer and learning rate scheduler -model.set_optimizer('AdamW', lr=1e-4) -model.set_lr_scheduler('LinearLR') +# Set optimizer (only Adam is supported if the server uses Megatron backend) +model.set_optimizer('Adam', lr=1e-4) + +# Set LR scheduler (not supported if the server uses Megatron backend) +# model.set_lr_scheduler('LinearLR') # Step 5: Resume training (optional) if resume_path: @@ -127,35 +138,34 @@ if resume_path: model.load(resume_path, load_optimizer=True) # Step 6: Training loop -for step, batch in enumerate(dataloader): - # Forward propagation + backward propagation - output = model.forward_backward(inputs=batch) +logger.info(model.get_train_configs().model_dump()) - if step % 2 == 0: - logger.info(f'Step {step // 2}, loss: {output}') +for epoch in range(3): + logger.info(f'Starting epoch {epoch}') + for step, batch in enumerate(dataloader): + # Forward propagation + backward 
propagation + model.forward_backward(inputs=batch) - # Gradient clipping - model.clip_grad_norm(1.0) + # Gradient clipping + optimizer update (equivalent to clip_grad_norm / step / zero_grad / lr_step) + model.clip_grad_and_step() - # Optimizer update - model.step() + # Log metrics every 2 steps (aligned with gradient_accumulation_steps) + if step % 2 == 0: + metric = model.calculate_metric(is_training=True) + logger.info(f'Epoch {epoch}, step {step}/{len(dataloader)}, metric: {metric.result}') - # Zero gradients - model.zero_grad() - - # Learning rate scheduling - model.lr_step() - -# Step 7: Save checkpoint -twinkle_path = model.save(name=f'step-{step}', save_optimizer=True) -logger.info(f"Saved checkpoint: {twinkle_path}") + # Step 7: Save checkpoint + twinkle_path = model.save(name=f'twinkle-epoch-{epoch}', save_optimizer=True) + logger.info(f'Saved checkpoint: {twinkle_path}') # Step 8: Upload to ModelScope Hub (optional) -model.upload_to_hub( - checkpoint_dir=twinkle_path, - hub_model_id='your-username/your-model-name', - async_upload=False -) +# YOUR_USER_NAME = "your_username" +# hub_model_id = f'{YOUR_USER_NAME}/twinkle-self-cognition' +# model.upload_to_hub( +# checkpoint_dir=twinkle_path, +# hub_model_id=hub_model_id, +# async_upload=False +# ) ``` ## Differences with Megatron Backend diff --git a/docs/source_en/Usage Guide/Train-as-a-Service.md b/docs/source_en/Usage Guide/Train-as-a-Service.md index 692ef3f4..f5e4ea3f 100644 --- a/docs/source_en/Usage Guide/Train-as-a-Service.md +++ b/docs/source_en/Usage Guide/Train-as-a-Service.md @@ -108,7 +108,7 @@ trajectory = Trajectory( ] ) -input_feature = template.encode(trajectory, add_generation_prompt=True) +input_feature = template.batch_encode([trajectory], add_generation_prompt=True)[0] input_ids = input_feature['input_ids'].tolist() diff --git 
"a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Tinker\345\205\274\345\256\271\345\256\242\346\210\267\347\253\257.md" "b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Tinker\345\205\274\345\256\271\345\256\242\346\210\267\347\253\257.md" index 1340fc06..a1f7e064 100644 --- "a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Tinker\345\205\274\345\256\271\345\256\242\346\210\267\347\253\257.md" +++ "b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Tinker\345\205\274\345\256\271\345\256\242\346\210\267\347\253\257.md" @@ -34,110 +34,11 @@ for item in service_client.get_server_capabilities().supported_models: ## 完整训练示例 -```python -import os -import numpy as np -import dotenv -dotenv.load_dotenv('.env') - -# Step 1: 在导入 ServiceClient 之前,先初始化 Tinker 客户端 -from twinkle import init_tinker_client -init_tinker_client() - -from tinker import types, ServiceClient -from modelscope import AutoTokenizer - -service_client = ServiceClient( - base_url='http://localhost:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') # 建议设置为 ModelScope Token -) - -# Step 2: 查询已有训练运行(可选) -rest_client = service_client.create_rest_client() -response = rest_client.list_training_runs(limit=50).result() -print(f"Found {len(response.training_runs)} training runs") - -# Step 3: 创建训练客户端 -base_model = "Qwen/Qwen3-4B" - -# 新建训练会话 -training_client = service_client.create_lora_training_client( - base_model=base_model -) - -# 或从检查点恢复 -# resume_path = "twinkle://run_id/weights/checkpoint_name" -# training_client = service_client.create_training_client_from_state_with_optimizer(path=resume_path) - -# Step 4: 
准备训练数据 -examples = [ - {"input": "banana split", "output": "anana-bay plit-say"}, - {"input": "quantum physics", "output": "uantum-qay ysics-phay"}, - {"input": "donut shop", "output": "onut-day op-shay"}, -] - -tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) - -def process_example(example: dict, tokenizer) -> types.Datum: - """将原始样本转为 Tinker API 所需的 Datum 格式。 - - Datum 包含: - - model_input: 输入 token IDs - - loss_fn_inputs: 目标 token 和逐 token 权重(0=忽略, 1=计算损失) - """ - prompt = f"English: {example['input']}\nPig Latin:" - - # 提示部分:weight=0,不参与损失计算 - prompt_tokens = tokenizer.encode(prompt, add_special_tokens=True) - prompt_weights = [0] * len(prompt_tokens) - - # 补全部分:weight=1,参与损失计算 - completion_tokens = tokenizer.encode(f" {example['output']}\n\n", add_special_tokens=False) - completion_weights = [1] * len(completion_tokens) - - # 拼接并构建 next-token prediction 格式 - tokens = prompt_tokens + completion_tokens - weights = prompt_weights + completion_weights - - input_tokens = tokens[:-1] - target_tokens = tokens[1:] - weights = weights[1:] - - return types.Datum( - model_input=types.ModelInput.from_ints(tokens=input_tokens), - loss_fn_inputs=dict(weights=weights, target_tokens=target_tokens) - ) - -processed_examples = [process_example(ex, tokenizer) for ex in examples] - -# Step 5: 训练循环 -for epoch in range(2): - for batch in range(5): - # 发送训练数据到 Server:前向 + 反向传播 - fwdbwd_future = training_client.forward_backward(processed_examples, "cross_entropy") - # 优化器更新 - optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) - - # 等待结果 - fwdbwd_result = fwdbwd_future.result() - optim_result = optim_future.result() - - # 计算加权平均 log-loss - logprobs = np.concatenate([o['logprobs'].tolist() for o in fwdbwd_result.loss_fn_outputs]) - weights = np.concatenate([e.loss_fn_inputs['weights'].tolist() for e in processed_examples]) - print(f"Epoch {epoch}, Batch {batch}: Loss = {-np.dot(logprobs, weights) / weights.sum():.4f}") - - # 每个 
epoch 保存检查点 - save_result = training_client.save_state(f"lora-epoch-{epoch}").result() - print(f"Saved checkpoint to {save_result.path}") -``` - -## 使用 Twinkle 数据集组件 - -Tinker 兼容模式也可以利用 Twinkle 的数据集组件来简化数据准备,而不是手动构建 `Datum`: +> **注意**:Tinker 兼容模式的 `DataLoader` 和 `Dataset` 只支持从本地 `twinkle` 导入,不支持 `twinkle_client`。 ```python import os +import numpy as np from tqdm import tqdm from tinker import types from twinkle import init_tinker_client @@ -146,40 +47,55 @@ from twinkle.dataset import Dataset, DatasetMeta from twinkle.preprocessor import SelfCognitionProcessor from twinkle.server.common import input_feature_to_datum -# 在导入 ServiceClient 之前,先初始化 Tinker 客户端 +# Step 1: 在导入 ServiceClient 之前,先初始化 Tinker 客户端 init_tinker_client() from tinker import ServiceClient -base_model = "Qwen/Qwen3.5-4B" +base_model = 'Qwen/Qwen3.5-4B' +base_url = 'http://localhost:8000' +api_key = 'EMPTY_API_KEY' -# 使用 Twinkle 的 Dataset 组件加载和预处理数据 +# Step 2: 准备数据集 dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(500))) dataset.set_template('Qwen3_5Template', model_id=f'ms://{base_model}', max_length=256) dataset.map(SelfCognitionProcessor('twinkle模型', 'twinkle团队'), load_from_cache_file=False) dataset.encode(batched=True, load_from_cache_file=False) dataloader = DataLoader(dataset=dataset, batch_size=8) -# 初始化客户端 -service_client = ServiceClient( - base_url='http://localhost:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') # 建议设置为 ModelScope Token -) +# Step 3: 初始化训练客户端 +service_client = ServiceClient(base_url=base_url, api_key=api_key) + +# 创建 LoRA 训练客户端(rank=16 指定 LoRA 适配器秩) training_client = service_client.create_lora_training_client(base_model=base_model, rank=16) -# 训练循环:使用 input_feature_to_datum 转换数据格式 +# Step 4: 训练循环 for epoch in range(3): + print(f'Epoch {epoch}') for step, batch in tqdm(enumerate(dataloader)): - # 将 Twinkle 的 InputFeature 转换为 Tinker 的 Datum + # 将 Twinkle 的 InputFeature 转换为 Tinker 的 Datum 格式 input_datum = 
[input_feature_to_datum(input_feature) for input_feature in batch] - fwdbwd_future = training_client.forward_backward(input_datum, "cross_entropy") + # 发送数据到 Server:前向 + 反向传播 + fwdbwd_future = training_client.forward_backward(input_datum, 'cross_entropy') + + # 优化器更新:Adam 更新模型权重 optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) + # 等待两个操作完成 fwdbwd_result = fwdbwd_future.result() optim_result = optim_future.result() - training_client.save_state(f"twinkle-lora-{epoch}").result() + # 计算每 token 加权平均 log-loss 用于监控 + logprobs = np.concatenate([output['logprobs'].tolist() for output in fwdbwd_result.loss_fn_outputs]) + weights = np.concatenate([example.loss_fn_inputs['weights'].tolist() for example in input_datum]) + print(f'Loss per token: {-np.dot(logprobs, weights) / weights.sum():.4f}') + print(f'Training Metrics: {optim_result}') + + # 每个 epoch 保存检查点 + save_future = training_client.save_state(f'twinkle-lora-{epoch}') + save_result = save_future.result() + print(f'Saved checkpoint to {save_result.path}') ``` ## 推理采样 @@ -216,49 +132,55 @@ for i, seq in enumerate(result.sequences): ```python import os from tinker import types -from modelscope import AutoTokenizer from twinkle import init_tinker_client +from twinkle.data_format import Message, Trajectory +from twinkle.template import Template # 在导入 ServiceClient 之前,先初始化 Tinker 客户端 init_tinker_client() from tinker import ServiceClient -base_model = "Qwen/Qwen/Qwen3.5-4B" +base_model = 'Qwen/Qwen3.5-4B' +base_url = 'http://localhost:8000' +api_key = 'EMPTY_API_KEY' -service_client = ServiceClient( - base_url='http://localhost:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') # 建议设置为 ModelScope Token -) +service_client = ServiceClient(base_url=base_url, api_key=api_key) # 从已保存的检查点创建采样客户端 sampling_client = service_client.create_sampling_client( - model_path="twinkle://run_id/weights/checkpoint_name", # 检查点的 twinkle:// 路径 + model_path='twinkle://run_id/weights/checkpoint_name', # 检查点的 
twinkle:// 路径 base_model=base_model ) -# 准备推理输入 -tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) +# 使用 Twinkle 的 Template 构建多轮对话输入 +template = Template(model_id=f'ms://{base_model}') -# 构建多轮对话输入 -inputs = [ - {'role': 'system', 'content': 'You are a helpful assistant.'}, - {'role': 'user', 'content': 'what is your name?'} -] -input_ids = tokenizer.apply_chat_template(inputs, tokenize=True, add_generation_prompt=True) +trajectory = Trajectory( + messages=[ + Message(role='system', content='You are a helpful assistant'), + Message(role='user', content='你是谁?'), + ] +) + +input_feature = template.batch_encode([trajectory], add_generation_prompt=True)[0] +input_ids = input_feature['input_ids'].tolist() prompt = types.ModelInput.from_ints(input_ids) params = types.SamplingParams( max_tokens=50, # 最大生成 token 数 temperature=0.2, # 低温度,更聚焦的回答 - stop=["\n"] # 遇到换行停止 ) # 生成多条补全 -result = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8).result() +print('Sampling...') +future = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8) +result = future.result() +# 解码并打印每条响应 +print('Responses:') for i, seq in enumerate(result.sequences): - print(f"{i}: {tokenizer.decode(seq.tokens)}") + print(f'{i}: {repr(template.decode(seq.tokens))}') ``` ### 发布检查点到 ModelScope Hub diff --git "a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Twinkle\345\256\242\346\210\267\347\253\257.md" "b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Twinkle\345\256\242\346\210\267\347\253\257.md" index c9fded19..9548ed35 100644 --- "a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Twinkle\345\256\242\346\210\267\347\253\257.md" +++ 
"b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Twinkle\345\256\242\346\210\267\347\253\257.md" @@ -47,8 +47,9 @@ from twinkle.dataset import Dataset from twinkle.model import MultiLoraTransformersModel # 远端训练代码(迁移后) -from twinkle_client.dataloader import DataLoader -from twinkle_client.dataset import Dataset +# DataLoader 和 Dataset 使用本地 twinkle 或远端 twinkle_client 均可 +from twinkle.dataloader import DataLoader # 或 from twinkle_client.dataloader import DataLoader +from twinkle.dataset import Dataset # 或 from twinkle_client.dataset import Dataset from twinkle_client.model import MultiLoraTransformersModel ``` @@ -57,32 +58,38 @@ from twinkle_client.model import MultiLoraTransformersModel ## 完整训练示例(Transformers 后端) ```python -import os import dotenv dotenv.load_dotenv('.env') from peft import LoraConfig from twinkle import get_logger from twinkle.dataset import DatasetMeta +from twinkle_client import init_twinkle_client -# 从 twinkle_client import 替代 twinkle,实现远端调用 -from twinkle_client.dataloader import DataLoader -from twinkle_client.dataset import Dataset +# DataLoader 和 Dataset 使用本地 twinkle 或远端 twinkle_client 均可 +from twinkle.dataloader import DataLoader +from twinkle.dataset import Dataset from twinkle_client.model import MultiLoraTransformersModel -from twinkle_client import init_twinkle_client logger = get_logger() +base_model = 'Qwen/Qwen3.5-4B' +base_url = 'http://localhost:8000' +api_key = 'EMPTY_API_KEY' + # Step 1: 初始化客户端 -client = init_twinkle_client( - base_url='http://127.0.0.1:8000', - api_key=os.environ.get('MODELSCOPE_TOKEN') -) +client = init_twinkle_client(base_url=base_url, api_key=api_key) + +# 列出服务器支持的模型 +print('Available models:') +for item in client.get_server_capabilities().supported_models: + print('- ' + item.model_name) # Step 2: 查询已有训练运行(可选,用于恢复训练) runs = client.list_training_runs() resume_path = None for run in runs: + 
logger.info(run.model_dump_json(indent=2)) checkpoints = client.list_checkpoints(run.training_run_id) for checkpoint in checkpoints: logger.info(checkpoint.model_dump_json(indent=2)) @@ -90,26 +97,28 @@ for run in runs: # resume_path = checkpoint.twinkle_path # Step 3: 准备数据集 -dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition')) +# data_slice 可限制加载的数据量 +dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(500))) # 设置 chat 模板,使数据匹配模型的输入格式 -dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B', max_length=512) +dataset.set_template('Qwen3_5Template', model_id=f'ms://{base_model}', max_length=512) # 数据预处理:替换占位符为自定义名称 dataset.map('SelfCognitionProcessor', - init_args={'model_name': 'twinkle模型', 'model_author': 'twinkle团队'}) + init_args={'model_name': 'twinkle模型', 'model_author': 'ModelScope社区'}) # 编码数据集为模型可用的 token dataset.encode(batched=True) # 创建 DataLoader -dataloader = DataLoader(dataset=dataset, batch_size=8) +dataloader = DataLoader(dataset=dataset, batch_size=4) # Step 4: 配置模型 -model = MultiLoraTransformersModel(model_id='ms://Qwen/Qwen3.5-4B') +model = MultiLoraTransformersModel(model_id=f'ms://{base_model}') -# 配置 LoRA +# 配置 LoRA:对所有线性层应用低秩适配器 lora_config = LoraConfig(target_modules='all-linear') +# gradient_accumulation_steps=2 表示累积 2 个 micro-batch 的梯度后再执行一次优化器更新 model.add_adapter_to_model('default', lora_config, gradient_accumulation_steps=2) # 设置模板、处理器、损失函数 @@ -117,9 +126,11 @@ model.set_template('Qwen3_5Template') model.set_processor('InputProcessor', padding_side='right') model.set_loss('CrossEntropyLoss') -# 设置优化器和学习率调度器 -model.set_optimizer('AdamW', lr=1e-4) -model.set_lr_scheduler('LinearLR') +# 设置优化器(如果服务器使用 Megatron 后端,仅支持 Adam 优化器) +model.set_optimizer('Adam', lr=1e-4) + +# 设置学习率调度器(如果服务器使用 Megatron 后端,不支持 LR 调度器) +# model.set_lr_scheduler('LinearLR') # Step 5: 恢复训练(可选) if resume_path: @@ -127,35 +138,34 @@ if resume_path: model.load(resume_path, load_optimizer=True) # Step 6: 
训练循环 -for step, batch in enumerate(dataloader): - # 前向传播 + 反向传播 - output = model.forward_backward(inputs=batch) +logger.info(model.get_train_configs().model_dump()) - if step % 2 == 0: - logger.info(f'Step {step // 2}, loss: {output}') +for epoch in range(3): + logger.info(f'Starting epoch {epoch}') + for step, batch in enumerate(dataloader): + # 前向传播 + 反向传播 + model.forward_backward(inputs=batch) - # 梯度裁剪 - model.clip_grad_norm(1.0) + # 梯度裁剪 + 优化器更新(等价于依次调用 clip_grad_norm / step / zero_grad / lr_step) + model.clip_grad_and_step() - # 优化器更新 - model.step() + # 每 2 步打印一次指标(与 gradient_accumulation_steps 对齐) + if step % 2 == 0: + metric = model.calculate_metric(is_training=True) + logger.info(f'Epoch {epoch}, step {step}/{len(dataloader)}, metric: {metric.result}') - # 梯度清零 - model.zero_grad() - - # 学习率调度 - model.lr_step() - -# Step 7: 保存检查点 -twinkle_path = model.save(name=f'step-{step}', save_optimizer=True) -logger.info(f"Saved checkpoint: {twinkle_path}") + # Step 7: 保存检查点 + twinkle_path = model.save(name=f'twinkle-epoch-{epoch}', save_optimizer=True) + logger.info(f'Saved checkpoint: {twinkle_path}') # Step 8: 上传到 ModelScope Hub(可选) -model.upload_to_hub( - checkpoint_dir=twinkle_path, - hub_model_id='your-username/your-model-name', - async_upload=False -) +# YOUR_USER_NAME = "your_username" +# hub_model_id = f'{YOUR_USER_NAME}/twinkle-self-cognition' +# model.upload_to_hub( +# checkpoint_dir=twinkle_path, +# hub_model_id=hub_model_id, +# async_upload=False +# ) ``` ## Megatron 后端的差异 diff --git "a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\246\202\350\277\260.md" "b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\246\202\350\277\260.md" index 82861e0b..ddb72408 100644 --- 
"a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\246\202\350\277\260.md" +++ "b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\246\202\350\277\260.md" @@ -49,26 +49,35 @@ Twinkle 提供了完整的 HTTP Server/Client 架构,支持将模型部署为 cookbook/client/ ├── server/ # Server 启动配置 │ ├── transformer/ # Transformers 后端 -│ │ ├── server.py # 启动脚本 +│ │ ├── run.sh # 启动脚本 +│ │ ├── server.py # Server 入口 │ │ └── server_config.yaml # 配置文件 │ └── megatron/ # Megatron 后端 +│ ├── run.sh │ ├── server.py │ ├── server_config.yaml │ └── server_config_4b.yaml ├── twinkle/ # Twinkle Client 示例 │ ├── self_host/ # 自托管 Server -│ │ ├── grpo.py # GRPO 训练客户端 +│ │ ├── dpo.py # DPO 训练客户端 +│ │ ├── multi_modal.py # 多模态训练客户端 │ │ ├── sample.py # 推理采样客户端 -│ │ └── self_congnition.py # 自我认知训练客户端 +│ │ ├── self_congnition.py # 自我认知训练客户端 +│ │ └── short_math_grpo.py # GRPO 数学训练客户端 │ └── modelscope/ # ModelScope 托管服务 +│ ├── dpo.py +│ ├── multi_modal.py │ └── self_congnition.py └── tinker/ # Tinker Client 示例 ├── self_host/ # 自托管 Server + │ ├── dpo.py # DPO 训练客户端 │ ├── lora.py # LoRA 训练客户端 + │ ├── multi_modal.py # 多模态训练客户端 │ ├── sample.py # 推理采样客户端 │ ├── self_cognition.py # 自我认知训练客户端 │ └── short_math_grpo.py # GRPO 数学训练客户端 └── modelscope/ # ModelScope 托管服务 + ├── dpo.py ├── sample.py ├── self_cognition.py └── short_math_grpo.py diff --git "a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\350\256\255\347\273\203\346\234\215\345\212\241.md" "b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\350\256\255\347\273\203\346\234\215\345\212\241.md" index 0c7afc44..d74887c4 100644 --- "a/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\350\256\255\347\273\203\346\234\215\345\212\241.md" +++ 
"b/docs/source_zh/\344\275\277\347\224\250\346\214\207\345\274\225/\350\256\255\347\273\203\346\234\215\345\212\241.md" @@ -111,7 +111,7 @@ trajectory = Trajectory( ] ) -input_feature = template.encode(trajectory, add_generation_prompt=True) +input_feature = template.batch_encode([trajectory], add_generation_prompt=True)[0] input_ids = input_feature['input_ids'].tolist() From 3f2e86f1e523bfe9b56914c68dba61306fe1628f Mon Sep 17 00:00:00 2001 From: Yunlin Mao Date: Wed, 15 Apr 2026 23:30:18 +0800 Subject: [PATCH 3/3] Update cookbook/client/tinker/modelscope/dpo.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- cookbook/client/tinker/modelscope/dpo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cookbook/client/tinker/modelscope/dpo.py b/cookbook/client/tinker/modelscope/dpo.py index f19000b1..a88b70a7 100644 --- a/cookbook/client/tinker/modelscope/dpo.py +++ b/cookbook/client/tinker/modelscope/dpo.py @@ -41,7 +41,7 @@ # --------------------------------------------------------------------------- base_model = 'Qwen/Qwen3.6-35B-A3B' base_url = 'http://www.modelscope.cn/twinkle' -api_key = 'EMPTY_API_KEY' +api_key = os.environ.get('MODELSCOPE_TOKEN') dataset_id = 'ms://hjh0119/shareAI-Llama3-DPO-zh-en-emoji' batch_size = 4