当前位置: 首页 > news >正文

PHP数据同步与CDC变更数据捕获

PHP数据同步与CDC变更数据捕获

CDC(Change Data Capture)是一种跟踪数据库变更的技术。PHP可以通过多种方式实现CDC和数据同步。今天说说PHP中CDC的实现方案。

CDC的核心是捕获数据的插入、更新和删除操作,并将变更传播到其他系统。

```php
class ChangeTracker
{
private PDO $pdo;
private string $table;

public function __construct(PDO $pdo, string $table)
{
$this->pdo = $pdo;
$this->table = $table;
$this->initTrackingTable();
}

private function initTrackingTable(): void
{
$this->pdo->exec("
CREATE TABLE IF NOT EXISTS cdc_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
operation ENUM('insert', 'update', 'delete') NOT NULL,
entity_id INT NOT NULL,
old_data JSON,
new_data JSON,
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE,
INDEX idx_unprocessed (processed, changed_at)
)
");
}

public function logInsert(int $entityId, array $data): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, new_data)
VALUES (?, 'insert', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($data, JSON_UNESCAPED_UNICODE)]);
}

public function logUpdate(int $entityId, array $oldData, array $newData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data, new_data)
VALUES (?, 'update', ?, ?, ?)
");
$stmt->execute([
$this->table,
$entityId,
json_encode($oldData, JSON_UNESCAPED_UNICODE),
json_encode($newData, JSON_UNESCAPED_UNICODE),
]);
}

public function logDelete(int $entityId, array $oldData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data)
VALUES (?, 'delete', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($oldData, JSON_UNESCAPED_UNICODE)]);
}

public function getUnprocessed(int $limit = 100): array
{
$stmt = $this->pdo->prepare("
SELECT * FROM cdc_log
WHERE processed = FALSE
ORDER BY id ASC
LIMIT ?
");
$stmt->execute([$limit]);
return $stmt->fetchAll();
}

public function markProcessed(int $id): void
{
$this->pdo->prepare("UPDATE cdc_log SET processed = TRUE WHERE id = ?")->execute([$id]);
}
}

class UserServiceWithCDC
{
private PDO $pdo;
private ChangeTracker $tracker;

public function __construct(PDO $pdo, ChangeTracker $tracker)
{
$this->pdo = $pdo;
$this->tracker = $tracker;
}

public function createUser(string $name, string $email): int
{
$stmt = $this->pdo->prepare("INSERT INTO users (name, email) VALUES (?, ?)");
$stmt->execute([$name, $email]);
$id = (int)$this->pdo->lastInsertId();

$this->tracker->logInsert($id, ['name' => $name, 'email' => $email]);
return $id;
}

public function updateUser(int $id, array $data): void
{
$oldStmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$oldStmt->execute([$id]);
$oldData = $oldStmt->fetch(PDO::FETCH_ASSOC);

$sets = [];
$params = [];
foreach ($data as $key => $value) {
$sets[] = "{$key} = ?";
$params[] = $value;
}
$params[] = $id;

$sql = "UPDATE users SET " . implode(', ', $sets) . " WHERE id = ?";
$this->pdo->prepare($sql)->execute($params);

$this->tracker->logUpdate($id, $oldData, $data);
}

public function deleteUser(int $id): void
{
$stmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$stmt->execute([$id]);
$oldData = $stmt->fetch(PDO::FETCH_ASSOC);

$this->pdo->prepare("DELETE FROM users WHERE id = ?")->execute([$id]);
$this->tracker->logDelete($id, $oldData);
}
}
?>

CDC消费者将变更同步到其他系统:

```php
class CdcConsumer
{
private ChangeTracker $tracker;
private array $handlers = [];

public function __construct(ChangeTracker $tracker)
{
$this->tracker = $tracker;
}

public function registerHandler(string $operation, callable $handler): void
{
$this->handlers[$operation][] = $handler;
}

public function process(): int
{
$processed = 0;
$changes = $this->tracker->getUnprocessed(50);

foreach ($changes as $change) {
try {
$handlers = $this->handlers[$change['operation']] ?? [];
foreach ($handlers as $handler) {
$handler($change);
}
$this->tracker->markProcessed($change['id']);
$processed++;
} catch (\Exception $e) {
error_log("CDC处理失败: {$e->getMessage()}");
}
}

return $processed;
}
}
?>

CDC是数据同步和事件驱动架构的基础技术。通过捕获数据库变更日志,可以将数据同步到缓存、搜索引擎或数据仓库。CDC的关键是变更的顺序性和幂等性,确保数据最终一致。PHP实现的CDC适合轻量级的同步场景,大规模场景建议使用Debezium等专业工具。

http://www.cnnetsun.cn/news/2713203.html

相关文章:

  • 别再只调参了!深入MAE源码,手把手教你如何将它适配到自己的主干网络(以ResNet为例)
  • 如何快速部署AI编程助手:OpenCode 5分钟配置终极指南
  • 告别云打包!用Android Studio离线打包UniApp APK的保姆级避坑指南
  • Java面试必问的10大核心问题及高分回答技巧
  • 后端开发框架选型指南:SpringBootvsDjango
  • AI语音合成将如何重塑内容产业?:7大颠覆性趋势+3类已验证商业场景(附2025技术成熟度曲线)
  • PS2手柄通信时序详解:为什么你的STM32F407读取会出错?一个延时引发的血案
  • Arduino Leonardo打造LCD倒计时秒表:从状态机到非阻塞延时实战
  • Python+Hadoop+Hive+Spark音乐排行榜数据分析系统源码+论文
  • VoiceFixer:音频增强工具终极指南,一键解决语音质量问题
  • 5步完整方案:Cursor Pro永久免费使用终极指南
  • 从零开始:如何为qBittorrent编写自定义搜索插件
  • 告别Windows编译慢!在Ubuntu 22.04上从源码编译Chrono Engine全模块(含Irrlicht可视化)
  • Arduino倒计时器实战:从硬件连接到状态机编程
  • 别再乱选预处理器了!Stable Diffusion ControlNet Tile模型三大预处理器实战对比(附高清对比图)
  • MiddleClick-Sonoma终极指南:三指点击实现滚轮点击的完整教程
  • 技术驱动财务转型:从流程自动化到智能决策的实战架构
  • ComfyUI-Impact-Pack:发现AI图像增强的无限可能
  • macOS下Claude Code从0到1配置教程(附API密钥获取+常见报错修复)
  • 告别编译焦虑:Ubuntu 22.04下一键式编译Chrono Engine及其Irrlicht可视化模块
  • 模拟电路实战:用晶体管与振动电机打造声控石头昆虫
  • TradingAgents-CN:构建企业级AI投资决策系统的技术实践
  • 保姆级教程:手把手教你用YOLOv8-OBB训练自己的遥感旋转目标检测模型(UCAS-AOD数据集)
  • 从Chatbot到生产级Agent:保姆级开发指南,带你搞定AI Agent工程化难题!
  • [論文學習]大型語言模型(LLM)隱私風險全面調查:訓練與推論階段的挑戰與對策
  • 手把手教你解决Android Studio报错:AGP版本不兼容(实测降级Gradle与插件版本)
  • 展锐平台Sensor Hub驱动添加实战:从源码编译到内存Overlay的完整避坑指南
  • 从王者荣耀卡顿聊起:手把手带你搞懂FPS、码率与视频编码(H.264/H.265实战解析)
  • 终极指南:用Fan Control彻底掌控Windows风扇,告别噪音与过热烦恼
  • 游戏闪退?可能是Vulkan的锅!Windows双显卡(独显+核显)环境下排查与切换Vulkan渲染器的完整指南