当前位置: 首页 > news >正文

Easyoole 使用rdkafka 进行kafka的创建topic创建 删除 以及数据发布 订阅

开发环境 Linux

首先我们需要下载安装librdkafka

https://github.com/confluentinc/librdkafka/tags?after=v2.10.0-RC2

tar -zxvf librdkafka-2.7.0.tar.gz cd librdkafka-2.7.0 ./configure make && make install

如何知道我们安装成功了呢

ldconfig -p | grep rdkafka

如果有如下输出 就说明安装成功了 。

接下来 我们安装 rdkafka 的PHP 扩展

https://pecl.php.net/package/rdkafka这里我们选择6.0.0

tar -zxvf rdkafka-6.0.0.tgz cd rdkafka-6.0.0 /usr/php/bin/phpize ./configure --with-php-config=/usr/php/bin/php-config make && make

其中phpize 和 php-config 请修改你自己的PHP 环境。

编译好后,修改php.ini 将rdkafka.so 添加到配置中,重启php 运行如下命令,看看是否扩展生效。

php --ri rdkafka

如果有输出 说明对应的kafka扩展已经安装好了 下面将进行代码层面的编写。

在EasySwooleEvent.php 文件中 加载配置目录 如下图所示:

接下来我们在Config 目录新增一个文件 如下:

Kakfa.php

<?php /** * kafka相关连接配置信息 */ return [ 'kafka' => [ "host" => [ '192.168.1.1:9092', ], "zookeeper"=>[ "192.168.1.1:2181/kafka" ], 'kafka_bin_path'=>'/usr/kafka/bin', //kafka的运行命令 本机有kafka客户端 填写 无 忽略 'is_kafka_client'=>true, //本机有kafka客户端 true 有 false 没有,这里与创建 删除topic 有关 若没有客户端,对应topic 需要提前新建好 ], ];

新增相关的服务 我们在App/Service 下面做相关服务。

新增KafkaConsumerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; class KafkaConsumerService { private $consumer; public function __construct(string $topicName) { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('group.id', 'group_' . $topicName); $conf->set('enable.auto.commit', 'true'); $this->consumer = new KafkaConsumer($conf); $this->consumer->subscribe([$topicName]); } public function listen(callable $callback) { while (true) { $msg = $this->consumer->consume(1000); if (empty($msg)) continue; if ($msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) { $callback($msg->payload); } } } }

KafkaProducerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; class KafkaProducerService { private static $instance = null; private $producer; private $topics = []; private function __construct() { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('queue.buffering.max.ms', 5); $this->producer = new Producer($conf); } public static function getInstance(): self { if (!self::$instance) { self::$instance = new self(); } return self::$instance; } public function send(string $topicName, string $message): bool { if (!isset($this->topics[$topicName])) { $this->topics[$topicName] = $this->producer->newTopic($topicName); } $topic = $this->topics[$topicName]; $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); $result = $this->producer->flush(10000); return $result === RD_KAFKA_RESP_ERR_NO_ERROR; } }

KafkaService.php

<?php /** * Kafka消息服务 * @author 树下水月 * @date 2025年11月27日13:23:23 */ namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; use RdKafka\Admin\TopicSpecification; class KafkaService { /** * 发布数据到kafka * @param string $topic topic信息 * @param array $data 发送数据 数组 * @return bool */ public static function publish(string $topic, array $data): bool { return KafkaProducerService::getInstance()->send( $topic, json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) ); } /** * 订阅kafka数据 * @param string $topic topic信息 * @param callable $callback * @return void */ public static function consume(string $topic, callable $callback) { $consumer = new KafkaConsumerService($topic); $consumer->listen($callback); } /** * 获取kafka 支持啥类型 * @param string $kafkaBinPath * @return string */ private function getKafkaMode(string $kafkaBinPath): string { $help = []; exec("$kafkaBinPath/kafka-topics.sh --help 2>&1", $help); // 新版 Kafka (2.0+) 支持 --bootstrap-server foreach ($help as $line) { if (strpos($line, '--bootstrap-server') !== false) { return 'bootstrap'; } } return 'zookeeper'; } /** * 获取 list 命令 */ private function buildListCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --list"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --list"; } } /** * 获取 create 命令 */ private function buildCreateCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $topic, int $partitions, int $replica, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --create --topic $topic --partitions $partitions --replication-factor $replica"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --create --topic $topic --partitions $partitions --replication-factor $replica"; } } /** * 获取 delete 命令 */ private function buildDeleteCommand(string $kafkaBinPath, string $addr, string $zookeepers, string $topic, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --delete --topic $topic"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeepers --delete --topic $topic"; } } /** * 在 PHP 中通过 exec 创建 Kafka topic * @param string $topicName Topic 名称 * @param int $partitions 分区数 * @param int $replication 副本数 * @param string $kafkaBinPath Kafka bin 目录(包含 kafka-topics.sh) * @return array 返回结果 ['success'=>bool, 'message'=>string] */ public function createKafkaTopic($topicName, $partitions = 1, $replication = 1, $kafka_bootstrap, $zookeepers, $is_kafka_client = false, $kafkaBinPath = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = $kafka_bootstrap; $mode = $this->getKafkaMode($kafkaBinPath); //获取模式 zookeeper bootstrap if ($this->isKafkaTopicExist($topicName, [$addr], [$zookeepers], $kafkaBinPath)) { var_dump("Topic {$topicName} 已存在,跳过创建"); return true; } $cmd = $this->buildCreateCommand($kafkaBinPath, $addr, $zookeepers, $topicName, $partitions, $replication, $mode); //执行创建 topic exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { var_dump("当前系统内没有kafka客户端,跳过topic创建"); return true; //没有客户端 不会创建topic 直接跳过 } } /** * 检查 Topic 是否存在(兼容模式) * @param $topicName 需要检查的topic * @param array $brokers * @param array $zookeeper zookeeper * @param $kafkaBin * @return bool * @throws \Exception */ public function isKafkaTopicExist($topicName, array $brokers, array $zookeeper, $kafkaBin = '/yisa_oe/kafka/bin') { $addr = implode(',', $brokers); $zookeepers = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); //获取模式 $cmd = $this->buildListCommand($kafkaBin, $addr, $zookeepers, $mode); //获取topic 是否存在 exec($cmd . " 2>&1", $output, $returnVar); if ($returnVar !== 0) { throw new \Exception("检查 topic 失败:" . implode("\n", $output)); } return in_array($topicName, $output); } /** * 删除 Topic * @param $topicName 需要删除的topic * @param array $brokers * @param array $zookeeps zookeep 信息 * @param $is_kafka_client 是否有kafka客户端 默认false 无 * @param $kafkaBin kafka 对应的bin执行目录 * @return bool */ public function deleteKafkaTopic($topicName, array $brokers, array $zookeeper, $is_kafka_client = false, $kafkaBin = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = implode(',', $brokers); $zookeeper_str = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); $cmd = $this->buildDeleteCommand($kafkaBin, $addr, $zookeeper_str, $topicName, $mode); exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { return true; } } }

我们新增一个路由 这里直接忽略 我们以Test.php 这个控制器为例吧

<?php /** * 测评回调控制器 * @author liupeng * @email liupenga@yisa.com * @date 2025年11月18日14:22:20 */ namespace App\HttpController\Api; use EasySwoole\Http\AbstractInterface\Controller; use App\Service\KafkaService; use EasySwoole\EasySwoole\Config; use EasySwoole\ORM\Exception\Exception; use EasySwoole\Validate\Validate; use App\Model\CommonModel; use App\Model\CommonOrmModel; use EasySwoole\Validate\Functions\Length; class Evaluation extends Controller { public function onRequest(?string $action): bool { return parent::onRequest($action); // TODO: Change the autogenerated stub } public function __construct() { parent::__construct(); $this->config = Config::getInstance()->getConf('common');//配置变量 } public function tet(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $result = $kafka_service->createKafkaTopic($topicName, $partitions, $replication, $kafka_bootstrap, $zookeeps, $is_kafka_client, $kafka_bin_path); //创建kafka 的topic $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 var_dump(KafkaService::publish($topicName, ['ddd' => 33, 'time' => date('Y-m-d H:i:s')])); //写入kafka数据 }

这个tet方法 是创建了一个topic 名字为test的 如果存在 就跳过 如果不存在,创建,在然后就是KafkaService::publish 推送数据到对应的topic 中。

删除Topic

public function tet1(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 $result = $kafka_service->deleteKafkaTopic('Liupeng', $brokers, $zookeeps_arr, $is_kafka_client, $kafka_bin_path); //删除topic }

订阅某个kafka数据

public function dd() { $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'kkk'; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 //消费kafka数据 一致占用 KafkaService::consume($topicName,function ($msg){ echo "收到". $msg . PHP_EOL; }); }

KafkaService::consume 订阅数据

http://www.cnnetsun.cn/news/137493.html

相关文章:

  • EMS-NT企业微电网能碳管理平台:架构、功能与应用研究
  • 读捍卫隐私10读后总结与感想兼导读
  • OpenAI发布GPT-5.2系列;谷歌推出Gemini Deep Research API:AI领域的最新战况与未来前景
  • 华为云国际站代理商的AS跨境有什么优势呢?
  • NPP 草原:美国中部平原实验牧场(SGS),1939-1990 年,R1
  • CCD相机同步外触发拍照抓拍识别高速脉冲计数器信号采集模块
  • 【网络安全】2025新手如何上手挖漏洞(非常详细)零基础入门到精通,看这篇就够了!
  • BurpSuite渗透测试通关手册,简单几步带你从环境配置到报告生成
  • Python | OpenCV | 图像处理 | 入门实验 | 对比度增强 | 裁剪
  • Apifox:API 接口自动化测试完全指南
  • 正反向代理:网络安全核心技术
  • 别被忽悠了!一文讲透MES管理系统本地部署与SaaS模式的真正底牌
  • 【毕业设计】基于springboot+微信小程序的羽球快讯爱好者平台小程序(源码+文档+远程调试,全bao定制等)
  • 小程序计算机毕设之基于SpringBoot的宠物领养微信小程序基于springboot+微信小程序的宠物领养系统小程序(完整前后端代码+说明文档+LW,调试定制等)
  • 小程序计算机毕设之基于springboot+微信小程序的大学生餐厅点餐系统小程序基于springboot微信小程序的校园食堂订餐服务系统(完整前后端代码+说明文档+LW,调试定制等)
  • 计算机小程序毕设实战-基于springboot+微信小程序的影院售票系统设计与实现基于SpringBoot的电影购票平台微信小程序【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 计算机小程序毕设实战-基于springboot+微信小程序的羽球快讯爱好者平台小程序羽毛球场预定app_羽毛球预约管家【完整源码+LW+部署说明+演示视频,全bao一条龙等】
  • 11、文本与盒子属性的CSS技巧解析
  • 23、WinJS控件样式与样式规则定位指南
  • 27、Windows 8 应用开发中的 SVG 样式设计
  • SAP ABAP拆分交货单数量、批次、存储地点 并过账
  • 基于MPC的智能车运动预测和控制算法 Motion predication; Kinemati...
  • Mathcad的野路子】11kW PFC参数计算书实战拆解
  • STM32学习笔记CAN
  • 搭建你的第一个“私有知识库” (RAG)
  • 13、Unix 系统磁盘管理与安全定位脚本实用指南
  • 15、系统管理脚本实用指南
  • 怎么选一款适合大面积清洁的多功能全自动洗地机呢?
  • 使用matlab编写m脚本,编写无迹卡尔曼滤波算法(UKF)估计电池SOC,注释清晰
  • 教培行业新媒体运营困境凸显!这款软件或成转型制胜法宝?