当前位置: 首页 > news >正文

【RabbitMQ】RPC 通信(使用案例)

文章目录

  • 1. RPC 通信
  • 2. 引入依赖
  • 3. 客户端代码编写
    • 3.1 声明队列
    • 3.2 发送请求消息
    • 3.3 使用阻塞队列存储回调结果
    • 3.4 获取回调结果
    • 3.5 完整代码
  • 4. 服务端代码编写
    • 4.1 声明队列
    • 4.2 设置同时最多只能获取一个消息
    • 4.3 接收消息并做出相应处理
    • 4.4 完整代码
  • 5. 运行程序

1. RPC 通信

RPC(Remote Procedure Call),即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。类似于 HTTP 远程调用。

RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程。

大概流程如下:

  • 1、客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了一个回调队列,服务端处理后,会把响应结果发送到这个队列。
  • 2、服务端接收到请求后,处理请求并发送响应消息到 replyTo 指定的回调队列。
  • 3、客户端在回调队列上等待响应消息。一旦收到响应,客户端会检查消息的 correlationId 属性,以确保它是所期望的响应。

接下来我们看看 RPC 模式的实现步骤:

  • 1、引入依赖
  • 2、编写客户端
  • 3、编写服务端

2. 引入依赖

先引入 rabbitmq 的依赖

<!-- Source: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version><scope>compile</scope></dependency>

那么先去 Constants.java 里面定义队列。

// RPC模式publicstaticfinalStringRPC_REQUEST_QUEUE="rpc.request.queue";publicstaticfinalStringRPC_RESPONSE_QUEUE="rpc.response.queue";

3. 客户端代码编写

客户端代码主要流程如下:

  • 1、声明两个队列,包含回调队列 replyQueueName,声明本次请求的唯一标志 corrId。
  • 2、将 replyQueueName 和 corrId 配置到要发送的消息队列中。
  • 3、使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中。
  • 4、阻塞队列有消息后,主线程被唤醒,打印返回内容。

3.1 声明队列

代码如下所示:

channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);

3.2 发送请求消息

代码如下所示:

// 3. 发生请求Stringmsg="hello rpc......";// 设置请求的唯一标识StringcorrelationId=UUID.randomUUID().toString();// 设置请求的相关属性AMQP.BasicPropertiesprops=newAMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());

如下所示:

3.3 使用阻塞队列存储回调结果

代码如下所示:

// 4. 接收响应// 使用阻塞队列来存储响应信息(其实就是等待响应完成)finalBlockingQueue<String>msgQueue=newArrayBlockingQueue<>(1);DefaultConsumerconsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{StringrespMsg=newString(body);System.out.println("接收到回调消息: "+respMsg);if(correlationId.equals(properties.getCorrelationId())){// 如果correlationId校验一致, 说明就是我们想要的响应msgQueue.offer(respMsg);}}};

3.4 获取回调结果

代码如下所示:

Stringresult=msgQueue.take();System.out.println("[RPC Client 响应结果]: "+result);

3.5 完整代码

代码如下所示:

packagerpc;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.LinkedBlockingQueue;importjava.util.concurrent.TimeoutException;/* rpc 客户端 1. 发生请求 2. 接收响应 */publicclassRpcClient{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,InterruptedException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 发生请求Stringmsg="hello rpc......";// 设置请求的唯一标识StringcorrelationId=UUID.randomUUID().toString();// 设置请求的相关属性AMQP.BasicPropertiesprops=newAMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());// 4. 接收响应// 使用阻塞队列来存储响应信息(其实就是等待响应完成)finalBlockingQueue<String>msgQueue=newArrayBlockingQueue<>(1);DefaultConsumerconsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{StringrespMsg=newString(body);System.out.println("接收到回调消息: "+respMsg);if(correlationId.equals(properties.getCorrelationId())){// 如果correlationId校验一致, 说明就是我们想要的响应msgQueue.offer(respMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);Stringresult=msgQueue.take();System.out.println("[RPC Client 响应结果]: "+result);}}

4. 服务端代码编写

服务端代码主要流程如下:

  • 1、接收消息。
  • 2、根据消息内容进行响应处理,把应答结果返回到回调队列中。

4.1 声明队列

代码如下所示:

channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);

4.2 设置同时最多只能获取一个消息

如果不设置 basicQos,RabbitMQ 会使用默认的 QoS 设置,其 prefetchCount 默认值为 0。当 prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者。这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动。

在 RPC 模式下,通常期望的是一对一的消息处理,即一个请求对应一个响应。消费者在处理完一个消息并确认之后,才会接收到下一条消息。

代码如下所示:

channel.basicQos(1);System.out.println("Awaiting RPC request...");

4.3 接收消息并做出相应处理

代码如下所示:

DefaultConsumerconsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{Stringrequest=newString(body,"UTF-8");System.out.println("接收到请求: "+request);Stringresponse="针对request: "+request+", 响应成功";AMQP.BasicPropertiesbasicProperties=newAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);

RabbitMQ 消息确定机制:在RabbitMQ中,basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息队列确认消息。

自动确认(autoAck=true):消息队列在将消息发送给消费者后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费。

手动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会被意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景。

4.4 完整代码

代码如下所示:

packagerpc;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/* * RPC 服务端 * 1. 接收请求 * 2. 发生响应 */publicclassRpcServer{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactory=newConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnection=factory.newConnection();// 2. 开启 channel 通道Channelchannel=connection.createChannel();// 3. 接收请求channel.basicQos(1);System.out.println("Awaiting RPC request...");DefaultConsumerconsumer=newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{Stringrequest=newString(body,"UTF-8");System.out.println("接收到请求: "+request);Stringresponse="针对request: "+request+", 响应成功";AMQP.BasicPropertiesbasicProperties=newAMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);}}

5. 运行程序

先运行客户端代码:可以看到在 request_queue 中有一条消息(它就相当于是生产者)

点击去,然后可以看到详细消息:

另外可以看到在 response_queue 中有一个消费者(它就相当于是消费者)

然后再运行服务端代码:可以看到已经接收到请求

然后再回到客户端,可以看到已经收到响应了

http://www.cnnetsun.cn/news/2137100.html

相关文章:

  • 保姆级视频教程| 空间转录组分析手册(基于Seurat)
  • 如何通过Win11Debloat优化Windows系统:解决预装软件与隐私问题的完整方案
  • 依托以太网模块实现S7-200 PLC远程诊断与程序上下载
  • 拆解UCIe软件栈:如何复用PCIe/CXL生态实现Chiplet即插即用
  • 告别复制粘贴!用Keil5为GD32F4xx搭建标准工程模板的保姆级流程
  • Halcon 23.05实战:从安装到第一个Qt+VS2022混合项目(解决中文界面与库依赖)
  • Mac新手必看:保姆级Git+SourceTree配置指南,从SSH密钥到拉取代码一气呵成
  • Java医疗HIS/EMR系统等保四级改造避坑手册(含等保测评现场答辩话术+渗透测试防御点位图)
  • 麒麟V10生产环境WordPress部署与分布式迁移完全指南
  • 别让偏见毁了你的AI产品:从亚马逊招聘工具翻车,到用IBM AIF360和Google What-If Tool给你的模型做个‘公平性体检’
  • 智能运维+多模型服务能力,阿里云 RDS AI 助手旗舰版正式上线!
  • 改进YOLOv10:结合HRFPN高分辨率网络实现细节保留,涨点明显!
  • 2025届学术党必备的降重复率工具实际效果
  • 从剪映、即梦 AI 被罚,读懂 AI 生成内容标识硬性合规要求
  • 让你的键盘和鼠标操作变得有趣:BongoCat桌面互动猫咪指南
  • 六个典型热门AI记忆架构对比:Mem0,Letta,MemoryLake,ZenBrain,MIA,MSA 助你快速选型
  • 小米开源MiMo-V2.5和Pro模型:高效、低成本,赋能商业级AI应用!
  • TVA在PCB线路板制造与检测中的创新应用(10)
  • OpenModScan:免费开源的Modbus调试神器,5大核心优势让你轻松搞定工业通信
  • OpenClaw执行奇点——因果链折叠与责任悬置的时间哲学(第十九篇)
  • OpCore Simplify:智能配置黑苹果的终极解决方案
  • Vue2项目实战:如何给你的原生下拉框加上‘模糊搜索’和‘多选标签’功能(附完整代码)
  • 2026届最火的六大AI辅助论文助手实测分析
  • CSS怎样调整弹性项目排列顺序_使用order属性轻松控制DOM显示顺序
  • 日记 3.0:我用 Hermes+Obsidian,把流水账日记变成洞察与成长的飞轮,基于 Karpathy 日记法演进
  • 蓝牙中baseband和RF的关系
  • WASM二进制加载失败?揭秘Docker BuildKit对.wasm文件MIME类型误判机制(附patched builder镜像下载链接)
  • 如何3分钟免费激活Windows与Office:KMS_VL_ALL_AIO智能激活工具完整指南
  • 【优化调度】基于matlab含氢气氨气综合能源系统优化调度【含Matlab源码 15394期】
  • OpenAI向全云厂商开放:与微软七年独家协议终结,这对中国AI意味着什么?