class PromptGuard:
    """Singleton wrapper around the ``meta-llama/Prompt-Guard-86M`` sequence
    classifier, used to score prompts for jailbreak / prompt-injection risk.

    The model and tokenizer are loaded exactly once — on the first
    ``PromptGuard()`` call — and shared by every subsequent instantiation.
    """

    _instance = None

    def __new__(cls, model_name: str = "meta-llama/Prompt-Guard-86M", device="cpu"):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.model_name = model_name
            cls._instance.device = device
            (
                cls._instance.model,
                cls._instance.tokenizer,
            ) = cls._instance._load_model_and_tokenizer()
        return cls._instance

    def __init__(
        self, model_name: str = "meta-llama/Prompt-Guard-86M", device="cpu"
    ) -> None:
        # All state is initialised once in __new__. The original __init__
        # ignored its arguments (hardcoding model_name/device) and re-loaded
        # the model on *every* call, defeating the singleton — keep it a no-op.
        pass

    def _load_model_and_tokenizer(self):
        """Load the classification model and tokenizer from Hugging Face."""
        from transformers import AutoModelForSequenceClassification, AutoTokenizer

        model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(
            self.device
        )
        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        return model, tokenizer

    def _preprocess_text(self, text):
        """Strip all whitespace, then re-space the text token by token, to
        mitigate whitespace-based prompt-injection obfuscation.

        Falls back to the original text when the cleaned string tokenizes to
        nothing (e.g. whitespace-only input).
        """
        cleaned_text = "".join(char for char in text if not char.isspace())
        tokens = self.tokenizer.tokenize(cleaned_text)
        result = " ".join(
            self.tokenizer.convert_tokens_to_string([token]) for token in tokens
        )
        return result or text

    def _get_class_probabilities(self, texts, temperature=1.0, preprocess=True):
        """Return a ``(batch, num_classes)`` tensor of softmax probabilities
        for a batch of texts.

        Args:
            texts: list of input strings.
            temperature: softmax temperature applied to the logits.
            preprocess: when True, run ``_preprocess_text`` on each input.
        """
        import torch
        from torch.nn.functional import softmax

        if preprocess:
            texts = [self._preprocess_text(text) for text in texts]

        inputs = self.tokenizer(
            texts, return_tensors="pt", padding=True, truncation=True, max_length=512
        )
        inputs = inputs.to(self.device)

        with torch.no_grad():
            logits = self.model(**inputs).logits

        return softmax(logits / temperature, dim=-1)

    def get_jailbreak_score(self, text, temperature=1.0, preprocess=True):
        """Probability (class index 2) that ``text`` is a jailbreak attempt."""
        probabilities = self._get_class_probabilities([text], temperature, preprocess)
        return probabilities[0, 2].item()

    def get_indirect_injection_score(self, text, temperature=1.0, preprocess=True):
        """Combined probability (class indices 1 + 2) of indirect injection."""
        probabilities = self._get_class_probabilities([text], temperature, preprocess)
        return (probabilities[0, 1] + probabilities[0, 2]).item()

    def _process_text_batch(
        self, texts, score_indices, temperature=1.0, max_batch_size=16, preprocess=True
    ):
        """Score ``texts`` in batches of ``max_batch_size``, summing the class
        probabilities at ``score_indices`` for each text.

        Returns:
            A list of floats, one score per input text.
        """
        import torch

        num_texts = len(texts)
        all_scores = torch.zeros(num_texts)

        for i in range(0, num_texts, max_batch_size):
            batch_texts = texts[i : i + max_batch_size]
            probabilities = self._get_class_probabilities(
                batch_texts, temperature, preprocess
            )
            all_scores[i : i + max_batch_size] = (
                probabilities[:, score_indices].sum(dim=1).cpu()
            )

        return all_scores.tolist()

    def get_jailbreak_scores_for_texts(
        self, texts, temperature=1.0, max_batch_size=16, preprocess=True
    ):
        """Jailbreak scores (class index 2) for a batch of texts."""
        return self._process_text_batch(
            texts,
            score_indices=[2],
            temperature=temperature,
            max_batch_size=max_batch_size,
            preprocess=preprocess,
        )

    def get_indirect_injection_scores_for_texts(
        self, texts, temperature=1.0, max_batch_size=16, preprocess=True
    ):
        """Indirect-injection scores (class indices 1 + 2) for a batch of texts."""
        return self._process_text_batch(
            texts,
            score_indices=[1, 2],
            temperature=temperature,
            max_batch_size=max_batch_size,
            preprocess=preprocess,
        )
""" - test_types = defaultdict(lambda: BaseAccuracy) + test_types: DefaultDict[str, Type["BaseAccuracy"]] = defaultdict(lambda: BaseAccuracy) alias_name = None supported_tasks = ["ner", "text-classification"] diff --git a/langtest/transform/safety.py b/langtest/transform/safety.py index dbd7ca7d8..5eb028b66 100644 --- a/langtest/transform/safety.py +++ b/langtest/transform/safety.py @@ -1,5 +1,6 @@ import asyncio from abc import ABC, abstractmethod +from copy import deepcopy from typing import Dict, List from ..datahandler.datasource import DataFactory @@ -7,6 +8,8 @@ from langtest.modelhandler.modelhandler import ModelAPI from langtest.tasks.task import TaskManager from langtest.transform.base import ITests +from langtest.utils.custom_types.output import MaxScoreOutput +from langtest.utils.custom_types import sample as samples from langtest.utils.custom_types.sample import Sample @@ -171,3 +174,104 @@ def transform(self, count: int = 50, *args, **kwargs) -> List[Sample]: )}""" return data + + +class InjectionProbalities(BaseSafetyTest): + alias_name = "injection_probalities_score" + supported_tasks = ["question-answering"] + """ Injection Probabilities Score test. 
+ """ + + def transform(self, max_score: float, *args, **kwargs) -> List[Sample]: + """Execute the Injection Probabilities Score test and return resulting `Sample` objects.""" + + data = [] + for sample in self._data_handler: + sample = deepcopy(sample) + sample.category = "safety" + sample.test_type = "injection_probalities_score" + sample.expected_results = MaxScoreOutput(max_score=max_score) + data.append(sample) + + return data + + @classmethod + async def run(self, sample_list: List[Sample], *args, **kwargs) -> List[Sample]: + """Execute the Injection Probabilities Score test and return resulting `Sample` objects.""" + + from langtest.modelhandler.promptguard import PromptGuard + + prompt_guard = PromptGuard() + + output = [] + + # progress bar + progress = kwargs.get("progress_bar", False) + + for sample in sample_list: + if isinstance(sample, samples.QASample): + text = sample.get_prompt() + elif isinstance(sample, samples.NERSample): + text = sample + sample.original + + result = prompt_guard.get_indirect_injection_score(text) + + sample.actual_results = MaxScoreOutput(max_score=float(result)) + sample.state = "done" + output.append(sample) + + if progress: + progress.update(1) + return output + + +class JailBreakProbalities(BaseSafetyTest): + alias_name = "jailbreak_probalities_score" + supported_tasks = ["question-answering"] + """ Jailbreak Probabilities test. 
+ """ + + def transform(self, max_score: float, *args, **kwargs) -> List[Sample]: + """Execute the Jailbreak Probabilities test and return resulting `Sample` objects.""" + + data = [] + for sample in self._data_handler: + sample = deepcopy(sample) + sample.category = "safety" + sample.test_type = "jailbreak_probalities_score" + sample.expected_results = MaxScoreOutput(max_score=max_score) + data.append(sample) + + return data + + @classmethod + async def run( + self, sample_list: List[Sample], model: ModelAPI, *args, **kwargs + ) -> List[Sample]: + """Execute the Jailbreak Probabilities test and return resulting `Sample` objects.""" + + from langtest.modelhandler.promptguard import PromptGuard + + prompt_guard = PromptGuard() + + output = [] + + # progress bar + progress = kwargs.get("progress_bar", False) + + for sample in sample_list: + if isinstance(sample, samples.BaseQASample): + text = sample.get_prompt() + elif isinstance(sample, samples.BaseSample): + text = sample.original + + result = prompt_guard.get_jailbreak_score(text) + + sample.actual_results = MaxScoreOutput(max_score=float(result)) + sample.state = "done" + + output.append(sample) + + if progress: + progress.update(1) + return output diff --git a/langtest/transform/security.py b/langtest/transform/security.py index dfcf78a1e..e3444ddc1 100644 --- a/langtest/transform/security.py +++ b/langtest/transform/security.py @@ -120,3 +120,43 @@ def transform(sample_list: List[Sample], *args, **kwargs): sample.category = "security" return sample_list + + +class CheckPromptInjection(BaseSecurity): + """ + CheckPromptInjection is a class that implements the model security for checking prompt injection. 
+ """ + + alias_name = ["check_prompt_injection_attack"] + supported_tasks = [ + "security", + "text-generation", + ] + + def transform(sample_list: List[Sample], *args, **kwargs): + """""" + for sample in sample_list: + sample.test_type = "check_prompt_injection" + sample.category = "security" + + return sample_list + + +class CheckJailBreaks(BaseSecurity): + """ + CheckJailBreaks is a class that implements the model security for checking jailbreaks. + """ + + alias_name = ["check_jailbreaks"] + supported_tasks = [ + "security", + "text-generation", + ] + + def transform(sample_list: List[Sample], *args, **kwargs): + """""" + for sample in sample_list: + sample.test_type = "check_jailbreaks" + sample.category = "security" + + return sample_list diff --git a/langtest/utils/custom_types/output.py b/langtest/utils/custom_types/output.py index 0808e92bd..da3c0d5f9 100644 --- a/langtest/utils/custom_types/output.py +++ b/langtest/utils/custom_types/output.py @@ -56,11 +56,11 @@ def to_str_list(self) -> float: def __repr__(self) -> str: """Printable representation""" - return f"{self.min_score}" + return f"{self.min_score:.3f}" def __str__(self) -> str: """String representation""" - return f"{self.min_score}" + return f"{self.min_score:.3f}" class MaxScoreOutput(BaseModel): @@ -74,11 +74,15 @@ def to_str_list(self) -> float: def __repr__(self) -> str: """Printable representation""" - return f"{self.max_score}" + return f"{self.max_score:.3f}" def __str__(self) -> str: """String representation""" - return f"{self.max_score}" + return f"{self.max_score:.3f}" + + def __ge__(self, other: "MaxScoreOutput") -> bool: + """Greater than comparison method.""" + return self.max_score >= other.max_score class NEROutput(BaseModel): diff --git a/langtest/utils/custom_types/sample.py b/langtest/utils/custom_types/sample.py index 8477fb9bb..33da057ac 100644 --- a/langtest/utils/custom_types/sample.py +++ b/langtest/utils/custom_types/sample.py @@ -8,7 +8,7 @@ from .helpers import 
Transformation, Span from .helpers import default_user_prompt from ...metrics import EmbeddingDistance -from .output import NEROutput, Result +from .output import MaxScoreOutput, NEROutput, Result from .predictions import NERPrediction @@ -488,6 +488,32 @@ def run(self, model, **kwargs): ) return tokens + def get_prompt(self): + """Returns the prompt for the sample""" + from .helpers import ( + build_qa_input, + build_qa_prompt, + SimplePromptTemplate, + ) + + dataset_name = ( + self.dataset_name.split("-")[0].lower() + if self.dataset_name + else "default_question_answering_prompt" + ) + + original_text_input = build_qa_input( + context=self.original_context, + question=self.original_question, + options=self.options, + ) + + prompt = build_qa_prompt(original_text_input, dataset_name) + + query = SimplePromptTemplate(**prompt).format(**original_text_input) + + return query + class QASample(BaseQASample): """A class representing a sample for the question answering task. @@ -592,6 +618,9 @@ def is_pass(self) -> bool: if self.ran_pass is not None: return self.ran_pass + elif isinstance(self.expected_results, MaxScoreOutput): + self.ran_pass = self.expected_results >= self.actual_results + return self.ran_pass else: self.__update_params() try: