init commit on approach to handle pyrit strategies #1

Open · wants to merge 1 commit into base: add-pyrit-dependency
@@ -21,13 +21,19 @@
from azure.ai.evaluation._common.utils import validate_azure_ai_project
from azure.ai.evaluation._model_configurations import AzureOpenAIModelConfiguration, OpenAIModelConfiguration
from azure.ai.evaluation._safety_evaluation._callback_chat_target import CallbackChatTarget
from azure.ai.evaluation._safety_evaluation.pyrit_strategy_config import (
get_strategies_for_budget,
create_orchestrator,
StrategyType,
BudgetLevel
)
from azure.core.credentials import TokenCredential
import json
from pathlib import Path
import re
import itertools
from pyrit.common import initialize_pyrit, DUCK_DB
from pyrit.orchestrator import PromptSendingOrchestrator
from pyrit.orchestrator.single_turn.prompt_sending_orchestrator import PromptSendingOrchestrator
from pyrit.prompt_target import OpenAIChatTarget
from pyrit.models import ChatMessage

@@ -473,6 +479,9 @@ def _validate_inputs(
num_turns: int = 1,
scenario: Optional[Union[AdversarialScenario, AdversarialScenarioJailbreak]] = None,
source_text: Optional[str] = None,
attack_budget: Optional[List[str]] = None,
pyrit_strategies: Optional[List[Any]] = None,
pyrit_convertors: Optional[List[Any]] = None,
):
'''
Validates the inputs provided to the __call__ function of the SafetyEvaluation object.
@@ -486,6 +495,12 @@ def _validate_inputs(
:type scenario: Optional[Union[AdversarialScenario, AdversarialScenarioJailbreak]]
:param source_text: The source text to use as grounding document in the evaluation.
:type source_text: Optional[str]
:param attack_budget: A list of budget levels for attacks (e.g., ['low', 'medium', 'high']).
:type attack_budget: Optional[List[str]]
:param pyrit_strategies: A list of PyRIT orchestrator strategies to use for the evaluation.
:type pyrit_strategies: Optional[List[Any]]
:param pyrit_convertors: A list of PyRIT convertors to use for the evaluation.
:type pyrit_convertors: Optional[List[Any]]
'''
if not callable(target):
self._validate_model_config(target)
@@ -550,6 +565,32 @@ def _validate_inputs(
blame=ErrorBlame.USER_ERROR,
)

# Validate attack_budget and pyrit_strategies
if attack_budget and pyrit_strategies:
self.logger.error("Cannot specify both attack_budget and pyrit_strategies simultaneously")
msg = "Cannot specify both attack_budget and pyrit_strategies simultaneously"
raise EvaluationException(
message=msg,
internal_message=msg,
target=ErrorTarget.UNKNOWN,
category=ErrorCategory.INVALID_VALUE,
blame=ErrorBlame.USER_ERROR,
)

# Validate attack_budget values
if attack_budget:
valid_budgets = ["low", "medium", "high"]
invalid_budgets = [budget for budget in attack_budget if budget not in valid_budgets]
if invalid_budgets:
self.logger.error(f"Invalid attack_budget values: {invalid_budgets}. Valid values are: {valid_budgets}")
msg = f"Invalid attack_budget values: {invalid_budgets}. Valid values are: {valid_budgets}"
raise EvaluationException(
message=msg,
internal_message=msg,
target=ErrorTarget.UNKNOWN,
category=ErrorCategory.INVALID_VALUE,
blame=ErrorBlame.USER_ERROR,
)

def _calculate_defect_rate(self, evaluation_result_dict) -> EvaluationResult:
hate_unfairness_defects = []
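For reference, a minimal self-contained sketch of the validation rules added in the hunk above. The helper name and the use of ValueError are illustrative stand-ins; the actual change raises EvaluationException inside _validate_inputs.

from typing import Any, List, Optional

VALID_BUDGETS = ["low", "medium", "high"]

def check_attack_inputs(
    attack_budget: Optional[List[str]] = None,
    pyrit_strategies: Optional[List[Any]] = None,
) -> None:
    # Mirror of the mutual-exclusivity check: budgets and explicit strategies cannot mix.
    if attack_budget and pyrit_strategies:
        raise ValueError("Cannot specify both attack_budget and pyrit_strategies simultaneously")
    # Mirror of the value check: every budget level must be low/medium/high.
    if attack_budget:
        invalid = [b for b in attack_budget if b not in VALID_BUDGETS]
        if invalid:
            raise ValueError(f"Invalid attack_budget values: {invalid}. Valid values are: {VALID_BUDGETS}")

check_attack_inputs(attack_budget=["low", "high"])          # passes
# check_attack_inputs(attack_budget=["extreme"])            # would raise
# check_attack_inputs(["low"], pyrit_strategies=["Flip"])   # would raise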
@@ -616,55 +657,73 @@ async def _pyrit(
target: Union[Callable, AzureOpenAIModelConfiguration, OpenAIModelConfiguration],
scenario: AdversarialScenario,
num_rows: int = 1,
attack_budget: Optional[List[str]] = None,
pyrit_strategies: Optional[List[Any]] = None,
pyrit_convertors: Optional[List[Any]] = None,
) -> str:
chat_target: OpenAIChatTarget = None
if not isinstance(target, Callable):
if "azure_deployment" in target and "azure_endpoint" in target: # Azure OpenAI
api_key = target.get("api_key", None)
if not api_key:
chat_target = OpenAIChatTarget(deployment_name=target["azure_deployment"], endpoint=target["azure_endpoint"], use_aad_auth=True)
else:
chat_target = OpenAIChatTarget(deployment_name=target["azure_deployment"], endpoint=target["azure_endpoint"], api_key=api_key)
else:
chat_target = OpenAIChatTarget(deployment=target["model"], endpoint=target.get("base_url", None), key=target["api_key"], is_azure_target=False)
else:
async def callback_target(
messages: List[Dict],
stream: bool = False,
session_state: Optional[str] = None,
context: Optional[Dict] = None
) -> dict:
messages_list = [self._message_to_dict(chat_message) for chat_message in messages] # type: ignore
latest_message = messages_list[-1]
application_input = latest_message["content"]
try:
response = target(query=application_input)
except Exception as e:
response = f"Something went wrong {e!s}"

## We format the response to follow the openAI chat protocol format
formatted_response = {
"content": response,
"role": "assistant",
"context":{},
}
## NOTE: In the future, instead of appending to messages we should just return `formatted_response`
messages_list.append(formatted_response) # type: ignore
return {"messages": messages_list, "stream": stream, "session_state": session_state, "context": {}}
"""
Execute a PyRIT attack using the specified parameters.

Args:
target: The target function or model configuration
scenario: The adversarial scenario
num_rows: Number of attack samples to generate
attack_budget: List of budget levels for attack strategies
pyrit_strategies: List of specific PyRIT orchestrator strategies to use
pyrit_convertors: List of specific PyRIT convertors to use


chat_target = CallbackChatTarget(callback=callback_target)
Returns:
Path to the output file with attack results
"""
# Setup chat target
if isinstance(target, dict):
self.logger.info(f"Setting up chat target with model configuration: {target}")
chat_target = self._setup_chat_target(target)
elif callable(target):
self.logger.info(f"Setting up chat target with callback function: {target}")
chat_target = await self._setup_callback_target(target)

# Get prompts for the scenario
all_prompts_list = await self._get_all_prompts(scenario, num_rows=num_rows)

orchestrator = PromptSendingOrchestrator(objective_target=chat_target)

# Configure orchestrator based on inputs
orchestrators = []
convertors = []

# If user provided budget levels, get predefined strategies
if attack_budget:
self.logger.info(f"Configuring PyRIT based on attack budget: {attack_budget}")
strategies_config = get_strategies_for_budget(attack_budget)
orchestrators = strategies_config["orchestrators"]
convertors = strategies_config["convertors"]
self.logger.info(f"Selected {len(orchestrators)} orchestrators and {len(convertors)} convertors based on budget")

# If user provided specific strategies or convertors, use those (override budget-based ones)
if pyrit_strategies:
self.logger.info(f"Using user-provided PyRIT strategies")
orchestrators = pyrit_strategies

if pyrit_convertors:
self.logger.info(f"Using user-provided PyRIT convertors")
convertors = pyrit_convertors

# Create orchestrator with configured components
self.logger.info(f"Creating orchestrator with {len(orchestrators)} strategies and {len(convertors)} convertors")
orchestrator = create_orchestrator(
orchestrators=orchestrators if orchestrators else None,
convertors=convertors if convertors else None,
target=chat_target
)

# Execute attack
await orchestrator.send_prompts_async(prompt_list=all_prompts_list)
memory = orchestrator.get_memory()

# Get conversations as a List[List[ChatMessage]]
conversations = [[item.to_chat_message() for item in group] for conv_id, group in itertools.groupby(memory, key=lambda x: x.conversation_id)]

#Convert to json lines
# Convert to json lines
json_lines = ""
for conversation in conversations: # each conversation is a List[ChatMessage]
json_lines += json.dumps({"conversation": {"messages": [self._message_to_dict(message) for message in conversation]}}) + "\n"
@@ -673,6 +732,55 @@ async def callback_target(
with Path(data_path).open("w") as f:
f.writelines(json_lines)
return data_path
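The pyrit_strategy_config module imported at the top of this file is not shown in this diff; based on how _pyrit uses it, its contract appears to be roughly the following. This is an inferred sketch, not the real module.

from typing import Any, Dict, List, Optional

def get_strategies_for_budget(attack_budget: List[str]) -> Dict[str, List[Any]]:
    """Inferred: map budget levels ('low'/'medium'/'high') to predefined PyRIT components.

    _pyrit reads the result as strategies_config["orchestrators"] and
    strategies_config["convertors"], so both keys are assumed to be present.
    """
    return {"orchestrators": [], "convertors": []}

def create_orchestrator(
    orchestrators: Optional[List[Any]],
    convertors: Optional[List[Any]],
    target: Any,
) -> Any:
    """Inferred: build an orchestrator (e.g. a PromptSendingOrchestrator) wired to `target`.

    _pyrit only relies on the returned object exposing send_prompts_async() and get_memory().
    """
    raise NotImplementedError("Assumed interface only; see the real pyrit_strategy_config module in this PR.")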

def _setup_chat_target(self, target: Union[Callable, Dict[str, Any]]) -> Any:
"""Create a chat target from the provided model config."""
if "azure_deployment" in target and "azure_endpoint" in target: # Azure OpenAI
api_key = target.get("api_key", None)
if not api_key:
return OpenAIChatTarget(
deployment_name=target["azure_deployment"],
endpoint=target["azure_endpoint"],
use_aad_auth=True
)
else:
return OpenAIChatTarget(
deployment_name=target["azure_deployment"],
endpoint=target["azure_endpoint"],
api_key=api_key
)
else:
return OpenAIChatTarget(
deployment=target["model"],
endpoint=target.get("base_url", None),
key=target["api_key"],
is_azure_target=False
)

async def _setup_callback_target(self, target: Callable) -> Any:
async def callback_target(
messages: List[Dict],
stream: bool = False,
session_state: Optional[str] = None,
context: Optional[Dict] = None
) -> dict:
messages_list = [self._message_to_dict(chat_message) for chat_message in messages]
latest_message = messages_list[-1]
application_input = latest_message["content"]
try:
response = target(query=application_input)
except Exception as e:
response = f"Something went wrong {e!s}"

formatted_response = {
"content": response,
"role": "assistant",
"context":{},
}
messages_list.append(formatted_response)
return {"messages": messages_list, "stream": stream, "session_state": session_state, "context": {}}

return CallbackChatTarget(callback=callback_target)
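As a usage note, the callback wrapper above invokes the user-supplied target as target(query=...) and expects a plain string back, which it then wraps as an assistant message. A minimal compatible target might look like this (illustrative only, not part of the diff):

def echo_target(query: str) -> str:
    # A real target would call the application under test; this simply echoes the prompt.
    return f"Application response to: {query}"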

async def __call__(
self,
@@ -687,7 +795,10 @@ async def __call__(
source_text: Optional[str] = None,
data_path: Optional[Union[str, os.PathLike]] = None,
jailbreak_data_path: Optional[Union[str, os.PathLike]] = None,
output_path: Optional[Union[str, os.PathLike]] = None
output_path: Optional[Union[str, os.PathLike]] = None,
attack_budget: Optional[List[str]] = None,
pyrit_strategies: Optional[List[Any]] = None,
pyrit_convertors: Optional[List[Any]] = None
) -> Union[EvaluationResult, Dict[str, EvaluationResult]]:
'''
Evaluates the target function based on the provided parameters.
@@ -716,9 +827,15 @@ async def __call__(
:type jailbreak_data_path: Optional[Union[str, os.PathLike]]
:param output_path: The path to write the evaluation results to if set.
:type output_path: Optional[Union[str, os.PathLike]]
:param attack_budget: A list of budget levels for attacks (e.g., ['low', 'medium', 'high']). The operation will choose appropriate PyRIT orchestrators based on these budgets.
:type attack_budget: Optional[List[str]]
:param pyrit_strategies: A list of PyRIT orchestrator strategies to use for the evaluation (e.g., [Flip(), Crescendo()]).
:type pyrit_strategies: Optional[List[Any]]
:param pyrit_convertors: A list of PyRIT convertors to use for the evaluation (e.g., [Base64Convertor()]).
:type pyrit_convertors: Optional[List[Any]]
'''
## Log inputs
self.logger.info(f"User inputs: evaluators{evaluators}, evaluation_name={evaluation_name}, num_turns={num_turns}, num_rows={num_rows}, conversation_turns={conversation_turns}, tasks={tasks}, source_text={source_text}, data_path={data_path}, jailbreak_data_path={jailbreak_data_path}, output_path={output_path}")
self.logger.info(f"User inputs: evaluators{evaluators}, evaluation_name={evaluation_name}, num_turns={num_turns}, num_rows={num_rows}, conversation_turns={conversation_turns}, tasks={tasks}, source_text={source_text}, data_path={data_path}, jailbreak_data_path={jailbreak_data_path}, output_path={output_path}, attack_budget={attack_budget}, pyrit_strategies={pyrit_strategies}, pyrit_convertors={pyrit_convertors}")

## Validate arguments
self._validate_inputs(
@@ -727,15 +844,29 @@
num_turns=num_turns,
scenario=scenario,
source_text=source_text,
attack_budget=attack_budget,
pyrit_strategies=pyrit_strategies,
pyrit_convertors=pyrit_convertors,
)

# Get scenario
adversarial_scenario = self._get_scenario(evaluators, num_turns=num_turns, scenario=scenario)
self.logger.info(f"Using scenario: {adversarial_scenario}")

# Apply attack strategies based on provided parameters
if attack_budget or pyrit_strategies or pyrit_convertors:
self.logger.info(f"Using attack parameters: budget={attack_budget}, strategies={pyrit_strategies}, convertors={pyrit_convertors}")

if isinstance(adversarial_scenario, AdversarialScenario) and num_turns==1:
self.logger.info(f"Running Pyrit with inputs target={target}, scenario={scenario}")
data_path = await self._pyrit(target, adversarial_scenario, num_rows=num_rows)
self.logger.info(f"Running Pyrit with inputs target={target}, scenario={adversarial_scenario}, attack_budget={attack_budget}, pyrit_strategies={pyrit_strategies}, pyrit_convertors={pyrit_convertors}")
data_path = await self._pyrit(
target=target,
scenario=adversarial_scenario,
num_rows=num_rows,
attack_budget=attack_budget,
pyrit_strategies=pyrit_strategies,
pyrit_convertors=pyrit_convertors,
)

## Get evaluators
evaluators_dict = self._get_evaluators(evaluators)