当前位置: 首页 > news >正文

Spring Integration 教程

一、什么是 Spring Integration

Spring Integration 是 Spring 生态系统中的一个扩展模块,用于实现企业应用集成 (EAI, Enterprise Application Integration)。它基于 Spring 框架,提供了一套声明式的适配器,用于集成不同的系统和服务。

核心特点:

  • 基于消息驱动的架构

  • 支持多种传输协议(HTTP, TCP, JMS, AMQP, FTP, File 等)

  • 提供开箱即用的端点适配器

  • 支持企业集成模式 (EIP, Enterprise Integration Patterns)

二、核心概念

1. Message

// 消息由消息头和消息体组成 public interface Message<T> { T getPayload(); MessageHeaders getHeaders(); } // 创建消息 Message<String> message = MessageBuilder.withPayload("Hello") .setHeader("key", "value") .build();

2. Message Channel

消息通道用于在发送者和接收者之间传递消息。

// 点对点通道 @Bean public MessageChannel directChannel() { return new DirectChannel(); } // 发布订阅通道 @Bean public MessageChannel publishSubscribeChannel() { return new PublishSubscribeChannel(); } // 队列通道 @Bean public MessageChannel queueChannel() { return new QueueChannel(10); }

3. Message Endpoint

消息端点负责处理消息。

三、快速入门示例

Maven 依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <!-- 可选:特定协议支持 --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-http</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency>

基础配置示例

@Configuration @EnableIntegration public class IntegrationConfig { // 定义消息通道 @Bean public MessageChannel inputChannel() { return new DirectChannel(); } @Bean public MessageChannel outputChannel() { return new DirectChannel(); } // 定义集成流程 @Bean public IntegrationFlow simpleFlow() { return IntegrationFlow.from(inputChannel()) .transform(String.class, s -> s.toUpperCase()) .filter(s -> s.startsWith("A")) .handle(System.out::println) .get(); } }

使用 @MessagingGateway

// 定义网关接口 @MessagingGateway(defaultRequestChannel = "inputChannel") public interface SimpleGateway { void sendMessage(String message); @Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel") String sendAndReceive(String message); } // 使用网关 @Service public class MessageService { @Autowired private SimpleGateway gateway; public void send(String message) { gateway.sendMessage(message); } }

四、常用企业集成模式

1. 消息转换器 (Transformer)

@Bean public IntegrationFlow transformerFlow() { return IntegrationFlow.from("inputChannel") .transform(new GenericTransformer<String, User>() { @Override public User transform(String source) { return new User(source); } }) .channel("outputChannel") .get(); }

2. 消息过滤器 (Filter)

@Bean public IntegrationFlow filterFlow() { return IntegrationFlow.from("inputChannel") .filter(payload -> payload instanceof User) .filter("payload.age > 18") // SpEL 表达式 .channel("adultChannel") .get(); }

3. 消息路由器 (Router)

@Bean public IntegrationFlow routerFlow() { return IntegrationFlow.from("inputChannel") .route(payload -> { if (payload instanceof Order) return "orderChannel"; if (payload instanceof Payment) return "paymentChannel"; return "errorChannel"; }) .get(); }

4. 消息拆分器 (Splitter) 和聚合器 (Aggregator)

@Bean public IntegrationFlow splitterAggregatorFlow() { return IntegrationFlow.from("inputChannel") .split() // 拆分消息 .channel("splitChannel") .aggregate() // 聚合消息 .channel("outputChannel") .get(); }

五、常用适配器示例

1. 文件适配器

@Configuration public class FileIntegrationConfig { // 读取文件 @Bean public IntegrationFlow fileReaderFlow() { return IntegrationFlow.from( Files.inboundAdapter(new File("/input")) .patternFilter("*.txt"), e -> e.poller(Pollers.fixedDelay(1000)) ) .transform(File.class, File::getAbsolutePath) .handle(System.out::println) .get(); } // 写入文件 @Bean public IntegrationFlow fileWriterFlow() { return IntegrationFlow.from("fileInputChannel") .handle(Files.outboundAdapter(new File("/output")) .autoCreateDirectory(true)) .get(); } }

2. HTTP 适配器

@Configuration public class HttpIntegrationConfig { // HTTP 入站网关 @Bean public IntegrationFlow httpInboundFlow() { return IntegrationFlow.from( Http.inboundGateway("/api/message") .requestMapping(m -> m.methods(HttpMethod.POST)) .requestPayloadType(String.class) .replyTimeout(30000) ) .transform(String.class, s -> "Processed: " + s) .get(); } // HTTP 出站网关 @Bean public IntegrationFlow httpOutboundFlow() { return IntegrationFlow.from("requestChannel") .handle(Http.outboundGateway("https://api.example.com/data") .httpMethod(HttpMethod.GET) .expectedResponseType(String.class)) .channel("responseChannel") .get(); } }

3. JMS 适配器

<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jms</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> </dependency>
@Configuration public class JmsIntegrationConfig { @Bean public ConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory("tcp://localhost:61616"); } // JMS 入站适配器 @Bean public IntegrationFlow jmsInboundFlow() { return IntegrationFlow.from( Jms.inboundAdapter(connectionFactory()) .destination("queue.in") ) .transform(String.class, String::toUpperCase) .handle(message -> System.out.println("Received: " + message)) .get(); } // JMS 出站适配器 @Bean public IntegrationFlow jmsOutboundFlow() { return IntegrationFlow.from("jmsOutputChannel") .handle(Jms.outboundAdapter(connectionFactory()) .destination("queue.out")) .get(); } }

六、高级特性

1. 错误处理

@Bean public IntegrationFlow errorHandlingFlow() { return IntegrationFlow.from("inputChannel") .transform(...) .handle(..., e -> e .advice(ExpressionEvaluatingRequestHandlerAdvice.class) .advice(advice -> advice .onFailureExpression("payload.message") .trapException(true)) ) .get(); } // 全局错误通道 @Bean public IntegrationFlow errorFlow() { return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) .handle(message -> { Exception exception = (Exception) message.getPayload(); log.error("Error: ", exception); }) .get(); }

2. 消息历史

@Configuration @EnableIntegration @EnableMessageHistory public class HistoryConfig { @Bean public IntegrationFlow historyFlow() { return IntegrationFlow.from("inputChannel") .transform(...) .enrichHeaders(s -> s.header(MessageHistory.HEADER_NAME, new MessageHistory())) .handle(...) .get(); } }

3. 控制总线

@Bean public IntegrationFlow controlBusFlow() { return IntegrationFlow.from("controlBus") .controlBus() .get(); } // 使用控制总线 @Component public class ControlBusService { @Autowired @Qualifier("controlBus") private MessageChannel controlBus; public void stopChannel() { controlBus.send(MessageBuilder.withPayload("@myChannel.stop()").build()); } }

七、完整示例:文件处理系统

@SpringBootApplication @EnableIntegration public class FileProcessingApplication { public static void main(String[] args) { SpringApplication.run(FileProcessingApplication.class, args); } } @Configuration public class FileProcessingFlow { private static final Logger log = LoggerFactory.getLogger(FileProcessingFlow.class); // 文件输入目录 @Value("${input.directory:/input}") private String inputDirectory; // 处理成功目录 @Value("${success.directory:/success}") private String successDirectory; // 处理失败目录 @Value("${failed.directory:/failed}") private String failedDirectory; @Bean public IntegrationFlow fileProcessingFlow() { return IntegrationFlow.from( Files.inboundAdapter(new File(inputDirectory)) .patternFilter("*.csv") .preventDuplicates(true) .autoCreateDirectory(true), e -> e.poller(Pollers.fixedDelay(5000) .maxMessagesPerPoll(5) .advice(expressionAdvice())) ) .channel(MessageChannels.queue("processingChannel", 10)) .transform(Files.toStringTransformer()) // 文件转字符串 .split(s -> s.delimiters("\n")) // 按行拆分 .filter(line -> !line.trim().isEmpty()) .transform(line -> parseCsvLine(line)) // 解析CSV .aggregate(aggregatorSpec -> aggregatorSpec .releaseStrategy(new SimpleSequenceSizeReleaseStrategy()) .correlationStrategy(message -> "batch")) .handle(message -> processBatch((List<Map<String, String>>) message.getPayload())) .handle(Files.outboundAdapter(new File(successDirectory)) .autoCreateDirectory(true) .fileNameGenerator(message -> generateFileName(message))) .get(); } // 错误处理:失败的文件移动到失败目录 @Bean public IntegrationFlow errorHandlingFlow() { return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) .handle(message -> { Message<?> failedMessage = (Message<?>) message.getHeaders().get("inputMessage"); File failedFile = (File) failedMessage.getPayload(); FileUtils.moveFileToDirectory(failedFile, new File(failedDirectory), true); log.error("Failed to process file: {}", failedFile.getName()); }) .get(); } private Map<String, String> parseCsvLine(String line) { // CSV解析逻辑 return new HashMap<>(); } private void processBatch(List<Map<String, String>> batch) { // 批量处理逻辑 log.info("Processing batch of {} records", batch.size()); } private String generateFileName(Message<?> message) { return "processed_" + System.currentTimeMillis() + ".json"; } @Bean public Advice expressionAdvice() { return new ExpressionEvaluatingRequestHandlerAdvice(); } }

八、最佳实践

  1. 合理使用通道类型:DirectChannel 用于同步,QueueChannel 用于缓冲,PublishSubscribeChannel 用于广播

  2. 避免阻塞操作:使用 QueueChannel 时注意配置合适的大小和 poller

  3. 错误处理:始终配置错误通道,记录异常并适当重试

  4. 监控和管理:使用 Spring Boot Actuator 监控集成端点

management: endpoints: web: exposure: include: integration
  1. 测试:使用 @SpringIntegrationTest 进行集成测试

@SpringBootTest @SpringIntegrationTest(noAutoStartup = {"inputChannel"}) class IntegrationFlowTest { @Test void testFlow() { // 测试逻辑 } }
http://www.cnnetsun.cn/news/2923411.html

相关文章:

  • Vue项目里定时任务配置太麻烦?试试这两个Cron表达式组件(vue-cron-editor-buefy / vcrontab)
  • 如何用Path of Building PoE2快速规划流放之路2角色:新手的完整实战指南
  • OpenAI这次降价真狠!算笔账:用GPT-3.5-turbo-16k处理长文档,成本到底省了多少?
  • 如何让2008-2017老款Mac焕发新生?OCLP-Mod完整指南助你升级最新macOS
  • 水机制动屏ZDK-15组合电磁空气阀
  • 3D高斯重建终极指南:5步搞定NeRF、MipNeRF360和ScanNet++数据集配置
  • 怎样在手机上免费运行AI模型:Maid项目的终极HuggingFace集成指南
  • Python如何解析非标准JSON:那些坑和解决方案
  • OmenSuperHub终极指南:免费解锁惠普暗影精灵笔记本的完整性能控制
  • 3种AMD处理器深度调试方案:释放Ryzen平台隐藏性能潜力
  • LangChain实战:从零构建一个智能问答机器人,解锁大模型应用新姿势
  • 戴森球计划8000+蓝图实战指南:从零构建高效星际工厂的完整方案
  • RS485 HUB选型避坑指南:从8口分线器到带隔离中继器,怎么选才不翻车?
  • 3个实用技巧彻底解决Edge-TTS语音合成连接与配置问题
  • 5个SillyTavern性能优化技巧:让你的LLM前端响应速度提升300%
  • eLabFTW:实验室数字化的终极解决方案,让科研管理变得简单高效
  • 揭秘Steam挂刀行情站:构建24小时实时市场监控系统的技术架构与实践
  • 联发科设备终极解锁指南:用MTKClient掌控你的设备底层
  • 深度解析Android逆向工程:dex2jar实战技巧与架构揭秘
  • Arduino红外遥控终极指南:Arduino-IRremote库完整使用教程
  • Java代码变更如何精准评估影响范围?揭秘JCCI的智能化分析引擎
  • 从绿幕抠像到AI一键抠图:Image Matting技术简史与主流开源项目盘点
  • 【篮球英语】20 季后赛与总决赛:通向冠军之路
  • 大模型 Prompt 优化思路:解决回答不准、逻辑混乱问题
  • RuoYi-Vue-Plus连接池二选一:放弃Druid改用HikariCP前,你需要知道的几个坑(Java 8兼容性、配置项差异)
  • MPC8260 SCC HDLC模式核心原理、配置与实战调试指南
  • MPC8555E CDS嵌入式开发平台:电源、总线与调试架构深度解析
  • LangChain Tool Calling 原理:模型是怎么决定调用哪个工具的?
  • trace.moe技术解析:基于向量数据库的动漫场景搜索引擎架构
  • 深入解析MPC8306 eSDHC控制器:命令响应、状态监控与中断处理实战