From 10aa4b3220d5b45c321cf0b0bb87ebf2282e5875 Mon Sep 17 00:00:00 2001
From: Kalyan Chakravarthy
Date: Tue, 17 Sep 2024 17:13:52 +0530
Subject: [PATCH 1/5] Refactor security.py to add new security checks

---
 langtest/transform/security.py | 40 ++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/langtest/transform/security.py b/langtest/transform/security.py
index dfcf78a1e..7138f271a 100644
--- a/langtest/transform/security.py
+++ b/langtest/transform/security.py
@@ -120,3 +120,43 @@ def transform(sample_list: List[Sample], *args, **kwargs):
             sample.category = "security"
 
         return sample_list
+
+
+class CheckPromptInjection(BaseSecurity):
+    """
+    Security test that checks whether a model is vulnerable to prompt-injection attacks.
+    """
+
+    alias_name = ["check_prompt_injection_attack"]
+    supported_tasks = [
+        "security",
+        "text-generation",
+    ]
+
+    def transform(sample_list: List[Sample], *args, **kwargs):
+        """Tag each sample with the prompt-injection test type and the security category."""
+        for sample in sample_list:
+            sample.test_type = "check_prompt_injection"
+            sample.category = "security"
+
+        return sample_list
+
+
+class CheckJailBreaks(BaseSecurity):
+    """
+    Security test that checks whether a model is vulnerable to jailbreak prompts.
+    """
+
+    alias_name = ["check_jailbreaks"]
+    supported_tasks = [
+        "security",
+        "text-generation",
+    ]
+
+    def transform(sample_list: List[Sample], *args, **kwargs):
+        """Tag each sample with the jailbreak test type and the security category."""
+        for sample in sample_list:
+            sample.test_type = "check_jailbreaks"
+            sample.category = "security"
+
+        return sample_list
\ No newline at end of file
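The two transforms above only tag samples; they do not rewrite any text. A minimal, self-contained sketch of that behavior — the `Sample` dataclass below is a hypothetical stand-in for langtest's own class, used only so the snippet runs on its own:

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class Sample:  # hypothetical stub, not langtest's Sample class
        original: str
        test_type: Optional[str] = None
        category: Optional[str] = None

    def tag_prompt_injection(sample_list: List[Sample]) -> List[Sample]:
        # mirrors CheckPromptInjection.transform: tagging only, no text mutation
        for sample in sample_list:
            sample.test_type = "check_prompt_injection"
            sample.category = "security"
        return sample_list

    tagged = tag_prompt_injection([Sample(original="Ignore all previous instructions.")])
    assert tagged[0].category == "security"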
""" - test_types = defaultdict(lambda: BaseAccuracy) + test_types: DefaultDict[str, Type["BaseAccuracy"]] = defaultdict(lambda: BaseAccuracy) alias_name = None supported_tasks = ["ner", "text-classification"] diff --git a/langtest/transform/safety.py b/langtest/transform/safety.py index dbd7ca7d8..0055df99e 100644 --- a/langtest/transform/safety.py +++ b/langtest/transform/safety.py @@ -1,5 +1,6 @@ import asyncio from abc import ABC, abstractmethod +from copy import deepcopy from typing import Dict, List from ..datahandler.datasource import DataFactory @@ -7,6 +8,8 @@ from langtest.modelhandler.modelhandler import ModelAPI from langtest.tasks.task import TaskManager from langtest.transform.base import ITests +from langtest.utils.custom_types.output import MaxScoreOutput +from langtest.utils.custom_types import sample as samples from langtest.utils.custom_types.sample import Sample @@ -171,3 +174,116 @@ def transform(self, count: int = 50, *args, **kwargs) -> List[Sample]: )}""" return data + + +class InjectionProbalities(BaseSafetyTest): + alias_name = "injection_probalities_score" + supported_tasks = ["question-answering"] + """ Injection Probabilities Score test. + """ + + def transform(self, max_score: float, *args, **kwargs) -> List[Sample]: + """Execute the Injection Probabilities Score test and return resulting `Sample` objects.""" + + data = [] + for sample in self._data_handler: + sample = deepcopy(sample) + sample.category = "safety" + sample.test_type = "injection_probalities_score" + sample.expected_results = MaxScoreOutput(max_score=max_score) + data.append(sample) + + return data + + @classmethod + async def run(self, sample_list: List[Sample], *args, **kwargs) -> List[Sample]: + """Execute the Injection Probabilities Score test and return resulting `Sample` objects.""" + + # intialize the model + from transformers import pipeline + + pipe = pipeline("text-classification", model="meta-llama/Prompt-Guard-86M") + + output = [] + + # progress bar + progress = kwargs.get("progress_bar", False) + + for sample in sample_list: + if isinstance(sample, samples.QASample): + text = sample.get_prompt() + elif isinstance(sample, samples.NERSample): + text = sample + sample.original + + result = pipe(text) + score = 0.0 + if result[0]["label"] == "BENIGN": + score = 0.0 + elif result[0]["label"] == "INJECTION": + score = result[0]["score"] + + sample.actual_results = MaxScoreOutput(max_score=float(score)) + sample.state = "done" + output.append(sample) + + if progress: + progress.update(1) + return output + + +class JailBreakProbalities(BaseSafetyTest): + alias_name = "jailbreak_probalities_score" + supported_tasks = ["question-answering"] + """ Jailbreak Probabilities test. 
+ """ + + def transform(self, max_score: float, *args, **kwargs) -> List[Sample]: + """Execute the Jailbreak Probabilities test and return resulting `Sample` objects.""" + + data = [] + for sample in self._data_handler: + sample = deepcopy(sample) + sample.category = "safety" + sample.test_type = "injection_probalities_score" + sample.expected_results = MaxScoreOutput(max_score=max_score) + data.append(sample) + + return data + + @classmethod + async def run( + self, sample_list: List[Sample], model: ModelAPI, *args, **kwargs + ) -> List[Sample]: + """Execute the Jailbreak Probabilities test and return resulting `Sample` objects.""" + + # intialize the model + from transformers import pipeline + + pipe = pipeline("text-classification", model="meta-llama/Prompt-Guard-86M") + + output = [] + + # progress bar + progress = kwargs.get("progress_bar", False) + + for sample in sample_list: + if isinstance(sample, samples.QASample): + text = sample.get_prompt() + elif isinstance(sample, samples.NERSample): + text = sample + sample.original + + result = pipe(text) + score = 0.0 + if result[0]["label"] == "BENIGN": + score = 0.0 + elif result[0]["label"] == "INJECTION": + score = result[0]["score"] + + sample.actual_results = MaxScoreOutput(max_score=float(score)) + sample.state = "done" + + output.append(sample) + + if progress: + progress.update(1) + return output diff --git a/langtest/transform/security.py b/langtest/transform/security.py index 7138f271a..e3444ddc1 100644 --- a/langtest/transform/security.py +++ b/langtest/transform/security.py @@ -159,4 +159,4 @@ def transform(sample_list: List[Sample], *args, **kwargs): sample.test_type = "check_jailbreaks" sample.category = "security" - return sample_list \ No newline at end of file + return sample_list diff --git a/langtest/utils/custom_types/sample.py b/langtest/utils/custom_types/sample.py index 8477fb9bb..d6a319c91 100644 --- a/langtest/utils/custom_types/sample.py +++ b/langtest/utils/custom_types/sample.py @@ -488,6 +488,32 @@ def run(self, model, **kwargs): ) return tokens + def get_prompt(self): + """Returns the prompt for the sample""" + from .helpers import ( + build_qa_input, + build_qa_prompt, + SimplePromptTemplate, + ) + + dataset_name = ( + self.dataset_name.split("-")[0].lower() + if self.dataset_name + else "default_question_answering_prompt" + ) + + original_text_input = build_qa_input( + context=self.original_context, + question=self.original_question, + options=self.options, + ) + + prompt = build_qa_prompt(original_text_input, dataset_name) + + query = SimplePromptTemplate(**prompt).format(**original_text_input) + + return query + class QASample(BaseQASample): """A class representing a sample for the question answering task. 
From 7a58067ab74f23e4045cf9e2aa6b26bd8ed4fb0f Mon Sep 17 00:00:00 2001
From: Kalyan Chakravarthy
Date: Wed, 18 Sep 2024 23:29:18 +0530
Subject: [PATCH 3/5] Refactor test type in safety.py and add decimal
 formatting in output.py

---
 langtest/transform/safety.py          | 2 +-
 langtest/utils/custom_types/output.py | 8 ++++----
 langtest/utils/custom_types/sample.py | 6 +++++-
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/langtest/transform/safety.py b/langtest/transform/safety.py
index 0055df99e..cd50bad2d 100644
--- a/langtest/transform/safety.py
+++ b/langtest/transform/safety.py
@@ -244,7 +244,7 @@ def transform(self, max_score: float, *args, **kwargs) -> List[Sample]:
         for sample in self._data_handler:
             sample = deepcopy(sample)
             sample.category = "safety"
-            sample.test_type = "injection_probalities_score"
+            sample.test_type = "jailbreak_probalities_score"
            sample.expected_results = MaxScoreOutput(max_score=max_score)
             data.append(sample)
 
diff --git a/langtest/utils/custom_types/output.py b/langtest/utils/custom_types/output.py
index 0808e92bd..bd8771b82 100644
--- a/langtest/utils/custom_types/output.py
+++ b/langtest/utils/custom_types/output.py
@@ -56,11 +56,11 @@ def to_str_list(self) -> float:
 
     def __repr__(self) -> str:
         """Printable representation"""
-        return f"{self.min_score}"
+        return f"{self.min_score:.3f}"
 
     def __str__(self) -> str:
         """String representation"""
-        return f"{self.min_score}"
+        return f"{self.min_score:.3f}"
 
 
 class MaxScoreOutput(BaseModel):
@@ -74,11 +74,11 @@ def to_str_list(self) -> float:
 
     def __repr__(self) -> str:
         """Printable representation"""
-        return f"{self.max_score}"
+        return f"{self.max_score:.3f}"
 
     def __str__(self) -> str:
         """String representation"""
-        return f"{self.max_score}"
+        return f"{self.max_score:.3f}"
diff --git a/langtest/utils/custom_types/sample.py b/langtest/utils/custom_types/sample.py
index d6a319c91..e7e4f3785 100644
--- a/langtest/utils/custom_types/sample.py
+++ b/langtest/utils/custom_types/sample.py
@@ -8,7 +8,7 @@
 from .helpers import Transformation, Span
 from .helpers import default_user_prompt
 from ...metrics import EmbeddingDistance
-from .output import NEROutput, Result
+from .output import MaxScoreOutput, NEROutput, Result
 from .predictions import NERPrediction
 
@@ -618,6 +618,10 @@ def is_pass(self) -> bool:
         if self.ran_pass is not None:
             return self.ran_pass
+        elif isinstance(self.expected_results, MaxScoreOutput):
+
+            self.ran_pass = self.expected_results == self.actual_results
+            return self.ran_pass
         else:
             self.__update_params()
         try:

From e9c54e9a12144b4eb840ca2eb200edb7fd0936db Mon Sep 17 00:00:00 2001
From: Kalyan Chakravarthy
Date: Wed, 18 Sep 2024 23:55:54 +0530
Subject: [PATCH 4/5] fixed: formatting issue

---
 langtest/utils/custom_types/sample.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/langtest/utils/custom_types/sample.py b/langtest/utils/custom_types/sample.py
index e7e4f3785..045e93417 100644
--- a/langtest/utils/custom_types/sample.py
+++ b/langtest/utils/custom_types/sample.py
@@ -619,7 +619,6 @@ def is_pass(self) -> bool:
         if self.ran_pass is not None:
             return self.ran_pass
         elif isinstance(self.expected_results, MaxScoreOutput):
-
             self.ran_pass = self.expected_results == self.actual_results
             return self.ran_pass
         else:
             self.__update_params()
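One note on the `==` check added above: pydantic models compare field by field, so `expected_results == actual_results` passes only when the observed score exactly equals the configured threshold — a max-score test really wants "actual at or below expected", which is what patch 5 switches to. A minimal sketch of the difference (a standalone model, not langtest's):

    from pydantic import BaseModel

    class MaxScoreOutput(BaseModel):
        max_score: float = 0.0

    expected = MaxScoreOutput(max_score=0.2)  # configured threshold
    actual = MaxScoreOutput(max_score=0.05)   # observed score

    assert (expected == actual) is False            # equality check: test "fails"
    assert expected.max_score >= actual.max_score   # threshold check: test passes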
modelhandler/promptguard.py module. The changes include:

- Simplifying the initialization process by using a singleton pattern
- Loading the model and tokenizer from Hugging Face
- Preprocessing the input text to remove spaces and mitigate prompt
  injection tactics
- Calculating class probabilities for a single text or a batch of texts
- Adding methods to get jailbreak scores and indirect injection scores
  for a single input text or a batch of texts
- Processing texts in batches to improve efficiency

The commit also includes changes in the safety.py module:

- Importing the PromptGuard class from the modelhandler/promptguard.py
  module
- Replacing the pipeline usage with the PromptGuard class to get
  indirect injection scores

Lastly, the commit includes changes in the output.py and sample.py
modules:

- Adding a greater-than-or-equal-to comparison method in the
  MaxScoreOutput class
- Updating the comparison in the QASample class to use the new
  comparison method in MaxScoreOutput
---
 langtest/modelhandler/promptguard.py  | 128 ++++++++++++++++++++++++++
 langtest/transform/safety.py          |  34 +++----
 langtest/utils/custom_types/output.py |   4 +
 langtest/utils/custom_types/sample.py |   2 +-
 4 files changed, 144 insertions(+), 24 deletions(-)
 create mode 100644 langtest/modelhandler/promptguard.py

diff --git a/langtest/modelhandler/promptguard.py b/langtest/modelhandler/promptguard.py
new file mode 100644
index 000000000..93d417f1d
--- /dev/null
+++ b/langtest/modelhandler/promptguard.py
@@ -0,0 +1,128 @@
+class PromptGuard:
+    _instance = None
+
+    def __new__(cls, model_name: str = "meta-llama/Prompt-Guard-86M", device="cpu"):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+            cls._instance.model_name = model_name
+            cls._instance.device = device
+            (
+                cls._instance.model,
+                cls._instance.tokenizer,
+            ) = cls._instance._load_model_and_tokenizer()
+        return cls._instance
+
+    def __init__(
+        self, model_name: str = "meta-llama/Prompt-Guard-86M", device="cpu"
+    ) -> None:
+        # All state is set once in __new__; __init__ runs on every
+        # PromptGuard() call, so it must not reload the model here.
+        pass
+
+    def _load_model_and_tokenizer(self):
+        """
+        Load the model and tokenizer from Hugging Face.
+        """
+        from transformers import AutoModelForSequenceClassification, AutoTokenizer
+
+        model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(
+            self.device
+        )
+        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
+        return model, tokenizer
+
+    def _preprocess_text(self, text):
+        """
+        Preprocess the input text by removing spaces to mitigate prompt-injection tactics.
+        """
+        cleaned_text = "".join([char for char in text if not char.isspace()])
+        tokens = self.tokenizer.tokenize(cleaned_text)
+        result = " ".join(
+            [self.tokenizer.convert_tokens_to_string([token]) for token in tokens]
+        )
+        return result or text
+
+    def _get_class_probabilities(self, texts, temperature=1.0, preprocess=True):
+        """
+        Internal method to get class probabilities for a single text or a batch of texts.
+ """ + import torch + from torch.nn.functional import softmax + + if preprocess: + texts = [self._preprocess_text(text) for text in texts] + + inputs = self.tokenizer( + texts, return_tensors="pt", padding=True, truncation=True, max_length=512 + ) + inputs = inputs.to(self.device) + + with torch.no_grad(): + logits = self.model(**inputs).logits + + probabilities = softmax(logits / temperature, dim=-1) + return probabilities + + def get_jailbreak_score(self, text, temperature=1.0, preprocess=True): + """ + Get jailbreak score for a single input text. + """ + probabilities = self._get_class_probabilities([text], temperature, preprocess) + return probabilities[0, 2].item() + + def get_indirect_injection_score(self, text, temperature=1.0, preprocess=True): + """ + Get indirect injection score for a single input text. + """ + probabilities = self._get_class_probabilities([text], temperature, preprocess) + return (probabilities[0, 1] + probabilities[0, 2]).item() + + def _process_text_batch( + self, texts, score_indices, temperature=1.0, max_batch_size=16, preprocess=True + ): + """ + Internal method to process texts in batches and return scores. + """ + import torch + + num_texts = len(texts) + all_scores = torch.zeros(num_texts) + + for i in range(0, num_texts, max_batch_size): + batch_texts = texts[i : i + max_batch_size] + probabilities = self._get_class_probabilities( + batch_texts, temperature, preprocess + ) + batch_scores = probabilities[:, score_indices].sum(dim=1).cpu() + + all_scores[i : i + max_batch_size] = batch_scores + + return all_scores.tolist() + + def get_jailbreak_scores_for_texts( + self, texts, temperature=1.0, max_batch_size=16, preprocess=True + ): + """ + Get jailbreak scores for a batch of texts. + """ + return self._process_text_batch( + texts, + score_indices=[2], + temperature=temperature, + max_batch_size=max_batch_size, + preprocess=preprocess, + ) + + def get_indirect_injection_scores_for_texts( + self, texts, temperature=1.0, max_batch_size=16, preprocess=True + ): + """ + Get indirect injection scores for a batch of texts. 
+ """ + return self._process_text_batch( + texts, + score_indices=[1, 2], + temperature=temperature, + max_batch_size=max_batch_size, + preprocess=preprocess, + ) diff --git a/langtest/transform/safety.py b/langtest/transform/safety.py index cd50bad2d..5eb028b66 100644 --- a/langtest/transform/safety.py +++ b/langtest/transform/safety.py @@ -199,10 +199,9 @@ def transform(self, max_score: float, *args, **kwargs) -> List[Sample]: async def run(self, sample_list: List[Sample], *args, **kwargs) -> List[Sample]: """Execute the Injection Probabilities Score test and return resulting `Sample` objects.""" - # intialize the model - from transformers import pipeline + from langtest.modelhandler.promptguard import PromptGuard - pipe = pipeline("text-classification", model="meta-llama/Prompt-Guard-86M") + prompt_guard = PromptGuard() output = [] @@ -215,14 +214,9 @@ async def run(self, sample_list: List[Sample], *args, **kwargs) -> List[Sample]: elif isinstance(sample, samples.NERSample): text = sample + sample.original - result = pipe(text) - score = 0.0 - if result[0]["label"] == "BENIGN": - score = 0.0 - elif result[0]["label"] == "INJECTION": - score = result[0]["score"] + result = prompt_guard.get_indirect_injection_score(text) - sample.actual_results = MaxScoreOutput(max_score=float(score)) + sample.actual_results = MaxScoreOutput(max_score=float(result)) sample.state = "done" output.append(sample) @@ -256,10 +250,9 @@ async def run( ) -> List[Sample]: """Execute the Jailbreak Probabilities test and return resulting `Sample` objects.""" - # intialize the model - from transformers import pipeline + from langtest.modelhandler.promptguard import PromptGuard - pipe = pipeline("text-classification", model="meta-llama/Prompt-Guard-86M") + prompt_guard = PromptGuard() output = [] @@ -267,19 +260,14 @@ async def run( progress = kwargs.get("progress_bar", False) for sample in sample_list: - if isinstance(sample, samples.QASample): + if isinstance(sample, samples.BaseQASample): text = sample.get_prompt() - elif isinstance(sample, samples.NERSample): - text = sample + sample.original + elif isinstance(sample, samples.BaseSample): + text = sample.original - result = pipe(text) - score = 0.0 - if result[0]["label"] == "BENIGN": - score = 0.0 - elif result[0]["label"] == "INJECTION": - score = result[0]["score"] + result = prompt_guard.get_jailbreak_score(text) - sample.actual_results = MaxScoreOutput(max_score=float(score)) + sample.actual_results = MaxScoreOutput(max_score=float(result)) sample.state = "done" output.append(sample) diff --git a/langtest/utils/custom_types/output.py b/langtest/utils/custom_types/output.py index bd8771b82..da3c0d5f9 100644 --- a/langtest/utils/custom_types/output.py +++ b/langtest/utils/custom_types/output.py @@ -80,6 +80,10 @@ def __str__(self) -> str: """String representation""" return f"{self.max_score:.3f}" + def __ge__(self, other: "MaxScoreOutput") -> bool: + """Greater than comparison method.""" + return self.max_score >= other.max_score + class NEROutput(BaseModel): """Output model for NER tasks.""" diff --git a/langtest/utils/custom_types/sample.py b/langtest/utils/custom_types/sample.py index 045e93417..33da057ac 100644 --- a/langtest/utils/custom_types/sample.py +++ b/langtest/utils/custom_types/sample.py @@ -619,7 +619,7 @@ def is_pass(self) -> bool: if self.ran_pass is not None: return self.ran_pass elif isinstance(self.expected_results, MaxScoreOutput): - self.ran_pass = self.expected_results == self.actual_results + self.ran_pass = self.expected_results >= 
diff --git a/langtest/utils/custom_types/sample.py b/langtest/utils/custom_types/sample.py
index 045e93417..33da057ac 100644
--- a/langtest/utils/custom_types/sample.py
+++ b/langtest/utils/custom_types/sample.py
@@ -619,7 +619,7 @@ def is_pass(self) -> bool:
         if self.ran_pass is not None:
             return self.ran_pass
         elif isinstance(self.expected_results, MaxScoreOutput):
-            self.ran_pass = self.expected_results == self.actual_results
+            self.ran_pass = self.expected_results >= self.actual_results
             return self.ran_pass
         else:
             self.__update_params()
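Putting the pieces together: the classifier's logits are temperature-scaled and softmaxed, the class columns each method selects are summed into a score, and `is_pass` then passes a sample when the observed score stays at or below the configured `max_score`. A standalone numeric sketch (the class order `[benign, injection, jailbreak]` is inferred from the indices used in promptguard.py, not taken from the model card):

    import torch
    from torch.nn.functional import softmax

    logits = torch.tensor([[2.5, 0.3, 1.2]])  # assumed order: [benign, injection, jailbreak]
    probs = softmax(logits / 1.0, dim=-1)     # temperature = 1.0

    jailbreak_score = probs[0, 2].item()                           # index 2
    indirect_injection_score = (probs[0, 1] + probs[0, 2]).item()  # indices 1 and 2

    max_score = 0.5  # configured threshold (expected_results)
    print(f"jailbreak: {jailbreak_score:.3f}, pass: {jailbreak_score <= max_score}")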