AI应用的安全防护:从输入到输出的全链路安全
AI应用的安全防护:从输入到输出的全链路安全
前言
有一次,我们的产品上线了一个新功能:用户可以输入任何问题,AI 会回答。结果第二天就被用户"玩坏了"——有人输入了诱导性的政治问题,有人在尝试套取其他用户的信息,还有人输入了恶意代码。
从那以后,我深刻认识到:AI 应用的安全问题比传统应用更复杂,需要从输入到输出的全链路防护。
今天,我想分享我们在 AI 安全方面的实践和思考。
一、AI 安全的特点
1.1 AI 安全 vs 传统安全
| 维度 | 传统安全 | AI 安全 |
|---|---|---|
| 攻击面 | 代码漏洞 | 模型行为 |
| 攻击方式 | SQL 注入、XSS | Prompt 注入、幻觉 |
| 防御重点 | 输入验证 | 输入过滤 + 输出审核 |
| 复杂性 | 确定性强 | 概率性、不确定性 |
1.2 AI 安全风险分类
class AISecurityRisks:
    """Catalogue of the AI-specific risk categories covered by this article.

    ``RISKS`` maps a risk identifier to a record with three keys:
    ``description`` (short summary), ``severity`` (``"critical"``,
    ``"high"`` or ``"medium"`` in this table) and ``examples`` (a list of
    representative cases).
    """

    RISKS = dict(
        prompt_injection=dict(
            description="通过 Prompt 注入恶意指令",
            severity="high",
            examples=["角色扮演逃逸", "指令覆盖"],
        ),
        data_leakage=dict(
            description="模型泄露敏感信息",
            severity="high",
            examples=["训练数据记忆", "上下文泄露"],
        ),
        harmful_content=dict(
            description="生成有害内容",
            severity="critical",
            examples=["暴力", "色情", "虚假信息"],
        ),
        adversarial_attack=dict(
            description="对抗性攻击",
            severity="medium",
            examples=["对抗样本", "数据投毒"],
        ),
        model_manipulation=dict(
            description="模型行为操纵",
            severity="medium",
            examples=["奖励黑客", "捷径学习"],
        ),
    )


# 二、输入安全防护
2.1 输入验证
import re

from pydantic import BaseModel, validator

# Heuristic signatures of prompt-injection attempts, compiled once at import
# time instead of on every validation call.
_SUSPICIOUS_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'(ignore|disregard|forget).*(previous|above|instruction)',
        r'system.*prompt',
        r'\[INST\].*\[\/INST\]',
    )
)


class ChatInput(BaseModel):
    """Request model for a single chat message."""

    message: str

    @validator('message')
    def validate_message(cls, v):
        """Normalise and validate the user's message.

        Args:
            v: Raw message text.

        Returns:
            The message with leading/trailing whitespace removed.

        Raises:
            ValueError: If the stripped message is empty, longer than 5000
                characters, or matches a suspicious pattern.
        """
        # Strip before checking: the stripped text is what gets returned, so a
        # whitespace-only message must be rejected as empty rather than pass
        # the length check and come back as "".
        v = v.strip()
        if not v:
            raise ValueError('消息不能为空')
        if len(v) > 5000:
            raise ValueError('消息长度不能超过 5000 字符')

        # Reject input that looks like an attempt to override instructions.
        if any(pattern.search(v) for pattern in _SUSPICIOUS_PATTERNS):
            raise ValueError('输入包含可疑内容')

        return v


class ContentModeration:
    """Keyword- and pattern-based moderation of user-supplied text."""

    def __init__(self):
        # Topics that are flagged whenever they appear as a substring.
        self.sensitive_topics = ["政治", "暴力", "色情", "仇恨"]
        # Regular expressions describing request shapes that are not allowed.
        self.blocked_patterns = [
            r'生成长文章关于.*',
            r'解释如何.*',
        ]

    def check(self, text: str) -> dict:
        """Moderate ``text``.

        Args:
            text: The text to inspect.

        Returns:
            A dict with ``allowed`` (True when nothing was flagged) and
            ``issues`` (one message per flagged topic or pattern).
        """
        issues = [
            f"包含敏感话题: {topic}"
            for topic in self.sensitive_topics
            if topic in text
        ]

        for pattern in self.blocked_patterns:
            # re.search rather than re.match: match only anchors at the start
            # of the string, so a single leading character (a space, "请", ...)
            # would bypass the blocked pattern.
            if re.search(pattern, text):
                issues.append("触发禁止模式")

        return {
            "allowed": not issues,
            "issues": issues
        }


# 2.2 Prompt 注入检测
class PromptInjectionDetector:
    """Regex- and keyword-count-based detector for prompt-injection attempts."""

    def __init__(self):
        # Case-insensitive regular expressions; any match marks the text as
        # an injection attempt.
        self.injection_patterns = [
            r'忽略之前的指令',
            r'你是一个.*而不是.*',
            r'忘记.*规则',
            r'\(系统提示.*\)',
            r'<\/?system>',
            r'\[INST\].*\[\/INST\]',
        ]

    def detect(self, text: str) -> dict:
        """Inspect ``text`` for signs of prompt injection.

        Args:
            text: The text to inspect.

        Returns:
            A dict with ``is_injection`` (True when a pattern matched or more
            than 10 directive words were counted), ``matches`` (every matched
            substring, in pattern order), ``instruction_count`` and
            ``risk_level`` (``"high"`` only when a pattern matched, otherwise
            ``"low"``).
        """
        hits = [
            hit
            for rule in self.injection_patterns
            for hit in re.findall(rule, text, re.IGNORECASE)
        ]

        # An unusually large number of directive words is also suspicious.
        directive_total = sum(text.count(word) for word in ('请', '必须', '应该'))

        pattern_hit = len(hits) > 0
        if pattern_hit:
            level = "high"
        else:
            level = "low"

        return {
            "is_injection": pattern_hit or directive_total > 10,
            "matches": hits,
            "instruction_count": directive_total,
            "risk_level": level
        }


# 三、输出安全防护
3.1 输出过滤
class OutputFilter:
    """Masks personal information in model output and flags harmful keywords."""

    def __init__(self):
        self.blocked_content = {
            # Order matters: patterns are applied in sequence, most specific
            # first. Running the 11-digit phone pattern before the ID pattern
            # would mask only the first 11 digits of an 18-digit ID and leak
            # the rest; running it before the email pattern would leave the
            # "@domain" part of a phone-number mailbox behind.
            "personal_info": [
                r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',  # email address
                r'\d{17}[\dXx]|\d{15,18}',  # ID card number (18-char form may end in X)
                r'\d{11}',  # mobile phone number
            ],
            "harmful_content": [
                "暴力内容关键词",
                "色情内容关键词",
            ]
        }

    def filter(self, text: str) -> str:
        """Return ``text`` with every personal-information match masked.

        Args:
            text: Model output to sanitise.

        Returns:
            The text with each match replaced by ``[已过滤]``.
        """
        filtered = text
        for pattern in self.blocked_content["personal_info"]:
            filtered = re.sub(pattern, '[已过滤]', filtered)
        return filtered

    def check_safe(self, text: str) -> dict:
        """Check ``text`` against the harmful-content keyword list.

        Args:
            text: Model output to inspect.

        Returns:
            A dict with ``is_safe`` (True when no keyword was found) and
            ``issues`` (one message per keyword found).
        """
        issues = [
            f"包含敏感内容: {keyword}"
            for keyword in self.blocked_content["harmful_content"]
            if keyword in text
        ]
        return {
            "is_safe": not issues,
            "issues": issues
        }


# 3.2 幻觉检测
class HallucinationDetector:
    """Flags factual-looking sentences that a fact checker cannot verify."""

    def __init__(self, fact_checker):
        # Object exposing ``check(statement)`` that returns a mapping with a
        # ``verified`` key and an optional ``confidence`` key.
        self.fact_checker = fact_checker

    def detect(self, text: str, context: str = "") -> dict:
        """Check every factual-looking sentence of ``text``.

        Args:
            text: Model output to inspect.
            context: Currently unused.

        Returns:
            A dict with ``has_hallucination``, ``hallucinations`` (the
            unverified entries, each holding ``statement``, ``verified`` and
            ``confidence``) and ``confidence`` (the share of checked
            statements that were verified; 1 when nothing was checked).
        """
        checked = []
        for sentence in self._extract_statements(text):
            # Only sentences that look factual are sent to the fact checker.
            if not self._is_factual_statement(sentence):
                continue
            verdict = self.fact_checker.check(sentence)
            checked.append({
                "statement": sentence,
                "verified": verdict["verified"],
                "confidence": verdict.get("confidence", 0)
            })

        unverified = [entry for entry in checked if not entry["verified"]]
        # max(..., 1) avoids division by zero when nothing was checked.
        denominator = max(len(checked), 1)

        return {
            "has_hallucination": len(unverified) > 0,
            "hallucinations": unverified,
            "confidence": 1 - (len(unverified) / denominator)
        }

    def _extract_statements(self, text: str) -> list:
        """Split ``text`` into non-empty, stripped sentences."""
        # Simplified implementation: split on the Chinese full stop only.
        pieces = (piece.strip() for piece in text.split('。'))
        return [piece for piece in pieces if piece]

    def _is_factual_statement(self, statement: str) -> bool:
        """Return True when ``statement`` looks like a factual claim."""
        # Digits, capitalised Latin words, or organisation nouns suggest a
        # factual claim.
        for indicator in (r'\d+', r'[A-Z][a-z]+', r'公司|机构|组织'):
            if re.search(indicator, statement):
                return True
        return False


# 四、API 安全
4.1 限流防护
from fastapi import HTTPException, Request
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)


class RateLimiter:
    """Per-user request counter with plan-specific limits, backed by Redis."""

    def __init__(self):
        # Requests allowed per window (window length in seconds) for each plan.
        self.limits = {
            "free": {"requests": 10, "window": 60},
            "basic": {"requests": 100, "window": 60},
            "pro": {"requests": 1000, "window": 60}
        }

    async def check_limit(self, user_id: str, plan: str):
        """Count this request and reject it when the user's limit is exceeded.

        Args:
            user_id: Identifier used to build the counter key.
            plan: Plan name; unknown plans fall back to the ``free`` limits.

        Raises:
            HTTPException: With status 429 when the request count for the
                current window exceeds the plan's limit.
        """
        limit = self.limits.get(plan, self.limits["free"])
        key = f"rate_limit:{user_id}"

        # Increment first and decide on the returned value. INCR is atomic,
        # so concurrent requests cannot all read the same stale count and slip
        # past the limit, as they could with a separate GET followed by INCR.
        # NOTE(review): assumes redis_client is an async Redis client whose
        # incr() returns the new integer value - confirm against its definition.
        current = await redis_client.incr(key)

        # The request that creates the key starts the window. Deciding this
        # from INCR's return value (not an earlier GET) ensures exactly one
        # request sets the expiry.
        if current == 1:
            await redis_client.expire(key, limit["window"])

        if current > limit["requests"]:
            raise HTTPException(
                status_code=429,
                detail="请求过于频繁,请稍后再试"
            )


# 4.2 身份验证
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()


async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer JWT and return the caller's identity.

    Args:
        credentials: Bearer credentials supplied by the ``security`` dependency.

    Returns:
        A dict with ``user_id`` (the token's ``sub`` claim) and ``plan``
        (the ``plan`` claim, defaulting to ``"free"``).

    Raises:
        HTTPException: With status 401 when the token is expired, invalid,
            or has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(
            credentials.credentials,
            SECRET_KEY,
            algorithms=["HS256"]
        )
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token 已过期")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="无效的 token")

    subject = claims.get("sub")
    if not subject:
        # A correctly signed token without a subject is still unusable.
        raise HTTPException(status_code=401, detail="无效的 token")

    return {"user_id": subject, "plan": claims.get("plan", "free")}


# 五、安全监控
5.1 安全日志
class SecurityLogger:
    """Emits structured log records for security-relevant events."""

    def __init__(self):
        self.logger = StructuredLogger("security")

    @staticmethod
    def _digest(content: str) -> str:
        """Return the MD5 hex digest of ``content``, so raw text is not logged."""
        return hashlib.md5(content.encode()).hexdigest()

    def log_injection_attempt(self, user_id: str, content: str, pattern: str):
        """Record a prompt-injection attempt at warning level.

        Args:
            user_id: User who sent the content.
            content: Offending input; only its hash is logged.
            pattern: The pattern that was detected.
        """
        fields = {
            "user_id": user_id,
            "content_hash": self._digest(content),
            "detected_pattern": pattern,
            "event_type": "security_injection_attempt",
        }
        self.logger.warning("Prompt 注入尝试", **fields)

    def log_rate_limit(self, user_id: str, plan: str):
        """Record that a user hit the rate limit, at warning level.

        Args:
            user_id: User who was limited.
            plan: The user's plan name.
        """
        fields = {
            "user_id": user_id,
            "plan": plan,
            "event_type": "security_rate_limit",
        }
        self.logger.warning("触发限流", **fields)

    def log_hallucination(self, request_id: str, content: str):
        """Record detected hallucinated content at info level.

        Args:
            request_id: Identifier of the request that produced the content.
            content: The flagged output; only its hash is logged.
        """
        fields = {
            "request_id": request_id,
            "content_hash": self._digest(content),
            "event_type": "security_hallucination",
        }
        self.logger.info("检测到幻觉内容", **fields)


# 5.2 告警规则
# security-alerts.yml
# Alerting rules in the Prometheus rule-file layout (groups -> rules).
# NOTE(review): the metric names in `expr` must match what the application
# actually exports - confirm them against the metrics pipeline.
groups:
  - name: security_alerts
    rules:
      # More than 3 injection attempts within 5 minutes, sustained for 1 minute.
      - alert: MultipleInjectionAttempts
        expr: increase(security_injection_attempts[5m]) > 3
        for: 1m
        labels:
          severity: high
        annotations:
          summary: "检测到多次注入尝试"
      # More than 10 rate-limit hits within 1 minute, sustained for 1 minute.
      - alert: RateLimitExceeded
        expr: increase(security_rate_limit[1m]) > 10
        for: 1m
        labels:
          severity: medium
        annotations:
          summary: "限流触发频繁"

# 六、合规与隐私
6.1 数据脱敏
class DataAnonymizer:
    """Replaces personal data in free text and in structured records."""

    def anonymize(self, text: str) -> str:
        """Replace emails, ID card numbers and phone numbers in ``text``.

        Args:
            text: Free text that may contain personal data.

        Returns:
            The text with each match replaced by its placeholder label.
        """
        # Order matters: most specific first. Applying the 11-digit phone
        # pattern before the ID pattern would label the first 11 digits of an
        # 18-digit ID as a phone number and leak the remaining digits;
        # applying it before the email pattern would leave the "@domain" part
        # of a phone-number mailbox in the output.
        patterns = [
            (r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[邮箱]'),  # email address
            (r'\d{17}[\dXx]|\d{15,18}', '[身份证]'),  # ID card number (18-char form may end in X)
            (r'\d{11}', '[手机号]'),  # mobile phone number
        ]
        for pattern, replacement in patterns:
            text = re.sub(pattern, replacement, text)
        return text

    def anonymize_pii(self, data: dict) -> dict:
        """Return a shallow copy of ``data`` with known PII fields masked.

        Args:
            data: Record that may contain ``name``, ``phone``, ``email`` or
                ``id_card`` keys.

        Returns:
            A copy in which each of those keys, when present, holds
            ``[已脱敏]``. The input dict is not modified.
        """
        anonymized = data.copy()
        for field in ("name", "phone", "email", "id_card"):
            if field in anonymized:
                anonymized[field] = "[已脱敏]"
        return anonymized


# 6.2 审计日志
class AuditLogger:
    """Persists audit-trail entries to the audit database."""

    def __init__(self):
        self.db = AuditDatabase()

    async def log(self, event_type: str, user_id: str, details: dict):
        """Write one audit entry.

        Args:
            event_type: Kind of event being recorded.
            user_id: User the event relates to.
            details: Event-specific payload, stored as given.
        """
        # Imported here so the block does not depend on how the rest of the
        # file imports datetime.
        from datetime import datetime, timezone

        log_entry = {
            # Timezone-aware UTC: a naive local timestamp is ambiguous across
            # hosts in different zones and around DST changes, which makes
            # audit records hard to order reliably.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "details": details,
            "ip_address": get_client_ip(),
            "user_agent": get_user_agent()
        }
        await self.db.insert(log_entry)


# 七、最佳实践
7.1 安全开发原则
- ✅最小权限:只请求必要的权限
- ✅纵深防御:多层安全防护
- ✅默认安全:安全配置默认开启
- ✅持续监控:实时监控安全事件
7.2 应急响应
class SecurityIncidentResponse:
    """Dispatches security incidents to the matching response playbook."""

    def __init__(self):
        # Incident type -> coroutine method handling it. Every method named
        # here must exist on the class, otherwise construction fails with
        # AttributeError.
        self.playbooks = {
            "prompt_injection": self._handle_injection,
            "data_leakage": self._handle_leakage,
            "ddos": self._handle_ddos
        }

    async def respond(self, incident_type: str, details: dict):
        """Run the playbook registered for ``incident_type``.

        Args:
            incident_type: Key into ``self.playbooks``.
            details: Incident payload passed to the handler.

        Returns:
            True when a playbook handled the incident, False when no playbook
            is registered for ``incident_type``, so callers can escalate
            instead of the incident being dropped silently.
        """
        handler = self.playbooks.get(incident_type)
        if handler is None:
            return False
        await handler(details)
        return True

    async def _handle_injection(self, details: dict):
        """Handle a prompt-injection incident (placeholder)."""
        # 1. Isolate the related requests
        # 2. Notify the security team
        # 3. Analyse the attack pattern
        # 4. Update the protection rules

    async def _handle_leakage(self, details: dict):
        """Handle a data-leakage incident (placeholder)."""
        # TODO: implement the data-leakage playbook.

    async def _handle_ddos(self, details: dict):
        """Handle a DDoS incident (placeholder)."""
        # TODO: implement the DDoS playbook.


# 八、总结
AI 安全需要从输入到输出的全链路防护。关键在于:
- 输入过滤:阻止恶意输入
- 输出审核:确保输出安全
- 持续监控:及时发现和处理问题
- 合规隐私:遵守法规要求
记住:AI 安全是一个持续的过程,需要不断学习和改进。让我们一起打造安全的 AI 应用!
