SpringBoot 接入 RocketMQ 全教程:Tag 过滤、批量发送、事务消息一站式实现
一、组件说明
RocketMQ 分4个核心角色:
- NameServer:注册中心,Broker路由管理,无状态集群;
- Broker:真正存消息的服务,分Master/Slave;单机部署只用单个Master;
- Producer生产者:发送消息;
- Consumer消费者:订阅Topic、拉取/监听消息。
二、Windows单机部署
2个cmd窗口分别启动
1、官方下载地址
二进制压缩包:
下载地址
Windows选rocketmq-all-x.x.x-bin-release.zip
解压路径无中文、空格,示例:D:\soft\rocketmq-5.3.0
2、修改JVM内存
可选
bin/runserver.cmd
set JAVA_OPT=-Xms128m -Xmx128mbin/runbroker.cmd
set JAVA_OPT=-Xms128m -Xmx128m3、启动NameServer
cd D:\soft\rocketmq-5.3.0\bin start mqnamesrv.cmd默认端口:9876
4、启动Broker
mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=trueautoCreateTopicEnable=true测试环境自动创建Topic,不用手动建。
5、验证安装
工具脚本测试发送/消费
# 发送 tools.cmd org.apache.rocketmq.example.quickstart.Producer # 消费 tools.cmd org.apache.rocketmq.example.quickstart.Consumer三、SpringBoot 接入RocketMQ
Maven依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version></dependency>application.yml 基础配置
rocketmq:name-server:127.0.0.1:9876# 生产者组producer:group:order-producer-group四、生产者发送消息
1)普通同步消息
importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;@ServicepublicclassMqProducerService{@AutowiredprivateRocketMQTemplaterocketMQTemplate;// 目标Topic:order_topicpublicvoidsendNormalMsg(){Stringmsg="订单创建消息,订单号:10001";rocketMQTemplate.syncSend("order_topic",msg);}}2)带Tag过滤消息
消费者可以只订阅指定Tag,实现消息分类过滤
// tag=payrocketMQTemplate.syncSend("order_topic:pay","支付消息");// tag=cancelrocketMQTemplate.syncSend("order_topic:cancel","取消订单消息");3)延迟消息(常用:超时未支付自动关单)
// 延迟等级 1~18,等级3=10srocketMQTemplate.syncSend("order_topic",MessageBuilder.withPayload("超时未支付").build(),3);4)批量发送消息
List<Message<String>>msgList=newArrayList<>();msgList.add(MessageBuilder.withPayload("msg1").build());msgList.add(MessageBuilder.withPayload("msg2").build());rocketMQTemplate.syncSendBatch("order_topic",msgList);五、消费者监听代码
@RocketMQMessageListener
importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.springframework.stereotype.Service;@Service@RocketMQMessageListener(topic="order_topic",consumerGroup="order-consumer-group",// 只消费tag=pay的消息selectorExpression="pay")publicclassOrderConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){System.out.println("收到消息:"+message);// 业务处理:更新订单、扣库存等}}- 方法正常执行完毕:自动ACK确认,Broker删除这条消息;
- 抛出异常:消息重试投递。
六、核心高级场景:事务消息
分布式事务可靠投递
下单+发消息扣库存,保证原子性:
- 半消息发送成功;
- 本地订单事务执行;
- 事务成功:Commit,消息投递给消费者;
- 事务失败:Rollback,消息丢弃。
// 发送事务消息TransactionSendResultresult=rocketMQTemplate.sendMessageInTransaction("tx-producer-group","order_topic",MessageBuilder.withPayload("订单事务消息").build(),null);配套实现RocketMQLocalTransactionListener本地事务执行+回查接口。
七、可视化管理工具
RocketMQ Dashboard
- 下载dashboard jar包,java -jar 启动;
- 配置NameServer地址,浏览器访问;
- 可视化查看Topic、生产者、消费者、堆积消息、重置位点。
八、生产避坑
- 集群部署:NameServer集群无状态;Broker一主多从高可用;
- 消息重试:消费者异常自动重试,重试次数耗尽进入死信队列DLQ;
- 位点重置:可重新消费历史消息;
- 消息堆积排查:消费速度跟不上生产,查看堆积条数、消费耗时;
