当前位置: 首页 > news >正文

SpringBoot 接入 RocketMQ 全教程:Tag 过滤、批量发送、事务消息一站式实现

一、组件说明

RocketMQ 分4个核心角色:

  1. NameServer:注册中心,Broker路由管理,无状态集群;
  2. Broker:真正存消息的服务,分Master/Slave;单机部署只用单个Master;
  3. Producer生产者:发送消息;
  4. Consumer消费者:订阅Topic、拉取/监听消息。

二、Windows单机部署

2个cmd窗口分别启动

1、官方下载地址

二进制压缩包:
下载地址
Windows选rocketmq-all-x.x.x-bin-release.zip

解压路径无中文、空格,示例:D:\soft\rocketmq-5.3.0

2、修改JVM内存

可选

bin/runserver.cmd
set JAVA_OPT=-Xms128m -Xmx128m
bin/runbroker.cmd
set JAVA_OPT=-Xms128m -Xmx128m

3、启动NameServer

cd D:\soft\rocketmq-5.3.0\bin start mqnamesrv.cmd

默认端口:9876

4、启动Broker

mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

autoCreateTopicEnable=true测试环境自动创建Topic,不用手动建。

5、验证安装

工具脚本测试发送/消费

# 发送 tools.cmd org.apache.rocketmq.example.quickstart.Producer # 消费 tools.cmd org.apache.rocketmq.example.quickstart.Consumer

三、SpringBoot 接入RocketMQ

Maven依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version></dependency>

application.yml 基础配置

rocketmq:name-server:127.0.0.1:9876# 生产者组producer:group:order-producer-group

四、生产者发送消息

1)普通同步消息

importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;@ServicepublicclassMqProducerService{@AutowiredprivateRocketMQTemplaterocketMQTemplate;// 目标Topic:order_topicpublicvoidsendNormalMsg(){Stringmsg="订单创建消息,订单号:10001";rocketMQTemplate.syncSend("order_topic",msg);}}

2)带Tag过滤消息

消费者可以只订阅指定Tag,实现消息分类过滤

// tag=payrocketMQTemplate.syncSend("order_topic:pay","支付消息");// tag=cancelrocketMQTemplate.syncSend("order_topic:cancel","取消订单消息");

3)延迟消息(常用:超时未支付自动关单)

// 延迟等级 1~18,等级3=10srocketMQTemplate.syncSend("order_topic",MessageBuilder.withPayload("超时未支付").build(),3);

4)批量发送消息

List<Message<String>>msgList=newArrayList<>();msgList.add(MessageBuilder.withPayload("msg1").build());msgList.add(MessageBuilder.withPayload("msg2").build());rocketMQTemplate.syncSendBatch("order_topic",msgList);

五、消费者监听代码

@RocketMQMessageListener

importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.springframework.stereotype.Service;@Service@RocketMQMessageListener(topic="order_topic",consumerGroup="order-consumer-group",// 只消费tag=pay的消息selectorExpression="pay")publicclassOrderConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){System.out.println("收到消息:"+message);// 业务处理:更新订单、扣库存等}}
  • 方法正常执行完毕:自动ACK确认,Broker删除这条消息;
  • 抛出异常:消息重试投递。

六、核心高级场景:事务消息

分布式事务可靠投递

下单+发消息扣库存,保证原子性:

  1. 半消息发送成功;
  2. 本地订单事务执行;
  3. 事务成功:Commit,消息投递给消费者;
  4. 事务失败:Rollback,消息丢弃。
// 发送事务消息TransactionSendResultresult=rocketMQTemplate.sendMessageInTransaction("tx-producer-group","order_topic",MessageBuilder.withPayload("订单事务消息").build(),null);

配套实现RocketMQLocalTransactionListener本地事务执行+回查接口。

七、可视化管理工具

RocketMQ Dashboard

  1. 下载dashboard jar包,java -jar 启动;
  2. 配置NameServer地址,浏览器访问;
  3. 可视化查看Topic、生产者、消费者、堆积消息、重置位点。

八、生产避坑

  1. 集群部署:NameServer集群无状态;Broker一主多从高可用;
  2. 消息重试:消费者异常自动重试,重试次数耗尽进入死信队列DLQ;
  3. 位点重置:可重新消费历史消息;
  4. 消息堆积排查:消费速度跟不上生产,查看堆积条数、消费耗时;
http://www.cnnetsun.cn/news/2920597.html

相关文章:

  • AI 算法题分类与标签体系:从题目特征到知识点的自动映射
  • MPC823通信处理器模块:BRG与SCC配置原理与实战指南
  • BiliRaffle:2025年最实用的B站动态抽奖工具完整指南
  • 终极指南:5分钟快速将图片转为3D打印模型(免费开源)
  • 每日星座运势1.4.4版:精准查询桃花与每日气运
  • MPC8548E CDS开发系统硬件配置实战指南
  • Shutter Encoder:免费开源视频处理工具的终极完整使用指南
  • 2026年制造业MSA测量系统分析(Measurement System Analysis)标准化…
  • 5步永久解锁IDM完整功能:免费激活Internet Download Manager终极指南
  • 缠论技术分析革命:ChanlunX插件如何让通达信用户实现精准可视化交易
  • PowerPC MPC7450性能监控与动态频率切换实战解析
  • 深入解析PowerPC指令集:从RISC原理到MPC8245实战应用
  • MPC8272处理器外部信号详解:从总线接口到硬件设计实战
  • 终极GTA5线上游戏助手:5个实用功能彻底改变你的游戏体验
  • Pull与Push策略:人机信息交互的平衡艺术
  • Spring Boot 的核心注解 @SpringBootApplication 由哪三个注解组成?
  • 3步实现游戏隐身:Deceive让你掌控自己的在线状态
  • Go 微服务服务治理:从熔断降级到限流自愈的工程实践
  • 【共创季稿事节】鸿蒙ArkTS颜色滤镜实战
  • 113.低配GPU友好!DDPM显存溢出解决+混合精度训练优化方案
  • MPC8272硬件安全引擎:数据包描述符驱动与硬件加速实战解析
  • 语义打标:让非结构化文本进入业务决策的翻译器
  • Notepad--:为什么这款国产跨平台文本编辑器值得你立刻尝试?
  • 在 macOS 上享受完美歌词同步体验:LyricsX 终极指南
  • SAP成本估算CK11N自动化实战:BAPI与BDC两种方案对比与避坑指南
  • MPC7450 AltiVec向量指令与缓存架构深度解析及性能优化实战
  • MPC8544E LBC核心寄存器深度解析:从时序陷阱到性能优化实战
  • 制造业运维AI Agent:基于大模型的设备故障自动排查实战
  • 如何快速掌握Mi-Create:小米智能手表表盘设计的完整指南
  • S8.1价值感知设计——让用户觉得每一分钱都花得值