当前位置: 首页 > news >正文

分布式调度系统 — scheduler-worker执行器详解

一、模块定位与职责

scheduler-worker是分布式任务调度系统中的执行者,负责接收调度中心下发的任务并执行。如果把调度中心比作快递公司的总部,那么 Worker 就是真正的快递员——他们真正干活

Worker 的职责可以概括为四件事:

  1. 连接与注册:启动时主动连接到调度中心,告诉对方“我在这里”

  2. 保持在线:通过心跳机制让调度中心知道它还活着

  3. 执行任务:收到任务后,真正去执行业务逻辑

  4. 缓存加速:通过多级缓存提升数据访问性能


二、启动与连接:Worker 如何上线?

2.1 启动流程

Worker 的启动入口是SchedulerClient.main(),启动过程如下:

1. 创建 Netty 客户端 └── NioEventLoopGroup:处理 I/O 事件 └── Bootstrap:配置连接参数 └── Pipeline 添加 Handler ├── MessageDecoder:字节 → Message ├── MessageEncoder:Message → 字节 └── ClientHandler:业务处理 2. 连接调度中心(127.0.0.1:8080) └── 同步等待连接建立 3. 生成 Worker ID └── "worker-" + System.currentTimeMillis() 4. 发送注册消息(TYPE_REGISTER) └── Body:WorkerInfo JSON(包含 workerId、host、port 等) 5. 启动心跳线程 └── 每 30 秒发送一次心跳 6. 阻塞等待连接关闭

2.2 心跳线程实现

心跳是 Worker 向调度中心证明自己“还活着”的方式:

private void startHeartbeat() { Thread heartbeatThread = new Thread(() -> { while (channel != null && channel.isActive()) { try { Thread.sleep(30000); // 30 秒间隔 channel.writeAndFlush(Message.heartbeat()); System.out.println("Send heartbeat"); } catch (InterruptedException e) { break; } } }); heartbeatThread.setDaemon(true); // 守护线程 heartbeatThread.start(); }

为什么用守护线程?

  • 守护线程不会阻止 JVM 退出

  • 如果 Worker 主线程因为连接断开而退出,心跳线程会自动结束

  • 避免资源泄漏

为什么是 30 秒?

  • 调度中心的心跳超时阈值是 60 秒

  • 30 秒间隔意味着连续丢失 2 次心跳才会被判定为离线

  • 足够容忍网络抖动,又不会浪费太多带宽


三、任务执行入口:ClientHandler

3.1 消息处理流程

ClientHandler是 Worker 处理所有网络消息的地方:

protected void channelRead0(ChannelHandlerContext ctx, Message msg) { switch (msg.getType()) { case TYPE_HEARTBEAT: // 收到心跳响应,打印日志即可 System.out.println("Heartbeat response received"); break; case TYPE_REQUEST: // 1. 解析任务 JobContext job = JsonUtil.fromJson(new String(msg.getBody()), JobContext.class); // 2. 幂等检查 if (executedTasks.contains(job.getTaskId())) { // 已执行过,直接返回成功 return ExecutionResult.success(job.getJobId(), job.getTaskId(), "Already executed"); } // 3. 执行任务 ExecutionResult result = execute(job); // 4. 记录已执行(幂等) if (result.getSuccess()) { executedTasks.add(job.getTaskId()); } // 5. 返回结果 ctx.writeAndFlush(Message.response(JsonUtil.toJson(result).getBytes())); break; case TYPE_CACHE_MIGRATE: // 接收缓存迁移消息 CacheMigrationMessage migrateMsg = JsonUtil.fromJson( new String(msg.getBody()), CacheMigrationMessage.class ); // 预加载热点 Key for (String key : migrateMsg.getHotKeys()) { cacheService.get(key, () -> loadFromDB(key)); } break; } }

3.2 幂等检查:为什么需要它?

在分布式环境中,同一个任务可能被调度中心重复下发:

场景为什么重复
网络超时调度中心没收到响应,认为任务失败,重新下发
调度中心重启从数据库恢复任务时,已执行的任务可能被重新加载
重试机制任务超时后进入重试队列,重新下发

幂等检查的方式

  1. 内存检查executedTasks是一个ConcurrentHashMap.newKeySet(),存储已执行的taskId

  2. 数据库检查task_id有唯一索引,重复插入会失败

两层防护保证同一个任务不会被执行两次。

面试可能问:“如果 Worker 重启了,executedTasks里的数据丢失了怎么办?”

回答:重启后从数据库加载已成功执行的任务 ID,初始化executedTasks。或者依赖数据库唯一索引作为最后一道防线。


四、任务执行:execute()

4.1 执行逻辑

private ExecutionResult execute(JobContext job) { try { // 第一步:分片感知预热 if (job.getPreloadKeys() != null) { for (String key : job.getPreloadKeys()) { cacheService.get(key, () -> { System.out.println("[Preload] Loading from DB: " + key); return "preloaded_data"; }); } System.out.println("[Preload] Preloaded " + job.getPreloadKeys().size() + " keys"); } // 第二步:执行业务逻辑 String userId = job.getParams(); String userData = cacheService.get("user:" + userId, () -> { System.out.println("[DB] Querying database for user: " + userId); return "{\"name\":\"User" + userId + "\",\"level\":1}"; }); System.out.println("Executing job: " + job.getJobName() + ", shard: " + job.getShardingItem()); System.out.println("User data from cache: " + userData); // 模拟耗时操作 Thread.sleep(500); return ExecutionResult.success(job.getJobId(), job.getTaskId(), "Job executed successfully"); } catch (Exception e) { return ExecutionResult.failure(job.getJobId(), job.getTaskId(), "Execution failed: " + e.getMessage()); } }

4.2 执行流程解析

任务到达 │ ▼ ① 分片感知预热 │ 提前加载 preloadKeys 到 L1 缓存 │ 这样后续业务逻辑执行时,数据已经在本地了 ▼ ② 业务逻辑执行 │ 从 params 中提取业务参数 │ 通过多级缓存读取数据(L1 → L2 → DB) ▼ ③ 返回结果 │ 成功 → ExecutionResult.success() │ 失败 → ExecutionResult.failure()

4.3 为什么要有预热步骤?

预热步骤是分片感知预热的体现:

  • 调度中心下发任务时,会携带preloadKeys列表

  • 这些 Key 是调度中心根据分片信息预先计算出来的

  • Worker 在执行任务前先加载这些 Key,让它们进入 L1 缓存

  • 业务逻辑执行时,数据已经在本地了,L1 命中率极高


五、多级缓存:CacheService

5.1 为什么需要多级缓存?

如果所有数据都从数据库读取,性能和并发能力会受限。缓存的本质是用空间换时间

缓存层级技术访问延迟适用场景
L1 本地缓存Caffeine微秒级高频访问的热点数据
L2 分布式缓存Redis毫秒级跨节点共享的数据
DBMySQL数十毫秒所有数据的最终来源

5.2 读取路径(五层防护)

请求 key │ ▼ ① Bloom Filter(防穿透) │ 判断 key 是否可能存在 │ 不存在 → 直接返回 null(不查任何存储) │ ▼ ② Caffeine L1(本地缓存) │ 命中 → 返回数据 │ 未命中 → 继续 │ ▼ ③ 互斥锁 + double-check(防击穿) │ 相同 key 只有一个线程能通过 │ 获取锁后再次检查 L1 │ ▼ ④ Redis L2(分布式缓存) │ 命中 → 回填 L1 → 返回数据 │ 未命中 → 继续 │ ▼ ⑤ DataLoader(数据库) │ 从 DB 加载数据 │ 写入 Redis(随机 TTL 300~360s)+ L1 + Bloom Filter │ 如果 DB 无数据 → 缓存空值 "NULL"(TTL 60s)

5.3 缓存穿透防护:布隆过滤器

问题:查询一个不存在的 key,每次请求都穿透到数据库。

解决方案:布隆过滤器 + 空值缓存

// 初始化布隆过滤器 private final BloomFilter<String> bloomFilter = BloomFilter.create( Funnels.stringFunnel(StandardCharsets.UTF_8), 100000, // 预计插入 10 万条 0.01 // 误判率 1% ); // 查询时 if (!bloomFilter.mightContain(key)) { System.out.println("[Cache] Bloom filter: key not exists, skip"); return null; // 直接返回,不查 DB }

布隆过滤器的特点

  • 判断“不存在”是绝对准确的

  • 判断“存在”可能有误判(1% 的概率)

  • 误判时,空值缓存 "NULL" 作为兜底

5.4 缓存击穿防护:互斥锁

问题:一个热点 Key 过期后,大量请求同时涌入,全部打到数据库。

解决方案:按 key 加互斥锁 + double-check

ReentrantLock lock = keyLocks.computeIfAbsent(key, k -> new ReentrantLock()); lock.lock(); try { // double-check:获取锁后再次检查 L1 String value = localCache.getIfPresent(key); if (value != null) { return value; } // 从 Redis 或 DB 加载 // ... } finally { lock.unlock(); keyLocks.remove(key); }

关键点:只有第一个请求会去加载数据,其他请求等待。数据加载完成后,后续请求直接从缓存读取。

5.5 缓存雪崩防护:随机 TTL

问题:大量 Key 同时过期,导致 DB 压力暴增。

解决方案:在基础过期时间上增加随机偏移

int baseExpire = 300; // 基础 300 秒 int randomOffset = random.nextInt(60); // 0~60 秒随机 int expireTime = baseExpire + randomOffset; jedis.setex(key, expireTime, value); System.out.println("[Cache] Set with expire: " + expireTime + "s");

不同 Key 的过期时间在 300~360 秒之间均匀分布,不会同时过期。


六、缓存迁移接收

当调度中心检测到某个 Worker 下线时,会发送TYPE_CACHE_MIGRATE消息,把该 Worker 的热点 Key 迁移到其他 Worker。

case TYPE_CACHE_MIGRATE: String migrateJson = new String(msg.getBody()); CacheMigrationMessage migrateMsg = JsonUtil.fromJson(migrateJson, CacheMigrationMessage.class); System.out.println("[Migration] Received cache migration, keys: " + migrateMsg.getHotKeys().size()); for (String key : migrateMsg.getHotKeys()) { cacheService.get(key, () -> { System.out.println("[Migration] Loading key: " + key); return "{\"migrated\":true}"; }); } System.out.println("[Migration] Preloaded " + migrateMsg.getHotKeys().size() + " keys"); break;

迁移后:新 Worker 的本地缓存已经包含了这些热点 Key,任务执行时不会因为缓存缺失而变慢。


七、缓存配置:CacheConfig

public class CacheConfig { public static <K, V> Cache<K, V> createLocalCache() { return Caffeine.newBuilder() .maximumSize(10000) // 最大 10000 条 .expireAfterWrite(60, TimeUnit.SECONDS) // 60 秒过期 .recordStats() // 记录命中率 .build(); } }

为什么选择 Caffeine?

对比项CaffeineGuava CacheEhcache
性能最优中等一般
内存管理最优(W-TinyLFU)一般(LRU)一般
Spring 集成原生支持支持支持
维护活跃度

Caffeine 是目前 Java 生态中最优秀的本地缓存库。


八、关键设计决策总结

设计决策原因权衡
Worker 用独立线程发心跳不受业务处理阻塞影响多一个线程
使用 Caffeine 做 L1 缓存微秒级访问,极高 QPS内存占用
布隆过滤器在 Worker 侧避免无效请求穿透到 Redis/DB内存占用 + 误判率
缓存迁移使用同步加载保证迁移完成后再处理任务迁移期间任务可能等待
executedTasksConcurrentHashMap多线程并发处理任务内存占用

九、常见面试问题

Q1:Worker 本地缓存和 Redis 缓存的数据一致性问题怎么处理?

缓存一致性是分布式系统的经典难题。我采用的是最终一致性策略:

  1. 更新时先更新数据库,再删除 Redis 缓存

  2. 本地缓存通过短过期时间(60 秒)来控制不一致窗口

  3. 如果业务对一致性要求高,可以禁用本地缓存

Q2:布隆过滤器的误判率怎么来的?

布隆过滤器使用多个哈希函数。当数据量超过预期时,哈希碰撞概率增加。这就是为什么需要配置expectedInsertionsfpp(误判率)。1% 的误判率意味着 100 个不存在的 Key 中可能有 1 个被误判为存在,然后由空值缓存兜底。

Q3:如果 Worker 在执行任务时宕机了,任务怎么办?

调度中心的心跳检测会感知 Worker 下线,然后在removeWorker方法中把该 Worker 的runningTasks放入重试队列,最终由其他 Worker 接管。

Q4:Worker 收到了 TYPE_CACHE_MIGRATE 消息,但消息中的 HotKey 数量很多,会不会导致大量内存占用?

每次迁移最多 10 个 Key(每个任务预热 10 个 Key),而且只是预热 Key 的元数据,不是预热整个数据内容。实际内存占用很小。


十、三个模块总结

模块核心职责关键技术
scheduler-common通信协议、数据模型、工具Netty 编解码、Jackson
scheduler-core管理 Worker、调度任务、处理故障Netty、一致性 Hash、MySQL
scheduler-worker执行任务、缓存加速Caffeine、Redis、Guava

一句话总结:common 定义“怎么说”,core 决定“谁来做”,worker 负责“怎么做”。

http://www.cnnetsun.cn/news/2996237.html

相关文章:

  • Linux线程3.0-线程同步与互斥,C/C++互斥锁。
  • 大模型微调灾难性遗忘2026:LoRA+SFT+DPO联合缓解的工程方案
  • 增量量距离保护:破解IBR电网继电保护难题的核心技术
  • Spring AI Agent Skills 工程化实践:解耦、契约与可插拔
  • 4sapi工作流引擎:2026生产级Agent的确定性架构实践
  • Vibe Coding:从指令编程到意图驱动的开发范式革命
  • DESIGN.md:从静态文档到可执行契约的工程实践
  • Spring AI Alibaba:Java企业级大模型集成的基础设施协议
  • Vue3+Vite性能优化实战:构建、响应式与加载链路闭环
  • Python3安装后command not found的根因与解决方案
  • Python3环境搭建的底层原理与四条技术路径
  • Burp Suite实战指南:从入门到精通的Web安全测试工具系统学习
  • AI生成代码如何安全落地:工程化落地流水线实践
  • 自动驾驶感知系统实战:多传感器融合与BEV+Occupancy落地
  • vLLM私有部署100倍性能提升的工程实践
  • 截断扩散模型在端到端自动驾驶规划中的工程落地
  • 彻底解决Appium iOS自动化测试WebDriverAgent启动失败Code 65错误
  • Frida在Windows逆向工程中的实战应用:动态插桩与自动化破解
  • 打破功能边界,广凌智慧教学融合平台解决方案实现全场景一体化覆盖
  • 如何获取加密货币的历史K线数据用于回测策略
  • 大模型降本实战:如何利用缓存引擎干掉50%-80%的Token消耗?(附锋范科技API调用示例)
  • GitHub中文界面终极指南:5分钟告别英文困扰,轻松掌握代码管理
  • 高校建设人工智能实验室,到底该如何选择服务商?
  • 王牌操盘手怎么样?一文看懂其运营方法论与行业价值
  • 智能体爆发前夜,为什么说底层平台才是真正的胜负手?
  • 3秒搞定图片格式转换:Chrome扩展神器Save Image as Type使用指南
  • dfs代码问题根源分析
  • TikTok国际版下载避坑指南:2026年最新完整教程
  • 独立产品从0到1:技术人的产品打磨方法论
  • 【共创季稿事节】动图魔方技术拆解 03:HarmonyOS 6.1 本地优先 GIF 工具:素材选择、文件 URI、相册保存与系统分享