32
32
import re
33
33
from functools import lru_cache
34
34
from pathlib import Path
35
- from typing import Dict , Optional , Tuple , Union
35
+ from typing import Dict , List , Optional , Tuple , TypedDict , Union
36
36
37
37
yara = None
38
38
try :
49
49
log = logging .getLogger (__name__ )
50
50
51
51
52
+ class InjectionDetectionResult (TypedDict ):
53
+ is_injection : bool
54
+ text : str
55
+ detections : List [str ]
56
+
57
+
52
58
def _check_yara_available ():
53
59
if yara is None :
54
60
raise ImportError (
@@ -197,13 +203,13 @@ def _load_rules(
197
203
}
198
204
rules = yara .compile (filepaths = rules_to_load )
199
205
except yara .SyntaxError as e :
200
- msg = f"Encountered SyntaxError : { e } "
206
+ msg = f"Failed to initialize injection detection due to configuration or YARA rule error: YARA compilation failed : { e } "
201
207
log .error (msg )
202
- raise e
208
+ return None
203
209
return rules
204
210
205
211
206
- def _omit_injection (text : str , matches : list ["yara.Match" ]) -> str :
212
+ def _omit_injection (text : str , matches : list ["yara.Match" ]) -> Tuple [ bool , str ] :
207
213
"""
208
214
Attempts to strip the offending injection attempts from the provided text.
209
215
@@ -216,14 +222,18 @@ def _omit_injection(text: str, matches: list["yara.Match"]) -> str:
216
222
matches (list['yara.Match']): A list of YARA rule matches.
217
223
218
224
Returns:
219
- str: The text with the detected injections stripped out.
225
+ Tuple[bool, str]: A tuple containing:
226
+ - bool: True if injection was detected and modified,
227
+ False if the text is safe (i.e., not modified).
228
+ - str: The text, with detected injections stripped out if modified.
220
229
221
230
Raises:
222
231
ImportError: If the yara module is not installed.
223
232
"""
224
233
225
- # Copy the text to a placeholder variable
234
+ original_text = text
226
235
modified_text = text
236
+ is_injection = False
227
237
for match in matches :
228
238
if match .strings :
229
239
for match_string in match .strings :
@@ -234,10 +244,16 @@ def _omit_injection(text: str, matches: list["yara.Match"]) -> str:
234
244
modified_text = modified_text .replace (plaintext , "" )
235
245
except (AttributeError , UnicodeDecodeError ) as e :
236
246
log .warning (f"Error processing match: { e } " )
237
- return modified_text
247
+
248
+ if modified_text != original_text :
249
+ is_injection = True
250
+ return is_injection , modified_text
251
+ else :
252
+ is_injection = False
253
+ return is_injection , original_text
238
254
239
255
240
- def _sanitize_injection (text : str , matches : list ["yara.Match" ]) -> str :
256
+ def _sanitize_injection (text : str , matches : list ["yara.Match" ]) -> Tuple [ bool , str ] :
241
257
"""
242
258
Attempts to sanitize the offending injection attempts in the provided text.
243
259
This is done by 'de-fanging' the offending content, transforming it into a state that will not execute
@@ -253,19 +269,27 @@ def _sanitize_injection(text: str, matches: list["yara.Match"]) -> str:
253
269
matches (list['yara.Match']): A list of YARA rule matches.
254
270
255
271
Returns:
256
- str: The text with the detected injections sanitized.
272
+ Tuple[bool, str]: A tuple containing:
273
+ - bool: True if injection was detected, False otherwise.
274
+ - str: The sanitized text, or original text depending on sanitization outcome.
275
+ Currently, this function will always raise NotImplementedError.
257
276
258
277
Raises:
259
278
NotImplementedError: If the sanitization logic is not implemented.
260
279
ImportError: If the yara module is not installed.
261
280
"""
262
-
263
281
raise NotImplementedError (
264
282
"Injection sanitization is not yet implemented. Please use 'reject' or 'omit'"
265
283
)
284
+ # Hypothetical logic if implemented, to match existing behavior in injection_detection:
285
+ # sanitized_text_attempt = "..." # result of sanitization
286
+ # if sanitized_text_attempt != text:
287
+ # return True, text # Original text returned, marked as injection detected
288
+ # else:
289
+ # return False, sanitized_text_attempt
266
290
267
291
268
- def _reject_injection (text : str , rules : "yara.Rules" ) -> Tuple [bool , str ]:
292
+ def _reject_injection (text : str , rules : "yara.Rules" ) -> Tuple [bool , List [ str ] ]:
269
293
"""
270
294
Detects whether the provided text contains potential injection attempts.
271
295
@@ -277,8 +301,9 @@ def _reject_injection(text: str, rules: "yara.Rules") -> Tuple[bool, str]:
277
301
rules ('yara.Rules'): The loaded YARA rules.
278
302
279
303
Returns:
280
- bool: True if attempted exploitation is detected, False otherwise.
281
- str: list of matches as a string
304
+ Tuple[bool, List[str]]: A tuple containing:
305
+ - bool: True if attempted exploitation is detected, False otherwise.
306
+ - List[str]: List of matched rule names.
282
307
283
308
Raises:
284
309
ValueError: If the `action` parameter in the configuration is invalid.
@@ -289,18 +314,20 @@ def _reject_injection(text: str, rules: "yara.Rules") -> Tuple[bool, str]:
289
314
log .warning (
290
315
"reject_injection guardrail was invoked but no rules were specified in the InjectionDetection config."
291
316
)
292
- return False , ""
317
+ return False , []
293
318
matches = rules .match (data = text )
294
319
if matches :
295
- matches_string = ", " . join ( [match_name .rule for match_name in matches ])
296
- log .info (f"Input matched on rule { matches_string } ." )
297
- return True , matches_string
320
+ matched_rules = [match_name .rule for match_name in matches ]
321
+ log .info (f"Input matched on rule { ', ' . join ( matched_rules ) } ." )
322
+ return True , matched_rules
298
323
else :
299
- return False , ""
324
+ return False , []
300
325
301
326
302
327
@action ()
303
- async def injection_detection (text : str , config : RailsConfig ) -> str :
328
+ async def injection_detection (
329
+ text : str , config : RailsConfig
330
+ ) -> InjectionDetectionResult :
304
331
"""
305
332
Detects and mitigates potential injection attempts in the provided text.
306
333
@@ -310,45 +337,68 @@ async def injection_detection(text: str, config: RailsConfig) -> str:
310
337
311
338
Args:
312
339
text (str): The text to check for command injection.
340
+
313
341
config (RailsConfig): The Rails configuration object containing injection detection settings.
314
342
315
343
Returns:
316
- str: The sanitized or original text, depending on the action specified in the configuration.
344
+ InjectionDetectionResult: A TypedDict containing:
345
+ - is_injection (bool): Whether an injection was detected. True if any injection is detected,
346
+ False if no injection is detected.
347
+ - text (str): The sanitized or original text
348
+ - detections (List[str]): List of matched rule names if any injection is detected
317
349
318
350
Raises:
319
351
ValueError: If the `action` parameter in the configuration is invalid.
320
352
NotImplementedError: If an unsupported action is encountered.
353
+ ImportError: If the yara module is not installed.
321
354
"""
322
355
_check_yara_available ()
323
356
324
357
_validate_injection_config (config )
358
+
325
359
action_option , yara_path , rule_names , yara_rules = _extract_injection_config (config )
326
360
327
361
rules = _load_rules (yara_path , rule_names , yara_rules )
328
362
329
- if action_option == "reject" :
330
- verdict , detections = _reject_injection (text , rules )
331
- if verdict :
332
- return f"I'm sorry, the desired output triggered rule(s) designed to mitigate exploitation of { detections } ."
333
- else :
334
- return text
335
363
if rules is None :
336
364
log .warning (
337
365
"injection detection guardrail was invoked but no rules were specified in the InjectionDetection config."
338
366
)
339
- return text
340
- matches = rules .match (data = text )
341
- if matches :
342
- matches_string = ", " .join ([match_name .rule for match_name in matches ])
343
- log .info (f"Input matched on rule { matches_string } ." )
344
- if action_option == "omit" :
345
- return _omit_injection (text , matches )
346
- elif action_option == "sanitize" :
347
- return _sanitize_injection (text , matches )
367
+ return InjectionDetectionResult (is_injection = False , text = text , detections = [])
368
+
369
+ if action_option == "reject" :
370
+ is_injection , detected_rules = _reject_injection (text , rules )
371
+ return InjectionDetectionResult (
372
+ is_injection = is_injection , text = text , detections = detected_rules
373
+ )
374
+ else :
375
+ matches = rules .match (data = text )
376
+ if matches :
377
+ detected_rules_list = [match_name .rule for match_name in matches ]
378
+ log .info (f"Input matched on rule { ', ' .join (detected_rules_list )} ." )
379
+
380
+ if action_option == "omit" :
381
+ is_injection , result_text = _omit_injection (text , matches )
382
+ return InjectionDetectionResult (
383
+ is_injection = is_injection ,
384
+ text = result_text ,
385
+ detections = detected_rules_list ,
386
+ )
387
+ elif action_option == "sanitize" :
388
+ # _sanitize_injection will raise NotImplementedError before returning a tuple.
389
+ # the assignment below is for structural consistency if it were implemented.
390
+ is_injection , result_text = _sanitize_injection (text , matches )
391
+ return InjectionDetectionResult (
392
+ is_injection = is_injection ,
393
+ text = result_text ,
394
+ detections = detected_rules_list ,
395
+ )
396
+ else :
397
+ raise NotImplementedError (
398
+ f"Expected `action` parameter to be 'reject', 'omit', or 'sanitize' but got { action_option } instead."
399
+ )
400
+ # no matches found
348
401
else :
349
- # We should never ever hit this since we inspect the action option above, but putting an error here anyway.
350
- raise NotImplementedError (
351
- f"Expected `action` parameter to be 'omit' or 'sanitize' but got { action_option } instead."
402
+ return InjectionDetectionResult (
403
+ is_injection = False , text = text , detections = []
352
404
)
353
- else :
354
- return text
0 commit comments