Python 爬虫监控告警:日志结构化 + 异常告警 + 采集速率实时监控落地
前言
分布式爬虫集群长期 7×24 小时无人值守运行时,容易出现代理大面积失效、目标站点接口改版、网络中断、数据库连接耗尽、消费队列堆积暴涨等隐性故障,传统控制台打印日志无法快速定位问题,等到业务数据大面积缺失才发现故障已经造成大量任务漏爬。爬虫监控系统依托 logging 结构化日志采集、Redis 指标打点统计、异常分级钉钉 / 邮件告警、队列长度实时巡检四大模块,实时统计每秒请求量、成功 / 失败率、队列积压数量,触发阈值自动推送告警消息,实现故障分钟级发现。本文从日志规范化配置、爬虫运行指标埋点、阈值规则配置、多渠道告警封装、定时巡检任务全流程落地,附带可运行工程代码与监控指标阈值参考表,无缝对接前文 aiohttp 分布式爬虫与混合存储架构。
本文所需依赖官方文档链接:
- logging:Python 内置日志库,结构化日志输出
- redis-py:指标计数器与队列数据读取
- requests:调用钉钉机器人 webhook 推送告警
- time:打点计时、周期巡检
一、爬虫监控核心指标分类与阈值参考
1.1 三大类监控指标明细
表格
| 指标分类 | 监控项 | 预警阈值 | 故障阈值 |
|---|---|---|---|
| 业务采集指标 | 爬虫瞬时失败率 | 连续 1min 失败率>20% | 连续 3min 失败率>50% |
| 队列指标 | 待爬任务队列长度 | 队列>5000 | 队列>20000 堆积 |
| 资源指标 | MySQL/Mongo 连接异常 | 单次连接失败 | 连续 5 次连接失败触发告警 |
1.2 告警分级规范
- WARN 预警:达到预警阈值,仅推送提醒,不中断爬虫;
- ERROR 故障告警:触发故障阈值,紧急推送运维处理,可联动临时暂停生产者下发任务。
二、结构化日志标准化配置
2.1 结构化日志作用
摒弃零散非规范 print 输出,日志统一携带时间、爬虫节点名、任务 URL、异常类型、耗时、错误堆栈,落地本地文件 + 可选日志中心,便于后期故障检索溯源。
python
运行
import logging import json from datetime import datetime class SpiderLog: def __init__(self,node_name="consumer_01"): self.node_name = node_name self.logger = logging.getLogger(f"spider_{node_name}") self.logger.setLevel(logging.INFO) # 避免重复添加handler if not self.logger.handlers: fmt = logging.Formatter("%(message)s") file_handler = logging.FileHandler(f"./spider_{node_name}.log",encoding="utf-8") console = logging.StreamHandler() file_handler.setFormatter(fmt) console.setFormatter(fmt) self.logger.addHandler(file_handler) self.logger.addHandler(console) def log_record(self,level:str,url:str,cost:float,err_msg:str=""): """结构化json日志""" log_body = { "time":datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "node":self.node_name, "url":url, "cost_ms":round(cost*1000,2), "level":level, "error":err_msg } if level.upper() == "ERROR": self.logger.error(json.dumps(log_body,ensure_ascii=False)) else: self.logger.info(json.dumps(log_body,ensure_ascii=False)) # 日志实例初始化 spider_log = SpiderLog("consumer_01")2.2 日志埋点接入爬虫
爬虫单次请求结束后自动打点日志,成功传入空 err_msg,异常捕获后写入报错信息,统一日志格式方便脚本解析统计成功率。
三、Redis 打点计数器:实时统计成功 / 失败量
3.1 计数器存储结构
spider:metric:succ:{date}:当日成功采集计数spider:metric:fail:{date}:当日失败采集计数spider:metric:min_fail:每分钟失败临时统计
python
运行
import redis REDIS_CONFIG = {"host":"127.0.0.1","port":6379,"db":0,"decode_responses":True} r = redis.Redis(**REDIS_CONFIG) class MetricCounter: @staticmethod def incr_succ(): today = datetime.now().strftime("%Y%m%d") r.incr(f"spider:metric:succ:{today}") @staticmethod def incr_fail(): today = datetime.now().strftime("%Y%m%d") r.incr(f"spider:metric:fail:{today}") r.incr("spider:metric:min_fail") @staticmethod def get_min_fail_clear(): val = int(r.get("spider:metric:min_fail") or 0) r.set("spider:metric:min_fail",0) return val3.2 爬虫埋点接入
采集成功调用MetricCounter.incr_succ(),异常失败调用MetricCounter.incr_fail(),用于每分钟计算失败率。
四、钉钉机器人告警推送封装
4.1 告警工具类实现
python
运行
import requests class DingAlert: def __init__(self,webhook:str): self.webhook = webhook def send_msg(self,title:str,content:str): data = { "msgtype":"text", "text":{"content":f"【爬虫告警】{title}\n{content}"} } try: requests.post(self.webhook,json=data,timeout=5) except Exception as e: spider_log.log_record("ERROR","alert_push",0,str(e)) # 替换自己的钉钉机器人webhook地址 ding = DingAlert("https://oapi.dingtalk.com/robot/send?access_token=xxx")五、定时巡检监控主程序
每分钟轮询队列长度、失败计数,对比阈值触发分级告警
python
运行
import time def monitor_task(): WAIT_QUEUE_KEY = "spider:task:wait" WARN_QUEUE = 5000 ERR_QUEUE = 20000 WARN_FAIL_RATE = 0.2 ERR_FAIL_RATE = 0.5 per_min_total = 1 # 占位,实际可补充成功计数获取 while True: # 1、读取队列长度 queue_len = r.llen(WAIT_QUEUE_KEY) # 2、获取一分钟失败数并清零 fail_num = MetricCounter.get_min_fail_clear() # 简易失败率计算 fail_rate = fail_num/(per_min_total+fail_num) if (per_min_total+fail_num)!=0 else 0 # 队列告警判断 if queue_len >= ERR_QUEUE: ding.send_msg("任务队列严重堆积",f"待爬队列:{queue_len}条,已触发故障阈值!") elif queue_len >= WARN_QUEUE: ding.send_msg("任务队列预警",f"待爬队列:{queue_len}条,即将超限") # 失败率告警 if fail_rate >= ERR_FAIL_RATE: ding.send_msg("爬虫大面积采集失败",f"近1min失败率{round(fail_rate*100,2)}%,失败条数:{fail_num}") elif fail_rate >= WARN_FAIL_RATE: ding.send_msg("爬虫失败率预警",f"近1min失败率{round(fail_rate*100,2)}%") time.sleep(60) # 后台启动巡检 if __name__ == "__main__": from threading import Thread Thread(target=monitor_task,daemon=True).start() while True: time.sleep(3600)六、数据库异常监控补充
新增 MySQL、Mongo 入库捕获异常,连续失败 5 次触发数据库告警:
python
运行
db_err_count = 0 def db_exception_check(): global db_err_count try: # 测试数据库连通 pass except Exception as e: db_err_count +=1 if db_err_count >=5: ding.send_msg("数据库连接故障","连续5次数据库操作异常,请检查服务") db_err_count = 0七、监控拓展优化方案
7.1 历史指标可视化
每日成功 / 失败计数存入 Redis,配合 Prometheus+Grafana 接入做折线图,直观查看全天爬虫波动;
7.2 多渠道告警拓展
在钉钉基础上拓展邮件、企业微信机器人,重要故障双渠道推送;
7.3 自动止损联动
故障告警触发后,调用生产者接口临时暂停任务入队,避免队列无限堆积浪费代理资源。
八、常见监控故障优化对照表
表格
| 异常现象 | 处理方案 |
|---|---|
| 告警消息刷屏 | 增加告警冷却时间,同一故障 5 分钟内只推送一次 |
| 指标统计不准 | 改用 Redis HASH 分时存储,按秒粒度打点聚合 |
| 节点过多日志杂乱 | 接入 ELK,全节点日志统一汇总检索 |
九、总结
爬虫监控系统由结构化日志、Redis 指标打点、阈值巡检、多渠道告警四部分构成,实现采集成功率、任务队列、数据库资源三大维度全量监控,故障从被动排查变为主动预警。整套监控组件无侵入接入原有分布式爬虫、数据入库代码,线上部署后大幅降低人工值守成本,是工业化爬虫项目上线必备配套模块,后续可基于采集指标反向动态调整协程并发数、代理调度权重,实现爬虫自适应智能调控。
