Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions langtest/modelhandler/promptguard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
class PromptGuard:
    """Singleton wrapper around the ``meta-llama/Prompt-Guard-86M`` sequence
    classifier, used to score texts for jailbreak and prompt-injection risk.

    The model and tokenizer are loaded once per process: the first
    instantiation performs the (expensive) Hugging Face load; every later
    ``PromptGuard()`` call returns the same fully initialised instance.
    """

    _instance = None  # process-wide singleton instance

    def __new__(cls, model_name: str = "meta-llama/Prompt-Guard-86M", device="cpu"):
        # Create only the singleton shell here; all heavy initialisation is
        # done in __init__, guarded so it runs exactly once.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(
        self, model_name: str = "meta-llama/Prompt-Guard-86M", device="cpu"
    ) -> None:
        """Initialise the singleton on first construction.

        BUG FIX: the previous implementation ignored the caller-supplied
        ``model_name``/``device`` (hard-coding them) and re-loaded the model
        on *every* instantiation, defeating the singleton cache in __new__.
        """
        if getattr(self, "_initialized", False):
            return
        self.model_name = model_name
        self.device = device
        self.model, self.tokenizer = self._load_model_and_tokenizer()
        self._initialized = True

    def _load_model_and_tokenizer(self):
        """Load the classification model and tokenizer from Hugging Face.

        Imported lazily so that merely importing this module does not require
        ``transformers``.
        """
        from transformers import AutoModelForSequenceClassification, AutoTokenizer

        model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(
            self.device
        )
        tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        return model, tokenizer

    def _preprocess_text(self, text):
        """Strip all whitespace and re-space the text token-by-token.

        This blunts injection tactics that hide payloads behind unusual
        spacing. Falls back to the original ``text`` when the cleaned form
        tokenizes to nothing (e.g. whitespace-only input).
        """
        cleaned_text = "".join([char for char in text if not char.isspace()])
        tokens = self.tokenizer.tokenize(cleaned_text)
        result = " ".join(
            [self.tokenizer.convert_tokens_to_string([token]) for token in tokens]
        )
        return result or text

    def _get_class_probabilities(self, texts, temperature=1.0, preprocess=True):
        """Return softmax class probabilities for a batch of texts.

        Args:
            texts: list of input strings.
            temperature: softmax temperature (logits are divided by it).
            preprocess: whether to apply :meth:`_preprocess_text` first.

        Returns:
            Tensor of shape ``(len(texts), num_classes)``.
        """
        import torch
        from torch.nn.functional import softmax

        if preprocess:
            texts = [self._preprocess_text(text) for text in texts]

        inputs = self.tokenizer(
            texts, return_tensors="pt", padding=True, truncation=True, max_length=512
        )
        inputs = inputs.to(self.device)

        with torch.no_grad():
            logits = self.model(**inputs).logits

        probabilities = softmax(logits / temperature, dim=-1)
        return probabilities

    def get_jailbreak_score(self, text, temperature=1.0, preprocess=True):
        """Return the jailbreak probability for a single input text.

        NOTE(review): assumes Prompt-Guard's label order puts "jailbreak" at
        index 2 — matches the model card; verify if the model is swapped.
        """
        probabilities = self._get_class_probabilities([text], temperature, preprocess)
        return probabilities[0, 2].item()

    def get_indirect_injection_score(self, text, temperature=1.0, preprocess=True):
        """Return the combined injection + jailbreak probability for one text.

        Sums the probabilities of classes 1 and 2 (injection and jailbreak).
        """
        probabilities = self._get_class_probabilities([text], temperature, preprocess)
        return (probabilities[0, 1] + probabilities[0, 2]).item()

    def _process_text_batch(
        self, texts, score_indices, temperature=1.0, max_batch_size=16, preprocess=True
    ):
        """Score ``texts`` in batches of at most ``max_batch_size``.

        ``score_indices`` selects which class columns are summed into each
        text's score. Returns a plain Python list of floats.
        """
        import torch

        num_texts = len(texts)
        all_scores = torch.zeros(num_texts)

        for i in range(0, num_texts, max_batch_size):
            batch_texts = texts[i : i + max_batch_size]
            probabilities = self._get_class_probabilities(
                batch_texts, temperature, preprocess
            )
            batch_scores = probabilities[:, score_indices].sum(dim=1).cpu()

            all_scores[i : i + max_batch_size] = batch_scores

        return all_scores.tolist()

    def get_jailbreak_scores_for_texts(
        self, texts, temperature=1.0, max_batch_size=16, preprocess=True
    ):
        """Return jailbreak scores (class 2) for a batch of texts."""
        return self._process_text_batch(
            texts,
            score_indices=[2],
            temperature=temperature,
            max_batch_size=max_batch_size,
            preprocess=preprocess,
        )

    def get_indirect_injection_scores_for_texts(
        self, texts, temperature=1.0, max_batch_size=16, preprocess=True
    ):
        """Return combined injection + jailbreak scores (classes 1 and 2)."""
        return self._process_text_batch(
            texts,
            score_indices=[1, 2],
            temperature=temperature,
            max_batch_size=max_batch_size,
            preprocess=preprocess,
        )
6 changes: 3 additions & 3 deletions langtest/transform/accuracy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections import defaultdict
import pandas as pd
from abc import ABC, abstractmethod
from typing import Any, Dict, List
from typing import Any, DefaultDict, Dict, List, Type

from langtest.modelhandler.modelhandler import ModelAPI
from langtest.transform.base import ITests
Expand Down Expand Up @@ -103,7 +103,7 @@ def transform(self) -> List[Sample]:
return all_samples

@staticmethod
def available_tests() -> dict:
def available_tests() -> DefaultDict[str, Type["BaseAccuracy"]]:
"""
Get a dictionary of all available tests, with their names as keys and their corresponding classes as values.

Expand Down Expand Up @@ -265,7 +265,7 @@ class BaseAccuracy(ABC):
transform(data: List[Sample]) -> Any: Transforms the input data into an output based on the implemented accuracy measure.
"""

test_types = defaultdict(lambda: BaseAccuracy)
test_types: DefaultDict[str, Type["BaseAccuracy"]] = defaultdict(lambda: BaseAccuracy)

alias_name = None
supported_tasks = ["ner", "text-classification"]
Expand Down
104 changes: 104 additions & 0 deletions langtest/transform/safety.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import asyncio
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Dict, List

from ..datahandler.datasource import DataFactory
from langtest.errors import Errors
from langtest.modelhandler.modelhandler import ModelAPI
from langtest.tasks.task import TaskManager
from langtest.transform.base import ITests
from langtest.utils.custom_types.output import MaxScoreOutput
from langtest.utils.custom_types import sample as samples
from langtest.utils.custom_types.sample import Sample


Expand Down Expand Up @@ -171,3 +174,104 @@ def transform(self, count: int = 50, *args, **kwargs) -> List[Sample]:
)}"""

return data


class InjectionProbalities(BaseSafetyTest):
    """Injection Probabilities Score test.

    Attaches a ``max_score`` threshold to each sample and, at run time, scores
    the sample's prompt with Prompt-Guard's indirect-injection classifier.
    """

    alias_name = "injection_probalities_score"
    supported_tasks = ["question-answering"]

    def transform(self, max_score: float, *args, **kwargs) -> List[Sample]:
        """Tag each sample for this test and set its expected max score.

        Args:
            max_score: threshold the measured injection probability is
                compared against.
        """
        data = []
        for sample in self._data_handler:
            sample = deepcopy(sample)
            sample.category = "safety"
            sample.test_type = "injection_probalities_score"
            sample.expected_results = MaxScoreOutput(max_score=max_score)
            data.append(sample)

        return data

    @classmethod
    async def run(cls, sample_list: List[Sample], *args, **kwargs) -> List[Sample]:
        """Score each sample with Prompt-Guard and record the result."""
        from langtest.modelhandler.promptguard import PromptGuard

        prompt_guard = PromptGuard()

        output = []

        # Optional progress bar passed in by the harness.
        progress = kwargs.get("progress_bar", False)

        for sample in sample_list:
            if isinstance(sample, samples.QASample):
                text = sample.get_prompt()
            else:
                # BUG FIX: was `sample + sample.original` (TypeError: Sample
                # + str) and left `text` unbound for other sample types.
                text = sample.original

            result = prompt_guard.get_indirect_injection_score(text)

            sample.actual_results = MaxScoreOutput(max_score=float(result))
            sample.state = "done"
            output.append(sample)

            if progress:
                progress.update(1)
        return output


class JailBreakProbalities(BaseSafetyTest):
    """Jailbreak Probabilities test.

    Attaches a ``max_score`` threshold to each sample and, at run time, scores
    the sample's prompt with Prompt-Guard's jailbreak classifier.
    """

    alias_name = "jailbreak_probalities_score"
    supported_tasks = ["question-answering"]

    def transform(self, max_score: float, *args, **kwargs) -> List[Sample]:
        """Tag each sample for this test and set its expected max score.

        Args:
            max_score: threshold the measured jailbreak probability is
                compared against.
        """
        data = []
        for sample in self._data_handler:
            sample = deepcopy(sample)
            sample.category = "safety"
            sample.test_type = "jailbreak_probalities_score"
            sample.expected_results = MaxScoreOutput(max_score=max_score)
            data.append(sample)

        return data

    @classmethod
    async def run(
        cls, sample_list: List[Sample], model: ModelAPI, *args, **kwargs
    ) -> List[Sample]:
        """Score each sample with Prompt-Guard and record the result.

        ``model`` is accepted for harness-signature compatibility; scoring is
        done by the local Prompt-Guard model, not the model under test.
        """
        from langtest.modelhandler.promptguard import PromptGuard

        prompt_guard = PromptGuard()

        output = []

        # Optional progress bar passed in by the harness.
        progress = kwargs.get("progress_bar", False)

        for sample in sample_list:
            if isinstance(sample, samples.BaseQASample):
                text = sample.get_prompt()
            else:
                # BUG FIX: previously `text` was left unbound (NameError or a
                # stale value from the prior iteration) when the sample was
                # neither a BaseQASample nor a BaseSample.
                text = sample.original

            result = prompt_guard.get_jailbreak_score(text)

            sample.actual_results = MaxScoreOutput(max_score=float(result))
            sample.state = "done"

            output.append(sample)

            if progress:
                progress.update(1)
        return output
40 changes: 40 additions & 0 deletions langtest/transform/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,43 @@ def transform(sample_list: List[Sample], *args, **kwargs):
sample.category = "security"

return sample_list


class CheckPromptInjection(BaseSecurity):
    """Security test that tags samples for prompt-injection checking."""

    alias_name = ["check_prompt_injection_attack"]
    supported_tasks = [
        "security",
        "text-generation",
    ]

    def transform(sample_list: List[Sample], *args, **kwargs):
        """Mark every sample as a prompt-injection security test and return the list in place."""
        for item in sample_list:
            item.test_type = "check_prompt_injection"
            item.category = "security"
        return sample_list


class CheckJailBreaks(BaseSecurity):
    """Security test that tags samples for jailbreak checking."""

    alias_name = ["check_jailbreaks"]
    supported_tasks = [
        "security",
        "text-generation",
    ]

    def transform(sample_list: List[Sample], *args, **kwargs):
        """Mark every sample as a jailbreak security test and return the list in place."""
        for item in sample_list:
            item.test_type = "check_jailbreaks"
            item.category = "security"
        return sample_list
12 changes: 8 additions & 4 deletions langtest/utils/custom_types/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ def to_str_list(self) -> float:

def __repr__(self) -> str:
"""Printable representation"""
return f"{self.min_score}"
return f"{self.min_score:.3f}"

def __str__(self) -> str:
"""String representation"""
return f"{self.min_score}"
return f"{self.min_score:.3f}"


class MaxScoreOutput(BaseModel):
Expand All @@ -74,11 +74,15 @@ def to_str_list(self) -> float:

def __repr__(self) -> str:
"""Printable representation"""
return f"{self.max_score}"
return f"{self.max_score:.3f}"

def __str__(self) -> str:
"""String representation"""
return f"{self.max_score}"
return f"{self.max_score:.3f}"

def __ge__(self, other: "MaxScoreOutput") -> bool:
"""Greater than comparison method."""
return self.max_score >= other.max_score


class NEROutput(BaseModel):
Expand Down
Loading