当前位置: 首页 > news >正文

影刀RPA实战:从零搭建电商数据采集系统

影刀RPA实战:从零搭建电商数据采集系统

作者:林焱|阅读时间:约12分钟|难度:⭐⭐⭐ 实战

这是本文集的收官之作——一个完整的端到端项目实战。我们将从零开始,搭建一套生产级的电商数据采集系统,涵盖采集、清洗、入库、分析、报表、通知全流程。


一、项目背景与需求

1.1 项目背景

某电商运营团队每天需要监控多个平台的竞品价格和库存信息:

  • 淘宝:约500个SKU
  • 京东:约300个SKU
  • 拼多多:约400个SKU

目前由3名运营人员手工操作,每人每天花2小时复制粘贴数据到Excel。

目标:用 RPA 实现全自动采集,释放人力去做更有价值的工作。

1.2 需求清单

需求项说明
多平台采集支持淘宝/京东/拼多多三大平台
增量采集只采集有变化的商品(价格/库存变动)
数据持久化存入 SQLite 数据库,支持历史趋势查询
异常保护单个商品失败不影响整体流程
变动告警价格变动超过阈值时发送钉钉通知
自动报表每天8:30自动生成Excel报表并发邮件
运行日志记录每次运行的详细日志

1.3 技术架构图

┌─────────────────────────────────────────────────────┐ │ 电商数据采集系统架构 │ ├─────────────────────────────────────────────────────┤ │ │ │ ┌──────────┐ │ │ │ 定时调度 │ ← 每天7:00触发 │ │ └────┬─────┘ │ │ ↓ │ │ ┌──────────────────────────────────────┐ │ │ │ 主控流程 orchestrator │ │ │ │ │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ │ │淘宝采集 │ │京东采集 │ │拼多多采 │ │ │ │ │ │ taobao │ │ jd │ │ pdd │ │ │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ └─────────────┼─────────────┘ │ │ │ │ ↓ │ │ │ │ ┌─────────────────┐ │ │ [video(video-94eiYjyp-1781858908008)(type-csdn)(url-https://live.csdn.net/v/embed/525000)(image-https://v-blog.csdnimg.cn/asset/23da3fe1f67a47106d725406cfde9a97/cover/Cover0.jpg)(title-拼多多店群自动化上架方案)] │ │ │ 数据清洗去重 │ │ │ │ │ └────────┬────────┘ │ │ │ │ ↓ │ │ │ │ ┌──────────────┼──────────────┐ │ │ │ │ ↓ ↓ ↓ │ │ │ │ ┌──────┐ ┌──────────┐ ┌─────┐ │ │ │ │ │SQLite│ │ 变动检测 │ │报表 │ │ │ │ │ │ 入库 │ │ +告警通知│ │生成 │ │ │ │ │ └──────┘ └────┬─────┘ └──┬──┘ │ │ │ │ ↓ ↓ │ │ │ │ ┌──────────┐ ┌─────┐ │ │ │ │ │ 钉钉通知 │ │邮件 │ │ │ │ │ └──────────┘ └─────┘ │ │ │ └───────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────┘

二、数据库设计

2.1 表结构设计

-- ================================-- 表1:商品主信息表-- ================================CREATETABLEIFNOTEXISTSproducts(idINTEGERPRIMARYKEYAUTOINCREMENT,sku_idTEXTNOTNULLUNIQUE,-- 商品唯一标识nameTEXT,-- 商品名称platformTEXTNOTNULL,-- 平台 (taobao/jd/pdd)categoryTEXT,-- 分类shop_nameTEXT,-- 店铺名称product_urlTEXT,-- 商品链接image_urlTEXT,-- 图片链接is_activeINTEGERDEFAULT1,-- 是否在售(0=下架)created_atDATETIMEDEFAULTCURRENT_TIMESTAMP,updated_atDATETIMEDEFAULTCURRENT_TIMESTAMP);-- ================================-- 表2:价格历史记录表(核心表)-- ================================CREATETABLEIFNOTEXISTSprice_history(idINTEGERPRIMARYKEYAUTOINCREMENT,sku_idTEXTNOTNULL,platformTEXTNOTNULL,priceREAL,-- 当前价格original_priceREAL,-- 原价(如有折扣)stock_statusTEXTDEFAULT'unknown',-- 库存状态(in_stock/out_of_stock)sales_countINTEGERDEFAULT0,-- 销量/评价数collected_atDATETIMEDEFAULTCURRENT_TIMESTAMP,FOREIGNKEY(sku_id)REFERENCESproducts(sku_id),UNIQUE(sku_id,DATE(collected_at))-- 同一商品每天只保留一条);-- 创建索引加速查询CREATEINDEXidx_price_skuONprice_history(sku_id);CREATEINDEXidx_price_dateONprice_history(collected_at);CREATEINDEXidx_price_platformONprice_history(platform);CREATEINDEXidx_products_platformONproducts(platform);CREATEINDEXidx_products_categoryONproducts(category);-- ================================-- 表3:变动告警记录表-- ================================CREATETABLEIFNOTEXISTSalerts(idINTEGERPRIMARYKEYAUTOINCREMENT,sku_idTEXT,alert_typeTEXT,-- 'price_up'/'price_down'/'out_of_stock'/'back_in_stock'old_valueREAL,new_valueREAL,change_pctREAL,-- 变化百分比thresholdREAL,-- 触发的阈值notifiedINTEGERDEFAULT0,-- 是否已通知(0=未通知/1=已通知)notified_atDATETIME,created_atDATETIMEDEFAULTCURRENT_TIMESTAMP);-- ================================-- 表4:运行日志表-- ================================CREATETABLEIFNOTEXISTSrun_logs(idINTEGERPRIMARYKEYAUTOINCREMENT,run_dateDATE,platformTEXT,statusTEXT,-- 'running'/'success'/'failed'/'partial'total_itemsINTEGERDEFAULT0,success_itemsINTEGERDEFAULT0,failed_itemsINTEGERDEFAULT0,new_itemsINTEGERDEFAULT0,-- 新发现的商品数changed_itemsINTEGERDEFAULT0,-- 有变动的商品数duration_secondsREAL,error_msgTEXT,created_atDATETIMEDEFAULTCURRENT_TIMESTAMP);

2.2 设计说明

为什么要这样设计? 1. products 和 price_history 分开: → 商品基本信息不常变(名称/分类/链接) → 价格数据每天都在变化 → 分开后查询更快,存储更高效 2. UNIQUE(sku_id, DATE) 约束: → 同一商品同一天只保留一条最新价格 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55af049b85c648e9a0cf7c3710140d74.png#pic_center) ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/b8b79497c7e44ee7a90608f7e0cd274b.png#pic_center) → 天然实现了增量更新(重复插入会冲突) 3. alerts 表独立出来: → 告警逻辑和数据采集解耦 → 可以单独查看所有历史告警 → 支持告警的"已读/未读"管理

三、核心模块实现

模块1:多平台采集器

=========================================== 子流程:platform_collector 输入参数:platform (平台名),sku_list (商品列表) 输出参数:raw_data (原始采集数据),stats (统计信息) =========================================== 【初始化】 raw_data =[]stats ={"total":len(sku_list),"success":0,"failed":0}【根据平台选择不同的采集策略】if platform == "taobao":# 淘宝采集策略打开("https://www.taobao.com") 登录淘宝账号()For Each sku in sku_list:Try:# 方式A:通过商品链接直接打开详情页打开(sku.url) 等待页面加载(5秒)# 提取信息current_price = 获取元素文本(".Price--priceText") original_price = 获取元素文本(".Price--original") stock_info = 获取元素文本(".StockStatus") record ={"sku_id":sku.id,"name":sku.name,"platform":"taobao","price":提取数字(current_price),"original_price":提取数字(original_price),"stock":"有货" if "有货" in stock_info else "无货","collected_time":当前时间()}raw_data.append(record) stats["success"]+= 1Catch:stats["failed"]+= 1log(f"淘宝采集失败:{sku.name}")# 反爬策略:随机延迟等待(随机2~6秒)elif platform == "jd":# 京东采集策略(类似,适配京东页面结构)打开("https://www.jd.com")...For Each sku in sku_list:Try:# 京东可以用搜索+列表方式批量获取# 或者逐个打开详情页...Catch:stats["failed"]+= 1elif platform == "pdd":# 拼多多采集策略(注意反爬更严格)打开("https://www.pinduoduo.com")...# 拼多多可能需要更长的延迟(5~10秒)# 以及更多的Cookie维护【返回结果】 return raw_data,stats

模块2:数据清洗与入库

=========================================== 子流程:data_pipeline 输入参数:raw_data,platform 输出参数:new_count,changed_count =========================================== new_count = 0 changed_count = 0 alert_candidates =[]# 可能需要告警的商品BEGIN TRANSACTIONFor Each item in raw_data:【Step 1:数据清洗】# 价格校验if item.price <= 0 or item.price > 999999:item.price = None# 无效价格标记为空log(f"异常价格:{item.sku_id}={item.price}")# 名称清洗item.name = 去除首尾空格(item.name)if len(item.name) > 200:item.name = item.name[:200]【Step 2:判断是新增还是已有商品】 existing = SELECT * FROM products WHERE sku_id = item.sku_idif existing 为空:# ===== 新商品 =====INSERT INTO products (sku_id,name,platform,shop_name,product_url) VALUES (item.sku_id,item.name,item.platform,...,...) new_count += 1else:# ===== 已有商品:检查是否有变化 =====last_price = SELECT price FROM price_history WHERE sku_id = item.sku_id ORDER BY collected_at DESC LIMIT 1if last_price 不为空:if last_price != item.price and item.price 不为 None:# 价格变了!计算变化幅度change_pct = (item.price-last_price) / last_price * 100 changed_count += 1if abs(change_pct) >= 10:# 变化超过10%alert_candidates.append({"sku_id":item.sku_id,"type":"price_up" if change_pct>0 else "price_down","old":last_price,"new":item.price,"pct":change_pct})【Step 3:写入价格历史】# 使用 INSERT OR REPLACE 处理同日重复INSERT OR REPLACE INTO price_history (sku_id,platform,price,original_price,stock_status,sales_count) VALUES (item.sku_id,item.platform,item.price,...) COMMIT TRANSACTION# 将告警候选写入alerts表For Each alert in alert_candidates:INSERT INTO alerts (sku_id,alert_type,old_value,new_value,change_pct,threshold) VALUES (alert.sku_id,alert.type,alert.old,alert.new,alert.pct,10.0) return new_count,changed_count

模块3:变动告警

=========================================== 子流程:send_alerts ===========================================# 查询今天未发送的告警pending_alerts = SELECT * FROM alerts WHERE notified = 0 AND DATE(created_at) = DATE('now') ORDER BY ABS(change_pct) DESCif pending_alerts 为空:return# 没有需要告警的# 组装告警消息message_parts =["🔔 **竞品价格变动告警**\n"]message_parts.append(f"⏰ 检测时间:{当前时间}\n") message_parts.append("---\n")For Each alert in pending_alerts:# 查询商品名称product = SELECT name FROM products WHERE sku_id = alert.sku_idif alert.alert_type == "price_up":emoji = "📈" direction = "↑上涨"elif alert.alert_type == "price_down":emoji = "📉" direction = "↓下降"else:emoji = "⚠️" direction = "变动" line = f"{emoji}**{product.name}**\n价格{direction}:¥{alert.old_value}→ ¥{alert.new_value}({alert.change_pct:+.1f}%)\n\n" message_parts.append(line)# 标记为已通知UPDATE alerts SET notified=1,notified_at=CURRENT_TIMESTAMP WHERE id=alert.id# 发送钉钉消息if len(pending_alerts) <= 10:# 少量告警:直接发完整内容dingtalk_send(webhook_url,"\n".join(message_parts))else:# 大量告警:只发摘要summary = f"共{len(pending_alerts)}条价格变动\nTOP5最大变动:\n..." dingtalk_send(webhook_url,summary)

模块4:日报生成

=========================================== 子流程:daily_report_generator =========================================== 【数据准备】# 今日各平台统计today_stats = """ SELECT platform,COUNT(*)as total,SUM(CASE WHEN price IS NOT NULL THEN 1 ELSE 0 END) as has_price,AVG(price) as avg_price,MIN(price) as min_price,MAX(price) as max_price FROM price_history WHERE DATE(collected_at) = DATE('now') GROUP BY platform """# 价格变动汇总(今日 vs 昨日)change_summary = """ SELECT today.platform,today.avg_price as today_avg,yesterday.avg_price as yesterday_avg,CASE WHEN yesterday.avg_price>0 THEN ((today.avg_price-yesterday.avg_price) / yesterday.avg_avg * 100) ELSE NULL END as pct_change FROM (SELECT platform,AVG(price) as avg_price FROM price_history WHERE DATE(collected_at)=DATE('now') GROUP BY platform) today LEFT JOIN (SELECT platform,AVG(price) as avg_price FROM price_history WHERE DATE(collected_at)=DATE('now','-1 day') GROUP BY platform) yesterday ON today.platform = yesterday.platform """# TOP20 最低价商品(可能有促销机会)top_bargains = SELECT p.name,ph.price,p.shop_name,p.product_url FROM price_history ph JOIN products p ON ph.sku_id = p.sku_id WHERE DATE(ph.collected_at) = DATE('now') AND ph.price IS NOT NULL ORDER BY ph.price ASC LIMIT 20 【创建 Excel 报表】 新建Excel "D:/reports/电商竞品监控日报_{日期}.xlsx"Sheet1 "今日概览":┌─────────────────────────────────────────────┐ │ 📊 电商竞品监控日报 │ │ 日期:2026年6月10日 星期三 │ │ 生成时间:08:30 │ ├─────────────────────────────────────────────┤ │ │ │ 【各平台数据概览】 │ │ 平台|采集量|有价格|平均价|最高|最低│ │ 淘宝|485|472|¥189|¥2999|¥9.9│ │ 京东|298|295|¥256|¥5999|¥19 │ │ 拼多多|392|388|¥67|¥1599|¥5.5 │ │ 合计|**1175**|**1155**│---|---|---│ ├─────────────────────────────────────────────┤ │ 📈 价格变动:涨 23 个 / 跌 15 个 / 新品 8 个 │ │ ⚠️ 库存预警:3个商品显示缺货 │ └─────────────────────────────────────────────┘Sheet2 "全部数据":→ 导出今日全部价格记录(供深度分析用)Sheet3 "变动明细":→ 列出所有价格变动的商品及变化幅度Sheet4 "最低价TOP20":→ 列出最值得关注的低价/促销商品 + 条件格式: → 涨价的标红色 → 降价的标绿色 → 缺货的标灰色 + 插入图表: → 各平台平均价格对比柱状图 → 近7天价格趋势折线图 保存Excel 【发送邮件】 send_email( to =["boss@co.com","ops@co.com"],subject = "电商竞品监控日报-{今天}",body = "各位好,附件为今日竞品价格监控报告。共发现{changed_count}个价格变动...",attachment = "日报文件路径" )

四、主控流程组装

=========================================== 主流程:ecommerce_monitor_system 调度时间:每天 07:00 开始执行 ========================================== start_time = 当前时间() log("="*60)log(f"开始执行|{start_time}") log("="*60)【Phase 1:采集各平台数据】 all_raw_data ={}all_stats ={}# 依次采集三个平台For Each platform in["taobao","jd","pdd"]:log(f"--- 开始采集:{platform}---")# 获取该平台的SKU列表(从配置文件或数据库读取)sku_list = load_sku_config(platform)# 调用采集子流程raw_data,stats = platform_collector(platform,sku_list) all_raw_data[platform]= raw_data all_stats[platform]= stats log(f"{platform}完成:成功{stats['success']},失败{stats['failed']}")【Phase 2:合并并清洗入库】 total_new = 0 total_changed = 0 For Each platform in["taobao","jd","pdd"]:new_cnt,changed_cnt = data_pipeline(all_raw_data[platform],platform) total_new += new_cnt total_changed += changed_cntlog(f"入库完成:新增{total_new},变动{total_changed}")【Phase 3:发送告警】 send_alerts()【Phase 4:生成并发送日报】 daily_report_generator()【Phase 5:记录运行日志】 end_time = 当前时间() duration = (end_time-start_time).总秒数 total_success = sum(s["success"]for s in all_stats.values()) total_failed = sum(s["failed"]for s in all_stats.values()) INSERT INTO run_logs ( run_date,status,total_items,success_items,failed_items,new_items,changed_items,duration_seconds ) VALUES ( DATE('now'),'success' if total_failed == 0 else 'partial',total_success + total_failed,total_success,total_failed,total_new,total_changed,duration ) log("="*60)log(f"执行完毕|耗时{duration:.1f}|成功{total_success}失败{total_failed}") log("="*60)

五、部署与运维

5.1 配置文件管理

# config.yaml — 所有可配置参数集中管理database:path:"D:/rpa_data/ecommerce.db"platforms:taobao:enabled:trueaccount:"your_taobao_account"delay_range:[3,8]# 秒jd:enabled:trueaccount:"your_jd_account"delay_range:[2,6]pdd:enabled:trueaccount:"your_pdd_account"delay_range:[5,12]# 拼多多反爬严格,延迟更长![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/8ba5a2ccf1c34ef4baffb500e0186c9a.png#pic_center)alerts:price_change_threshold:10# 百分比dingtalk_webhook:"https://oapi.dingtalk.com/robot/send?access_token=xxx"email:smtp:"smtp.qq.com:465"sender:"robot@company.com"password:"授权码"recipients:["boss@company.com","ops@company.com"]report:output_dir:"D:/reports/"retain_days:90# 保留最近90天的报表

5.2 定时任务配置

任务名称:电商监控系统 Cron表达式:0 7 * * * (每天早上7点整) 超时设置:60分钟(3个平台全采完通常需要30~45分钟) 重试策略: 整体失败时:次日正常执行即可(不需要立即重试) 单平台失败:不影响其他平台继续运行 告警规则: 连续2天执行失败 → 电话通知运维 单次成功率 < 80% → 邮件通知负责人 总耗时超过50分钟 → 钉钉群提醒性能退化

5.3 监控面板关键指标

TEMU店群如何管理运营?

╔══════════════════════════════════════╗ ║ 🖥️ 电商监控系统看板 ║ ╠══════════════════════════════════════╣ ║ ║ ║ 📊 今日运行状态 ║ ║ ┌──────────────────────────────┐ ║ ║ │ ████████████████████░░░░ 85% │ ║ ║ │ 成功率 85% (1000/1175) │ ║ ║ └──────────────────────────────┘ ║ ║ ║ ║ 📈 各平台情况 ║ ║ 淘宝: ✅ 485/500 (97%) 耗时12min ║ ║ 京东: ✅ 298/300 (99%) 耗时8min ║ ║ 拼多多: ⚠️ 392/420 (93%) 耗时18min ║ ║ ║ ║ 🔔 待处理告警:5条 ║ ║ ┌─ 3条涨价 / 2条降价 ║ ║ ║ ║ 📅 连续运行天数:28天 ✅ ║ ║ 上次故障:6月5日(网络超时)已恢复 ║ ║ ║ ╚══════════════════════════════════════╝

六、项目总结

6.1 本项目覆盖的全部技能点

技能矩阵: 基础操作: ✅ 打开网页 / 元素捕获 / 点击 / 输入文本 / 获取文本 数据处理: ✅ 循环遍历 / For Each / 列表操作 / 字典操作 ✅ 字符串处理 / 正则提取数字 / 类型转换 数据库: ✅ SQLite 建表 / CRUD / 事务 / 批量插入 ✅ SQL聚合查询 / JOIN / 子查询 异常处理: ✅ Try-Catch / 重试机制 / 日志记录 / 降级方案 办公自动化: ✅ Excel 创建/写入/样式/图表/条件格式 ✅ 邮件发送(HTML)/附件管理 通信通知: ✅ 钉钉Webhook推送 / Markdown格式消息 工程能力: ✅ 子流程模块化 / 参数传递 / 配置文件分离 ✅ 定时调度 / 运行日志 / 监控告警

6.2 项目扩展方向

当前版本可以继续演进的方向: 1. 🤖 AI 增强 用大模型自动分析价格变动原因 生成自然语言的每日简报摘要 2. 📱 可视化大屏 搭建 Web 仪表盘实时展示数据 支持 PC/手机随时查看 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/ebab4a43d5d941bfbece790e5b54dfbc.png#pic_center) 3. 🔄 API 开放 提供 REST API 给其他系统调用 对接企业内部 ERP/OA 系统 4. 📈 预测模型 基于历史数据预测未来价格走势 为采购决策提供数据支撑

七、全文系列回顾

恭喜你完成了本篇收官之作!至此,你已经拥有了一套完整的影刀RPA知识体系

从入门到精通的学习路径: Day 1-2: 零基础入门 → 第一个自动化流程 ✅ Day 3-5: 元素捕获 → 子流程 → 异常处理 ✅ Day 6-8: 数据采集 → Excel操作 → 数据库 ✅ Day 9-10: 定时任务 → 邮件自动化 → 调试技巧 ✅ Day 11+: 综合项目实战 → 你在这里 ✨

下一步?把这套系统真正跑起来吧!

从本文的项目中选取适合你的部分开始实施——哪怕只是先做一个「单平台+单功能」的最小版本。RPA 的精髓不在于学了多少理论,而在于动手做出第一个能跑的生产级流程。

💡最后一句忠告:先让流程跑通,再让它跑快,最后让它跑稳。不要一开始就追求完美——完美是在迭代中产生的。

http://www.cnnetsun.cn/news/2967497.html

相关文章:

  • Umi-OCR:从零部署到高效识别的离线OCR解决方案实践指南
  • 从零开始备战Java面试:这10个高频问题你必须会!
  • 1. 拆解循环神经网络的最小单元:从零理解RNNCell
  • 基于Hadoop大数据技术的电影推荐系统的设计与实现-spider3(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码
  • AI Act合规实战指南:从高风险判定到代码级落地
  • 生产级多维聚合:pandas中滚动计算、自定义指标与报表生成实战
  • CSV解析实战:从RFC标准到生产级健壮读取
  • 破除‘正确概率’幻觉:数据科学中的认知边界与工程实践
  • 机器学习数据划分不是固定比例,而是业务驱动的量化决策
  • MPC8240调试功能深度解析:从总线属性信号到JTAG实战
  • AI大模型benchmark解密:MMLU、GPQA、BBH等五大评测原理与实战解读
  • 语义分割实战避坑指南:从逐像素分类到边缘部署
  • Dify插件生态集:重塑AI应用开发的技术范式革新
  • YOLO26在AzureML的生产级落地:MLOps工程实践指南
  • 【信息科学与工程学】计算机科学与自动化——第三百零五篇 数据中心 Scale-Up、Scale-Out、Scale-Across 16
  • 实时屏幕标注工具LiveDraw:如何在动态演示中实现真正的手写自由?
  • 构建企业级文档智能检索系统的5步架构设计实战指南
  • 5个技巧快速掌握jExifToolGUI:轻松管理照片元数据的完整指南
  • Space Thumbnails:Windows资源管理器3D模型预览终极指南,轻松实现文件可视化
  • Apollo配置中心:从核心原理到生产实践深度解析
  • Gemini原生多模态架构深度解析:从token设计到产业落地
  • 企业级应用文件上传漏洞深度剖析:从原理到防御实战
  • XSS漏洞攻防全解析:从原理到实战的Web安全必修课
  • DeepSeek-V2与R1模型技术解析及推理优化实践
  • FreeRTOS信号量实战:从二进制到计数的场景化应用指南
  • LRS2数据集预处理实战:从下载到人脸与音频特征提取
  • 3分钟极速美化Obsidian:CSS片段与主题资源一站式获取指南
  • 构建智能语义搜索:3步打造你的CLIP跨模态检索系统
  • 从IONOS钓鱼事件看邮件安全:多维度检测模型与防御实践
  • MPC555/556 PowerPC微控制器架构解析与嵌入式开发实战指南