Skip to content

feat: add RailException support and improve error handling #1178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 90 additions & 40 deletions nemoguardrails/library/injection_detection/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import re
from functools import lru_cache
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, TypedDict, Union

yara = None
try:
Expand All @@ -49,6 +49,12 @@
log = logging.getLogger(__name__)


class InjectionDetectionResult(TypedDict):
is_injection: bool
text: str
detections: List[str]


def _check_yara_available():
if yara is None:
raise ImportError(
Expand Down Expand Up @@ -197,13 +203,13 @@ def _load_rules(
}
rules = yara.compile(filepaths=rules_to_load)
except yara.SyntaxError as e:
msg = f"Encountered SyntaxError: {e}"
msg = f"Failed to initialize injection detection due to configuration or YARA rule error: YARA compilation failed: {e}"
log.error(msg)
raise e
return None
return rules


def _omit_injection(text: str, matches: list["yara.Match"]) -> str:
def _omit_injection(text: str, matches: list["yara.Match"]) -> Tuple[bool, str]:
"""
Attempts to strip the offending injection attempts from the provided text.

Expand All @@ -216,14 +222,18 @@ def _omit_injection(text: str, matches: list["yara.Match"]) -> str:
matches (list['yara.Match']): A list of YARA rule matches.

Returns:
str: The text with the detected injections stripped out.
Tuple[bool, str]: A tuple containing:
- bool: True if injection was detected and modified,
False if the text is safe (i.e., not modified).
- str: The text, with detected injections stripped out if modified.

Raises:
ImportError: If the yara module is not installed.
"""

# Copy the text to a placeholder variable
original_text = text
modified_text = text
is_injection = False
for match in matches:
if match.strings:
for match_string in match.strings:
Expand All @@ -234,10 +244,16 @@ def _omit_injection(text: str, matches: list["yara.Match"]) -> str:
modified_text = modified_text.replace(plaintext, "")
except (AttributeError, UnicodeDecodeError) as e:
log.warning(f"Error processing match: {e}")
return modified_text

if modified_text != original_text:
is_injection = True
return is_injection, modified_text
else:
is_injection = False
return is_injection, original_text


def _sanitize_injection(text: str, matches: list["yara.Match"]) -> str:
def _sanitize_injection(text: str, matches: list["yara.Match"]) -> Tuple[bool, str]:
"""
Attempts to sanitize the offending injection attempts in the provided text.
This is done by 'de-fanging' the offending content, transforming it into a state that will not execute
Expand All @@ -253,19 +269,27 @@ def _sanitize_injection(text: str, matches: list["yara.Match"]) -> str:
matches (list['yara.Match']): A list of YARA rule matches.

Returns:
str: The text with the detected injections sanitized.
Tuple[bool, str]: A tuple containing:
- bool: True if injection was detected, False otherwise.
- str: The sanitized text, or original text depending on sanitization outcome.
Currently, this function will always raise NotImplementedError.

Raises:
NotImplementedError: If the sanitization logic is not implemented.
ImportError: If the yara module is not installed.
"""

raise NotImplementedError(
"Injection sanitization is not yet implemented. Please use 'reject' or 'omit'"
)
# Hypothetical logic if implemented, to match existing behavior in injection_detection:
# sanitized_text_attempt = "..." # result of sanitization
# if sanitized_text_attempt != text:
# return True, text # Original text returned, marked as injection detected
# else:
# return False, sanitized_text_attempt


def _reject_injection(text: str, rules: "yara.Rules") -> Tuple[bool, str]:
def _reject_injection(text: str, rules: "yara.Rules") -> Tuple[bool, List[str]]:
"""
Detects whether the provided text contains potential injection attempts.

Expand All @@ -277,8 +301,9 @@ def _reject_injection(text: str, rules: "yara.Rules") -> Tuple[bool, str]:
rules ('yara.Rules'): The loaded YARA rules.

Returns:
bool: True if attempted exploitation is detected, False otherwise.
str: list of matches as a string
Tuple[bool, List[str]]: A tuple containing:
- bool: True if attempted exploitation is detected, False otherwise.
- List[str]: List of matched rule names.

Raises:
ValueError: If the `action` parameter in the configuration is invalid.
Expand All @@ -289,18 +314,20 @@ def _reject_injection(text: str, rules: "yara.Rules") -> Tuple[bool, str]:
log.warning(
"reject_injection guardrail was invoked but no rules were specified in the InjectionDetection config."
)
return False, ""
return False, []
matches = rules.match(data=text)
if matches:
matches_string = ", ".join([match_name.rule for match_name in matches])
log.info(f"Input matched on rule {matches_string}.")
return True, matches_string
matched_rules = [match_name.rule for match_name in matches]
log.info(f"Input matched on rule {', '.join(matched_rules)}.")
return True, matched_rules
else:
return False, ""
return False, []


@action()
async def injection_detection(text: str, config: RailsConfig) -> str:
async def injection_detection(
text: str, config: RailsConfig
) -> InjectionDetectionResult:
"""
Detects and mitigates potential injection attempts in the provided text.

Expand All @@ -310,45 +337,68 @@ async def injection_detection(text: str, config: RailsConfig) -> str:

Args:
text (str): The text to check for command injection.

config (RailsConfig): The Rails configuration object containing injection detection settings.

Returns:
str: The sanitized or original text, depending on the action specified in the configuration.
InjectionDetectionResult: A TypedDict containing:
- is_injection (bool): Whether an injection was detected. True if any injection is detected,
False if no injection is detected.
- text (str): The sanitized or original text
- detections (List[str]): List of matched rule names if any injection is detected

Raises:
ValueError: If the `action` parameter in the configuration is invalid.
NotImplementedError: If an unsupported action is encountered.
ImportError: If the yara module is not installed.
"""
_check_yara_available()

_validate_injection_config(config)

action_option, yara_path, rule_names, yara_rules = _extract_injection_config(config)

rules = _load_rules(yara_path, rule_names, yara_rules)

if action_option == "reject":
verdict, detections = _reject_injection(text, rules)
if verdict:
return f"I'm sorry, the desired output triggered rule(s) designed to mitigate exploitation of {detections}."
else:
return text
if rules is None:
log.warning(
"injection detection guardrail was invoked but no rules were specified in the InjectionDetection config."
)
return text
matches = rules.match(data=text)
if matches:
matches_string = ", ".join([match_name.rule for match_name in matches])
log.info(f"Input matched on rule {matches_string}.")
if action_option == "omit":
return _omit_injection(text, matches)
elif action_option == "sanitize":
return _sanitize_injection(text, matches)
return InjectionDetectionResult(is_injection=False, text=text, detections=[])

if action_option == "reject":
is_injection, detected_rules = _reject_injection(text, rules)
return InjectionDetectionResult(
is_injection=is_injection, text=text, detections=detected_rules
)
else:
matches = rules.match(data=text)
if matches:
detected_rules_list = [match_name.rule for match_name in matches]
log.info(f"Input matched on rule {', '.join(detected_rules_list)}.")

if action_option == "omit":
is_injection, result_text = _omit_injection(text, matches)
return InjectionDetectionResult(
is_injection=is_injection,
text=result_text,
detections=detected_rules_list,
)
elif action_option == "sanitize":
# _sanitize_injection will raise NotImplementedError before returning a tuple.
# the assignment below is for structural consistency if it were implemented.
is_injection, result_text = _sanitize_injection(text, matches)
return InjectionDetectionResult(
is_injection=is_injection,
text=result_text,
detections=detected_rules_list,
)
else:
raise NotImplementedError(
f"Expected `action` parameter to be 'reject', 'omit', or 'sanitize' but got {action_option} instead."
)
# no matches found
else:
# We should never ever hit this since we inspect the action option above, but putting an error here anyway.
raise NotImplementedError(
f"Expected `action` parameter to be 'omit' or 'sanitize' but got {action_option} instead."
return InjectionDetectionResult(
is_injection=False, text=text, detections=[]
)
else:
return text
18 changes: 15 additions & 3 deletions nemoguardrails/library/injection_detection/flows.co
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
# OUTPUT RAILS

flow injection detection
"""
Reject, omit, or sanitize injection attempts from the bot.
This rail operates on the $bot_message.
"""
$bot_message = await InjectionDetectionAction(text=$bot_message)
response = await InjectionDetectionAction(text=$bot_message)
join_separator = ", "
injection_detection_action = $config.rails.config.injection_detection.action

if response["is_injection"]
if $config.enable_rails_exceptions
send InjectionDetectionRailException(message="Output not allowed. The output was blocked by the 'injection detection' flow.")
else if injection_detection_action == "reject"
bot "I'm sorry, the desired output triggered rule(s) designed to mitigate exploitation of {{ response.detections | join(join_separator) }}."
abort
else if injection_detection_action == "omit" or injection_detection_action == "sanitize"
$bot_message = response["text"]
else
$bot_message = response["text"]
18 changes: 16 additions & 2 deletions nemoguardrails/library/injection_detection/flows.v1.co
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
define subflow injection detection

define flow injection detection
"""
Reject, omit, or sanitize injection attempts from the bot.
"""
$bot_message = execute injection_detection(text=$bot_message)
$response = execute injection_detection(text=$bot_message)
$join_separator = ", "
$injection_detection_action = $config.rails.config.injection_detection.action
if $response["is_injection"]
if $config.enable_rails_exceptions
create event InjectionDetectionRailException(message="Output not allowed. The output was blocked by the 'injection detection' flow.")
stop
else if $config.rails.config.injection_detection.action == "reject"
bot say "I'm sorry, the desired output triggered rule(s) designed to mitigate exploitation of {{ response.detections | join(join_separator) }}."
stop
else if $injection_detection_action == "omit" or $injection_detection_action == "sanitize"
$bot_message = $response["text"]
else
$bot_message = $response["text"]
Loading