"""Security scan hook - Phase 7.2.

Scans tool-call inputs and tool results for sensitive information.
"""

import re
from typing import Any

from app.agents.tools.hooks.types import (
    ExecutionContext,
    HookResult,
)

# Sensitive-information patterns, grouped by category name. The category name
# is what gets reported in scan results and embedded in the redaction marker.
SENSITIVE_PATTERNS = {
    "api_key": [
        r"api[_-]?key['\"]?\s*[:=]\s*['\"]?[a-zA-Z0-9_\-]{20,}",
        r"apikey['\"]?\s*[:=]\s*['\"]?[a-zA-Z0-9_\-]{20,}",
    ],
    "password": [
        r"password['\"]?\s*[:=]\s*['\"]?[^\s'\"]{8,}",
        r"passwd['\"]?\s*[:=]\s*['\"]?[^\s'\"]{8,}",
        r"secret['\"]?\s*[:=]\s*['\"]?[a-zA-Z0-9_\-]{20,}",
    ],
    "token": [
        r"token['\"]?\s*[:=]\s*['\"]?[a-zA-Z0-9_\-\.]{20,}",
        r"bearer\s+[a-zA-Z0-9_\-\.]+",
        r"ghp_[a-zA-Z0-9]{36}",
        r"sk-[a-zA-Z0-9]{48}",
    ],
    "private_key": [
        r"-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----",
        r"-----END (RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----",
    ],
    "ip_address": [
        r"\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b",
    ],
    "email": [
        r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
    ],
}


class SecurityScanHook:
    """Security scan hook.

    Scans tool inputs and outputs for sensitive information and, depending on
    configuration, blocks execution or replaces the matches with
    ``[REDACTED:<category>]`` markers.
    """

    def __init__(
        self,
        redact: bool = True,
        block_on_detect: bool = False,
    ):
        """Configure the hook and pre-compile every pattern.

        Args:
            redact: Whether detected sensitive information is redacted.
            block_on_detect: Whether tool execution is blocked when sensitive
                information is detected in the input.
        """
        self.redact = redact
        self.block_on_detect = block_on_detect
        # Compile once here so scanning/redacting does not recompile per call.
        # All patterns are matched case-insensitively.
        self._compiled_patterns = {
            name: [re.compile(p, re.IGNORECASE) for p in patterns]
            for name, patterns in SENSITIVE_PATTERNS.items()
        }

    async def pre_tool_use(self, context: ExecutionContext) -> HookResult:
        """Scan the tool input parameters before the tool runs.

        When something is detected, the findings are stored under
        ``context.metadata["security_detected"]``. Blocking takes precedence
        over redaction when both options are enabled.

        Args:
            context: Execution context; ``tool_input`` is scanned and
                ``metadata`` is updated with the findings.

        Returns:
            A blocking result (``continue_execution=False``) when
            ``block_on_detect`` is set and something was detected; a result
            carrying the redacted input as ``modified_input`` when ``redact``
            is set and something was detected; otherwise a plain success
            result.
        """
        detected = self._scan_dict(context.tool_input)

        if detected:
            context.metadata["security_detected"] = detected

            if self.block_on_detect:
                return HookResult(
                    hook_name="security_scan",
                    success=False,
                    continue_execution=False,
                    error=f"检测到敏感信息: {', '.join(detected.keys())}",
                    metadata={"detected": detected, "blocked": True},
                )

            if self.redact:
                # _redact_dict builds a new structure, so the original
                # tool_input (including nested containers) is left untouched.
                redacted_input = self._redact_dict(context.tool_input)
                return HookResult(
                    hook_name="security_scan",
                    success=True,
                    continue_execution=True,
                    modified_input=redacted_input,
                    metadata={"detected": detected, "redacted": True},
                )

        return HookResult(
            hook_name="security_scan",
            success=True,
            continue_execution=True,
        )

    async def post_tool_use(self, context: ExecutionContext, result: Any) -> HookResult:
        """Scan the tool output after the tool has run.

        ``dict``, ``str`` and ``list`` results are scanned; any other type is
        passed through unchanged. Findings are stored under
        ``context.metadata["security_detected_output"]``: a mapping of
        top-level key to category names for a ``dict`` result, a list of
        category names for a ``str`` or ``list`` result.

        Args:
            context: Execution context whose ``metadata`` receives findings.
            result: The raw tool result.

        Returns:
            A success result whose ``modified_output`` is a redacted copy of
            ``result`` when something was detected and ``redact`` is set, and
            the original ``result`` otherwise.
        """
        if isinstance(result, dict):
            detected = self._scan_dict(result)
        elif isinstance(result, (str, list)):
            detected = self._scan_value(result)
        else:
            detected = None

        if detected:
            context.metadata["security_detected_output"] = detected

            if self.redact:
                return HookResult(
                    hook_name="security_scan",
                    success=True,
                    continue_execution=True,
                    modified_output=self._redact_value(result),
                    metadata={"detected": detected, "redacted": True},
                )

        return HookResult(
            hook_name="security_scan",
            success=True,
            continue_execution=True,
            modified_output=result,
        )

    def _scan_dict(self, data: dict[str, Any]) -> dict[str, list[str]]:
        """Scan a dictionary for sensitive information.

        Each top-level value is scanned in full, including strings nested in
        dicts and lists, so that everything ``_redact_dict`` would rewrite is
        also reported here.

        Args:
            data: Mapping to scan.

        Returns:
            Mapping of top-level key to the category names found in that
            key's value; keys with no findings are omitted.
        """
        result: dict[str, list[str]] = {}
        for key, value in data.items():
            found = self._scan_value(value)
            if found:
                result[key] = found
        return result

    def _scan_value(self, value: Any) -> list[str]:
        """Return the category names found anywhere inside ``value``.

        Strings are matched directly; dict values and list items are scanned
        recursively; any other type yields no findings.
        """
        if isinstance(value, str):
            return self._scan_string(value)
        if isinstance(value, dict):
            children = value.values()
        elif isinstance(value, list):
            children = value
        else:
            return []

        found: list[str] = []
        for child in children:
            for name in self._scan_value(child):
                # Keep first-seen order while avoiding duplicates.
                if name not in found:
                    found.append(name)
        return found

    def _scan_string(self, text: str) -> list[str]:
        """Return the category names that have at least one match in ``text``.

        Categories are reported in ``SENSITIVE_PATTERNS`` order, each once.
        """
        return [
            name
            for name, patterns in self._compiled_patterns.items()
            if any(pattern.search(text) for pattern in patterns)
        ]

    def _redact_dict(self, data: dict[str, Any]) -> dict[str, Any]:
        """Return a redacted copy of ``data``.

        A new dict is built; ``data`` and any containers nested inside it are
        not modified.
        """
        return {key: self._redact_value(value) for key, value in data.items()}

    def _redact_value(self, value: Any) -> Any:
        """Return ``value`` with every contained string redacted.

        Dicts and lists are rebuilt recursively (as plain ``dict``/``list``);
        values of any other non-string type are returned as-is.
        """
        if isinstance(value, str):
            return self._redact_string(value)
        if isinstance(value, dict):
            return {key: self._redact_value(item) for key, item in value.items()}
        if isinstance(value, list):
            return [self._redact_value(item) for item in value]
        return value

    def _redact_string(self, text: str) -> str:
        """Replace every pattern match in ``text`` with ``[REDACTED:<category>]``.

        Patterns are applied one after another in ``SENSITIVE_PATTERNS``
        order, each on the output of the previous substitution.
        """
        for name, patterns in self._compiled_patterns.items():
            replacement = f"[REDACTED:{name}]"
            for pattern in patterns:
                text = pattern.sub(replacement, text)
        return text