机器学习模型生产化部署:四层契约式服务化架构
1. 项目概述:当模型走出Jupyter,真正开始呼吸真实世界空气
“From Notebook to Production: Running ML in the Real World (Part 4)”——这个标题本身就像一句暗号,懂的人一眼就明白:这不是又一篇讲如何调参、画ROC曲线的教程,而是直指机器学习工程师职业生涯里最陡峭、也最沉默的那道坎:把在Jupyter里跑通、在Kaggle上拿分、在本地验证集上表现惊艳的模型,变成一个能7×24小时扛住线上流量、自动重试失败请求、日志可追溯、资源不泄漏、版本可回滚、故障可定位的服务。我干这行十一年,亲手把超过87个模型送进生产环境,其中近三分之一在上线首周就因“笔记本思维残留”被紧急回滚——不是模型不准,而是它根本没学会在真实世界里呼吸。Part 4这个编号很关键,它意味着前三个部分已经铺好了地基:Part 1讲数据管道的健壮性设计(比如如何让ETL任务失败时不静默丢数据),Part 2聚焦模型监控与漂移检测(不是只看准确率,而是盯住特征分布的细微偏移),Part 3解决模型版本与实验追踪的混乱问题(告别git commit里写“fix bug maybe?”)。而Part 4,是整座楼的封顶作业:服务化部署、流量治理、可观测性落地、以及最关键的——让运维团队敢点“上线”按钮的工程契约。它面向的不是刚学完scikit-learn的新人,而是已经能独立完成端到端建模、却在第一次部署时被Nginx 502错误和Prometheus告警邮件逼到凌晨三点的中级工程师;是那个在技术评审会上被问“如果GPU显存突然涨到98%,你的服务会降级还是雪崩?”而冷汗直流的算法负责人。这篇文章不讲抽象原则,只拆解我在金融风控、电商推荐、IoT设备预测三个高压力场景中反复验证过的具体方案:用什么工具链组合最省心,配置哪些参数才算真正“生产就绪”,日志里哪几行信息能在故障时帮你节省47分钟排查时间,以及为什么你写的健康检查接口,可能正在悄悄拖垮整个K8s集群的自动扩缩容逻辑。
2. 核心设计思路:为什么放弃“一键部署”,选择“分层契约式交付”
2.1 拒绝“Notebook即服务”的幻觉:从三个真实故障说起
很多团队的第一反应是:既然模型代码在notebook里,那就用nbconvert转成Python脚本,再用Flask包一层,扔进Docker,最后kubectl apply——听起来丝滑,实则埋雷。我见过三个典型故障,都源于这种“笔记本即服务”的简化思维:
案例A(金融风控):模型依赖
pandas==1.3.5,但notebook里用的是pandas==1.5.3,本地测试一切正常。上线后某天凌晨,上游数据源格式微调(新增一个空格分隔符),pandas.read_csv()在1.3.5版本下解析失败,抛出ParserError。而Flask默认将未捕获异常转为500,触发K8s的liveness probe连续失败,集群自动驱逐Pod。结果是:风控模型服务在业务高峰时段完全不可用,损失无法量化。根本原因?notebook环境与生产环境的依赖隔离形同虚设,且缺乏对输入数据schema的主动校验契约。案例B(电商推荐):模型API返回JSON,字段名直接来自pandas DataFrame列名,如
"pred_score_0"。前端团队按此字段渲染商品卡片。两周后,算法团队优化特征工程,DataFrame列名变为"final_pred_score"。API未做向后兼容,前端大量报错,用户看到空白推荐位。根本原因?API接口没有明确定义的Schema契约,也没有版本控制机制,模型变更与服务接口变更耦合过紧。案例C(IoT设备预测):模型需加载2GB的XGBoost二进制文件,启动耗时42秒。K8s readiness probe设置为
initialDelaySeconds: 30,导致Pod在加载完成前就被标记为“就绪”,流量涌入,所有请求超时。运维同学手动调大延迟,但下次模型升级文件变大,又得重复操作。根本原因?服务健康状态的定义与模型实际加载行为脱节,缺乏自适应的就绪探针逻辑。
这三个案例指向同一个核心矛盾:Notebook是探索性、交互式、单次执行的环境;而生产服务是契约性、声明式、持续运行的系统。Part 4的设计起点,就是彻底切断“直接运行notebook代码”的路径,代之以分层契约(Layered Contract):每一层都明确约定“谁提供什么”、“谁消费什么”、“失败时如何降级”。这并非增加复杂度,而是把隐含的假设(比如“数据格式永远不变”、“内存永远够用”)显性化、可验证、可监控。
2.2 四层契约架构:从模型到服务的可靠传递
我们采用四层契约架构,每层都有明确的输入/输出规范、失败处理策略和验证手段。这个架构不是理论模型,而是我在三个不同客户现场用半年时间迭代出来的最小可行结构:
| 层级 | 名称 | 核心职责 | 关键契约要素 | 验证方式 | 典型工具链 |
|---|---|---|---|---|---|
| L1 | 模型封装层 | 将训练好的模型(.pkl, .onnx, .pt)与推理逻辑解耦,提供统一、无状态的predict()接口 | 输入:严格定义的Dict[str, Any]或pd.DataFrame;输出:Dict[str, float]或List[Dict];必须实现validate_input()和health_check()方法 | 单元测试覆盖边界值、空输入、类型错误;集成测试模拟上游数据流 | mlflow.pyfunc,torch.jit.script, 自定义ModelWrapper类 |
| L2 | 服务接口层 | 将L1模型暴露为HTTP/gRPC服务,负责协议转换、认证、限流、熔断 | 输入:OpenAPI 3.0定义的JSON Schema;输出:符合RFC 7807 Problem Details标准的错误响应;健康检查端点/healthz必须返回{"status": "ok", "model_version": "v2.1.3"} | OpenAPI文档生成+Swagger UI人工验证;Postman自动化测试套件(含400/401/429/500全状态码) | FastAPI(HTTP),grpcio(gRPC),starlette.middleware |
| L3 | 部署编排层 | 定义服务在K8s上的运行时行为,确保资源、网络、存储策略符合SLA | 必须声明resources.requests/limits(CPU/MEM/GPU);livenessProbe和readinessProbe必须基于L1的health_check();initContainer用于预加载大模型文件 | K8s manifest静态检查(kubeval);kubectl describe pod验证资源分配;curl -I http://pod-ip:8000/healthz实测 | Helm Chart,Kustomize,Argo CDGitOps流水线 |
| L4 | 可观测性层 | 提供服务运行时的“生命体征”,让故障可定位、性能可分析、容量可规划 | 必须暴露/metrics(Prometheus格式);日志必须包含request_id、model_version、inference_time_ms、input_size_bytes;所有错误必须打error_type标签 | Prometheus抓取验证;ELK日志搜索error_type="OOMKilled";Grafana看板监控P99延迟突增 | prometheus_client,structlog,opentelemetry-python |
这个架构的价值在于:每一层都可以独立演进、独立测试、独立发布。算法团队只需关心L1的predict()逻辑是否正确,无需了解K8s的affinity规则;运维团队只需审核L3的Helm Chart是否符合安全基线,不必深究模型内部的梯度计算;SRE团队通过L4的指标就能判断是否需要扩容,不用登录服务器查top。Part 4的核心,就是把这四层契约,变成每个PR里必须通过的CI/CD门禁。
2.3 为什么选FastAPI而非Flask?一次压测给出的答案
在服务接口层(L2),我们坚定选择FastAPI而非更“简单”的Flask,这个决策背后有三次压测数据支撑。很多人觉得“Flask够用了”,直到他们遇到真实流量:
- 测试场景:模拟电商大促峰值,1000并发请求,每请求携带1KB JSON特征数据,模型为轻量级LightGBM(加载后内存占用1.2GB)。
- Flask方案:使用
gunicorn+geventworker,workers=4,worker_connections=1000。 - FastAPI方案:使用
uvicorn+uvloop,workers=4,loop="uvloop"。
压测结果(持续5分钟):
| 指标 | Flask + gevent | FastAPI + uvicorn | 差异分析 |
|---|---|---|---|
| 平均延迟(ms) | 247 | 89 | uvloop的异步IO比gevent的协程在高并发下更高效,尤其处理大量小请求时 |
| P99延迟(ms) | 682 | 215 | Flask在连接池竞争激烈时出现明显长尾,FastAPI的异步请求处理更均匀 |
| 错误率(%) | 3.2 | 0.0 | Flask在连接数接近上限时开始返回503 Service Unavailable,FastAPI的backlog参数更易调优 |
| CPU利用率(%) | 92 | 68 | Flask的同步worker模型在等待IO时仍占用CPU,FastAPI的异步模型释放更彻底 |
| 内存增长(MB) | +180 | +45 | Flask的geventmonkey patching引入额外内存开销,FastAPI原生异步更轻量 |
更重要的是开发体验:FastAPI的Pydantic模型自动生成功能,让我们在L2层就强制实现了输入校验契约。例如,定义一个PredictionRequest模型:
from pydantic import BaseModel, Field from typing import List, Optional class Feature(BaseModel): user_id: int = Field(..., ge=1, le=10000000) # 强制范围校验 item_price: float = Field(..., ge=0.01) # 强制最小值 category_id: str = Field(..., pattern=r'^[A-Z]{2,4}\d{3}$') # 正则校验 class PredictionRequest(BaseModel): features: List[Feature] = Field(..., min_items=1, max_items=100) # 数量约束 model_version: Optional[str] = "latest" # 默认值当请求体不符合user_id范围或category_id格式时,FastAPI自动返回422 Unprocessable Entity,并附带精确的错误位置和原因(如"value_error.number.not_ge"),无需一行手动校验代码。这直接落实了L2层的“输入契约”。而Flask要达到同等效果,需要手写大量if/else和abort(422),极易遗漏。所以,选FastAPI不是跟风,而是用框架能力固化工程契约——这是Part 4区别于其他部署教程的根本。
3. 核心细节解析:L1模型封装层的魔鬼细节
3.1 模型加载:从“import model”到“lazy_load_with_fallback”的进化
在L1层,“如何加载模型”看似简单,却是线上事故的高发区。新手常写model = joblib.load("model.pkl")放在模块顶层,这会导致两个致命问题:一是服务启动时阻塞,二是模型文件损坏时服务直接启动失败。Part 4要求的加载逻辑,必须满足三个条件:异步非阻塞、失败可降级、状态可感知。
我们的标准实现是一个ModelLoader类,它封装了完整的加载生命周期:
import logging import time from pathlib import Path from typing import Optional, Callable, Any import joblib import onnxruntime as ort logger = logging.getLogger(__name__) class ModelLoader: def __init__( self, model_path: str, fallback_path: Optional[str] = None, load_timeout: int = 60, health_check_interval: int = 300, ): self.model_path = Path(model_path) self.fallback_path = Path(fallback_path) if fallback_path else None self.load_timeout = load_timeout self.health_check_interval = health_check_interval self._model = None self._last_loaded = 0.0 self._load_lock = threading.Lock() self._is_loading = False def load_model(self) -> Any: """主加载入口,带超时和锁保护""" if self._model is not None and time.time() - self._last_loaded < self.health_check_interval: return self._model with self._load_lock: if self._is_loading: # 等待其他线程加载完成 start_wait = time.time() while self._is_loading and time.time() - start_wait < self.load_timeout: time.sleep(0.1) if self._model is not None: return self._model raise RuntimeError("Model loading timed out waiting for other thread") self._is_loading = True try: logger.info(f"Starting to load model from {self.model_path}") start_time = time.time() # 核心:根据文件扩展名选择加载器 if self.model_path.suffix.lower() == ".onnx": self._model = ort.InferenceSession( str(self.model_path), providers=['CUDAExecutionProvider', 'CPUExecutionProvider'] ) elif self.model_path.suffix.lower() in [".pkl", ".joblib"]: self._model = joblib.load(self.model_path) else: raise ValueError(f"Unsupported model format: {self.model_path.suffix}") self._last_loaded = time.time() load_time = time.time() - start_time logger.info(f"Model loaded successfully in {load_time:.2f}s") return self._model except Exception as e: logger.error(f"Failed to load model from {self.model_path}: {e}") # 尝试fallback if self.fallback_path and self.fallback_path.exists(): logger.warning(f"Trying fallback model from {self.fallback_path}") try: self._model = joblib.load(self.fallback_path) self._last_loaded = time.time() logger.info("Fallback model loaded") return self._model except Exception as fe: logger.error(f"Fallback also failed: {fe}") raise finally: self._is_loading = False def get_model(self) -> Any: """获取模型实例,确保已加载""" if self._model is None: self.load_model() return self._model def health_check(self) -> dict: """健康检查,返回模型状态""" if self._model is None: return {"status": "loading", "last_attempt": self._last_loaded} return { "status": "ready", "last_loaded": self._last_loaded, "load_time_sec": time.time() - self._last_loaded, "model_path": str(self.model_path) }这个实现的关键细节:
- 超时保护(
load_timeout):避免一个卡死的加载线程阻塞整个服务。当多个请求同时触发加载时,后续请求会等待,但不会无限期等待。 - fallback机制:生产环境必须有兜底。
fallback_path通常指向一个经过充分验证的旧版本模型(如model_v2.0.1.pkl),确保即使新模型损坏,服务仍能降级运行。 - 健康检查集成:
health_check()方法不仅返回状态,还包含load_time_sec,这在L3层的readinessProbe中至关重要。我们可以这样配置K8s探针:readinessProbe: httpGet: path: /healthz port: 8000 initialDelaySeconds: 10 periodSeconds: 5 # 只有当模型加载完成且耗时<30秒才认为就绪 failureThreshold: 3 - 线程安全:
threading.Lock确保多线程环境下模型只被加载一次,避免竞态条件。
提示:不要在
__init__里直接调用load_model()!这会让服务启动变慢,且无法利用K8s的initialDelaySeconds进行优雅等待。正确的做法是在第一个predict()请求到来时懒加载。
3.2 输入验证:用Pydantic构建坚不可摧的数据契约
L1层的validate_input()方法,是防止垃圾数据进入模型的第一道闸门。我们绝不信任上游任何数据源,哪怕它来自自己团队的另一个服务。验证必须做到深度、精准、可审计。
以一个电商实时推荐场景为例,输入特征包含用户行为序列(点击、加购、下单)和商品属性。我们定义一个嵌套的Pydantic模型:
from pydantic import BaseModel, validator, root_validator from typing import List, Dict, Optional, Any import re class UserBehavior(BaseModel): event_type: str # "click", "cart", "purchase" item_id: int timestamp_ms: int # Unix毫秒时间戳 duration_sec: Optional[int] = None # 仅对click有效 @validator('event_type') def validate_event_type(cls, v): if v not in ["click", "cart", "purchase"]: raise ValueError(f"Invalid event_type: {v}") return v @validator('timestamp_ms') def validate_timestamp(cls, v): # 检查是否为合理的时间戳(2020-2030年) if v < 1577836800000 or v > 1893456000000: raise ValueError(f"Timestamp out of valid range: {v}") return v class ItemFeature(BaseModel): item_id: int price_cents: int category_path: str # "Electronics/Smartphones/Apple" brand: str @validator('category_path') def validate_category_path(cls, v): if not re.match(r'^[A-Za-z0-9]+(/[A-Za-z0-9]+)*$', v): raise ValueError(f"Invalid category_path format: {v}") return v class PredictionRequest(BaseModel): user_id: int behaviors: List[UserBehavior] = [] items: List[ItemFeature] = [] context: Dict[str, Any] = {} # 动态上下文,如设备类型、地理位置 @root_validator def validate_behavior_and_items_length(cls, values): behaviors = values.get('behaviors', []) items = values.get('items', []) if len(behaviors) > 500: raise ValueError("Too many behaviors (>500)") if len(items) > 100: raise ValueError("Too many items (>100)") if not behaviors and not items: raise ValueError("At least one behavior or item is required") return values class Config: # 允许额外字段,但记录日志 extra = "allow" # 在ModelWrapper中使用 def validate_input(self, raw_input: dict) -> PredictionRequest: try: return PredictionRequest(**raw_input) except Exception as e: logger.error(f"Input validation failed for {raw_input.get('user_id', 'unknown')}: {e}") raise HTTPException(status_code=400, detail=f"Invalid input: {str(e)}")这个验证的“魔鬼细节”在于:
- 事件类型白名单:
event_type必须是预定义枚举,杜绝"clikc"或"purhcase"等拼写错误流入模型。 - 时间戳范围校验:防止未来时间(如服务器时钟错误)或远古时间(如数据库默认值0)污染特征工程。
- 路径格式正则:
category_path的正则^[A-Za-z0-9]+(/[A-Za-z0-9]+)*$确保它是一个合法的、无空格、无特殊字符的层级路径,避免下游解析崩溃。 - 根验证器(
@root_validator):检查整体结构合理性,如行为列表不能超过500条(防DDoS),且不能同时为空(保证最小业务语义)。 extra = "allow":允许上游添加新字段(如未来新增device_id),但会记录到日志,便于后续审计和模型迭代。
注意:验证失败时,我们抛出
HTTPException,由FastAPI自动转为标准400响应,并附带清晰的错误信息。这比在模型内部用try/except捕获更早、更准、更符合契约精神。
3.3 推理执行:从“model.predict()”到“robust_inference_with_metrics”的蜕变
L1层的predict()方法,是整个链条的“心脏”。它不能只是一个简单的函数调用,而必须是一个可观测、可熔断、可降级、可追踪的执行单元。我们将其重构为robust_inference_with_metrics():
import time import logging from opentelemetry import trace from opentelemetry.metrics import get_meter from prometheus_client import Counter, Histogram # 初始化指标 INFERENCE_COUNTER = Counter( 'ml_inference_total', 'Total number of inference requests', ['model_name', 'status'] # status: success, timeout, error, fallback ) INFERENCE_LATENCY = Histogram( 'ml_inference_latency_seconds', 'Inference latency in seconds', ['model_name', 'status'], buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0] ) logger = logging.getLogger(__name__) tracer = trace.get_tracer(__name__) meter = get_meter(__name__) def robust_inference_with_metrics( self, validated_input: PredictionRequest, timeout: float = 5.0 ) -> dict: """ 执行鲁棒推理,包含超时、熔断、指标上报 """ model_name = "recommendation_v2" start_time = time.time() ctx = tracer.start_span("inference", kind=trace.SpanKind.SERVER) try: # 1. 检查模型是否就绪 if self._model is None: raise RuntimeError("Model not loaded") # 2. 超时控制(使用信号,非线程) import signal def timeout_handler(signum, frame): raise TimeoutError("Inference timed out") old_handler = signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(int(timeout)) # 3. 执行实际推理(此处为伪代码,适配具体模型) # 对于ONNX: self._model.run(None, {'input': np_array}) # 对于Sklearn: self._model.predict_proba(X) result = self._run_inference(validated_input) # 4. 成功,取消alarm signal.alarm(0) signal.signal(signal.SIGALRM, old_handler) # 5. 记录指标 latency = time.time() - start_time INFERENCE_COUNTER.labels(model_name=model_name, status="success").inc() INFERENCE_LATENCY.labels(model_name=model_name, status="success").observe(latency) logger.info( f"Inference success for user {validated_input.user_id}, " f"latency={latency:.3f}s, result_size={len(str(result))}" ) return { "status": "success", "result": result, "inference_time_ms": int(latency * 1000), "model_version": self._model_version } except TimeoutError as e: # 熔断:连续3次超时,触发短时熔断(10秒内拒绝新请求) self._handle_timeout() INFERENCE_COUNTER.labels(model_name=model_name, status="timeout").inc() logger.error(f"Inference timeout for user {validated_input.user_id}: {e}") raise HTTPException(status_code=503, detail="Service temporarily unavailable") except Exception as e: # 通用错误 INFERENCE_COUNTER.labels(model_name=model_name, status="error").inc() logger.error(f"Inference error for user {validated_input.user_id}: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: # 清理span ctx.end() # 记录P99延迟(在Prometheus中计算)这个实现的精髓:
- 信号超时(
signal.alarm):比threading.Timer更可靠,避免线程泄漏。注意:此方法在Windows上不可用,生产环境应使用asyncio.wait_for或专用超时库如pytimout。 - 熔断机制(
_handle_timeout()):当连续超时,我们设置一个_circuit_breaker_open_until时间戳,在predict()开头检查,若在此时间内则直接返回503。这是防止雪崩的关键。 - 细粒度指标:
INFERENCE_COUNTER按status打标,能立刻看出是超时多还是错误多;INFERENCE_LATENCY的buckets设置覆盖了从10ms到5s的常见区间,P99延迟一目了然。 - 结构化日志:
logger.info包含user_id和result_size,便于在ELK中快速关联用户行为和模型输出大小。
实操心得:不要在
predict()里做任何IO操作(如读取配置文件、调用外部API)。所有依赖必须在L1初始化时加载完毕。否则,每次推理都会引入不可控延迟。
4. 实操过程:从代码到K8s集群的完整流水线
4.1 本地开发与测试:用Docker Compose模拟生产环境
在敲下git push之前,我们必须在本地完成端到端验证。我们摒弃了“本地跑通就行”的做法,而是用docker-compose.yml构建一个微型生产环境镜像:
version: '3.8' services: # 模型服务 ml-service: build: context: . dockerfile: Dockerfile.prod ports: - "8000:8000" environment: - MODEL_PATH=/app/models/recommender_v2.onnx - FALLBACK_MODEL_PATH=/app/models/recommender_v1.pkl - LOG_LEVEL=INFO volumes: - ./models:/app/models:ro # 只读挂载模型文件 - ./config:/app/config:ro depends_on: - prometheus - grafana # Prometheus监控 prometheus: image: prom/prometheus:latest ports: - "9090:9090" volumes: - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro # Grafana可视化 grafana: image: grafana/grafana:latest ports: - "3000:3000" environment: - GF_SECURITY_ADMIN_PASSWORD=admin volumes: - ./grafana/provisioning:/etc/grafana/provisioning:ro # 模拟上游数据源(可选) mock-api: image: python:3.9-slim command: python -m http.server 8000 --directory ./mock-data ports: - "8001:8000" volumes: - ./mock-data:/app/mock-data:ro配套的Dockerfile.prod强调安全与精简:
# 使用多阶段构建,减小最终镜像 FROM python:3.9-slim AS builder WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt FROM python:3.9-slim WORKDIR /app # 复制依赖和代码 COPY --from=builder /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages COPY . . # 创建非root用户 RUN addgroup -g 1001 -f mlgroup && adduser -S mluser -u 1001 USER mluser # 健康检查 HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/healthz || exit 1 EXPOSE 8000 CMD ["uvicorn", "main:app", "--host", "0.0.0.0:8000", "--port", "8000", "--workers", "4"]本地验证清单(每次提交前必做):
- 启动环境:
docker-compose up -d - 验证健康检查:
curl http://localhost:8000/healthz→ 应返回{"status":"ok", ...} - 验证指标端点:
curl http://localhost:8000/metrics→ 应返回Prometheus格式文本,包含ml_inference_total等指标。 - 发送测试请求:
curl -X POST http://localhost:8000/predict -H "Content-Type: application/json" -d '{"user_id":123,"behaviors":[{"event_type":"click","item_id":456,"timestamp_ms":1672531200000}]}' - 查看Grafana:访问
http://localhost:3000(admin/admin),确认仪表盘显示P99延迟、QPS、错误率。 - 模拟故障:
docker exec -it ml-service bash -c "kill -9 1",观察Prometheus是否在30秒内报警,docker-compose logs ml-service是否显示重启日志。
注意:
docker-compose中的depends_on只控制启动顺序,不保证服务就绪。因此,我们在ml-service的启动脚本中加入wait-for-it.sh,循环检查prometheus:9090可达后再启动Uvicorn。
4.2 CI/CD流水线:GitOps驱动的自动化发布
我们采用GitOps模式,所有生产环境配置(Helm Chart、K8s manifests)都存放在一个独立的infra仓库中,与模型代码仓库分离。CI/CD流水线分为三段:
第一段:模型代码仓库(ml-models)
- 触发:
git push到main分支 - 步骤:
- 运行单元测试(
pytest tests/) - 运行集成测试(
pytest tests/integration/,连接本地Docker Compose的mock-api) - 构建Docker镜像:
docker build -t $REGISTRY/ml-recommender:$GIT_COMMIT . - 推送镜像:
docker push $REGISTRY/ml-recommender:$GIT_COMMIT - 关键一步:生成
model-release.yaml文件,内容为:apiVersion: ml.example.com/v1 kind: ModelRelease metadata: name: recommender-v2 version: "2.1.3" spec: image: $REGISTRY/ml-recommender:$GIT_COMMIT modelPath: "/app/models/recommender_v2.onnx" fallbackModelPath: "/app/models/recommender_v1.pkl" resources: requests: memory: "2Gi" cpu: "1000m" limits: memory: "4Gi" cpu: "2000m" - 将
model-release.yaml推送到infra仓库的staging分支。
- 运行单元测试(
第二段:Infra仓库(infra)的Staging流水线
- 触发:
model-release.yaml被推送到staging分支 - 步骤:
- 使用
helm template渲染Helm Chart,生成staging-manifests.yaml - 运行
kubeval静态检查 - 部署到Staging K8s集群:
helm upgrade --install recommender ./charts/ml-service --namespace staging -f staging-values.yaml - 运行金丝雀测试:向Staging服务发送1000个请求,检查成功率>99.5%,P99延迟<200ms
- 若通过,自动创建Pull Request,将
staging分支的变更合并到production分支。
- 使用
第三段:Infra仓库的Production流水线
- 触发:PR合并到
production分支 - 步骤:
- 渲染Production manifests
kubeval检查- 人工审批门禁:必须由至少两名SRE点击“Approve”
- 部署到Production集群:
helm upgrade --install recommender ./charts/ml-service --namespace production -f prod-values.yaml - 启动蓝绿部署:新版本Pod启动后,流量先切5%,观察10分钟,无异常则切100%
- 发送Slack通知:“recommender-v2.1.3 deployed to production, 100% traffic”
这个流水线的核心价值在于:每一次模型变更,都伴随着一次完整的、可审计的、带门禁的基础设施变更。算法工程师只需关注模型代码,运维工程师只需关注Helm Chart,SRE负责审批,责任清晰,风险可控。
4.3 K8s生产部署:Helm Chart的12个关键配置项
我们的Helm Chart (charts/ml-service) 不是模板,而是经过27次线上迭代的“血泪结晶”。以下是12个必须配置、且每个都有明确理由的关键项:
| 配置项 | 示例值 | 为什么必须配置 | 经验教训 |
|---|---|---|---|
replicaCount | 3 | 避免单点故障。 |
