Spring Integration 教程
一、什么是 Spring Integration
Spring Integration 是 Spring 生态系统中的一个扩展模块,用于实现企业应用集成 (EAI, Enterprise Application Integration)。它基于 Spring 框架,提供了一套声明式的适配器,用于集成不同的系统和服务。
核心特点:
基于消息驱动的架构
支持多种传输协议(HTTP, TCP, JMS, AMQP, FTP, File 等)
提供开箱即用的端点适配器
支持企业集成模式 (EIP, Enterprise Integration Patterns)
二、核心概念
1. Message
// 消息由消息头和消息体组成 public interface Message<T> { T getPayload(); MessageHeaders getHeaders(); } // 创建消息 Message<String> message = MessageBuilder.withPayload("Hello") .setHeader("key", "value") .build();2. Message Channel
消息通道用于在发送者和接收者之间传递消息。
// 点对点通道 @Bean public MessageChannel directChannel() { return new DirectChannel(); } // 发布订阅通道 @Bean public MessageChannel publishSubscribeChannel() { return new PublishSubscribeChannel(); } // 队列通道 @Bean public MessageChannel queueChannel() { return new QueueChannel(10); }3. Message Endpoint
消息端点负责处理消息。
三、快速入门示例
Maven 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <!-- 可选:特定协议支持 --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-http</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency>基础配置示例
@Configuration @EnableIntegration public class IntegrationConfig { // 定义消息通道 @Bean public MessageChannel inputChannel() { return new DirectChannel(); } @Bean public MessageChannel outputChannel() { return new DirectChannel(); } // 定义集成流程 @Bean public IntegrationFlow simpleFlow() { return IntegrationFlow.from(inputChannel()) .transform(String.class, s -> s.toUpperCase()) .filter(s -> s.startsWith("A")) .handle(System.out::println) .get(); } }使用 @MessagingGateway
// 定义网关接口 @MessagingGateway(defaultRequestChannel = "inputChannel") public interface SimpleGateway { void sendMessage(String message); @Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel") String sendAndReceive(String message); } // 使用网关 @Service public class MessageService { @Autowired private SimpleGateway gateway; public void send(String message) { gateway.sendMessage(message); } }四、常用企业集成模式
1. 消息转换器 (Transformer)
@Bean public IntegrationFlow transformerFlow() { return IntegrationFlow.from("inputChannel") .transform(new GenericTransformer<String, User>() { @Override public User transform(String source) { return new User(source); } }) .channel("outputChannel") .get(); }2. 消息过滤器 (Filter)
@Bean public IntegrationFlow filterFlow() { return IntegrationFlow.from("inputChannel") .filter(payload -> payload instanceof User) .filter("payload.age > 18") // SpEL 表达式 .channel("adultChannel") .get(); }3. 消息路由器 (Router)
@Bean public IntegrationFlow routerFlow() { return IntegrationFlow.from("inputChannel") .route(payload -> { if (payload instanceof Order) return "orderChannel"; if (payload instanceof Payment) return "paymentChannel"; return "errorChannel"; }) .get(); }4. 消息拆分器 (Splitter) 和聚合器 (Aggregator)
@Bean public IntegrationFlow splitterAggregatorFlow() { return IntegrationFlow.from("inputChannel") .split() // 拆分消息 .channel("splitChannel") .aggregate() // 聚合消息 .channel("outputChannel") .get(); }五、常用适配器示例
1. 文件适配器
@Configuration public class FileIntegrationConfig { // 读取文件 @Bean public IntegrationFlow fileReaderFlow() { return IntegrationFlow.from( Files.inboundAdapter(new File("/input")) .patternFilter("*.txt"), e -> e.poller(Pollers.fixedDelay(1000)) ) .transform(File.class, File::getAbsolutePath) .handle(System.out::println) .get(); } // 写入文件 @Bean public IntegrationFlow fileWriterFlow() { return IntegrationFlow.from("fileInputChannel") .handle(Files.outboundAdapter(new File("/output")) .autoCreateDirectory(true)) .get(); } }2. HTTP 适配器
@Configuration public class HttpIntegrationConfig { // HTTP 入站网关 @Bean public IntegrationFlow httpInboundFlow() { return IntegrationFlow.from( Http.inboundGateway("/api/message") .requestMapping(m -> m.methods(HttpMethod.POST)) .requestPayloadType(String.class) .replyTimeout(30000) ) .transform(String.class, s -> "Processed: " + s) .get(); } // HTTP 出站网关 @Bean public IntegrationFlow httpOutboundFlow() { return IntegrationFlow.from("requestChannel") .handle(Http.outboundGateway("https://api.example.com/data") .httpMethod(HttpMethod.GET) .expectedResponseType(String.class)) .channel("responseChannel") .get(); } }3. JMS 适配器
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jms</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> </dependency>@Configuration public class JmsIntegrationConfig { @Bean public ConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory("tcp://localhost:61616"); } // JMS 入站适配器 @Bean public IntegrationFlow jmsInboundFlow() { return IntegrationFlow.from( Jms.inboundAdapter(connectionFactory()) .destination("queue.in") ) .transform(String.class, String::toUpperCase) .handle(message -> System.out.println("Received: " + message)) .get(); } // JMS 出站适配器 @Bean public IntegrationFlow jmsOutboundFlow() { return IntegrationFlow.from("jmsOutputChannel") .handle(Jms.outboundAdapter(connectionFactory()) .destination("queue.out")) .get(); } }六、高级特性
1. 错误处理
@Bean public IntegrationFlow errorHandlingFlow() { return IntegrationFlow.from("inputChannel") .transform(...) .handle(..., e -> e .advice(ExpressionEvaluatingRequestHandlerAdvice.class) .advice(advice -> advice .onFailureExpression("payload.message") .trapException(true)) ) .get(); } // 全局错误通道 @Bean public IntegrationFlow errorFlow() { return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) .handle(message -> { Exception exception = (Exception) message.getPayload(); log.error("Error: ", exception); }) .get(); }2. 消息历史
@Configuration @EnableIntegration @EnableMessageHistory public class HistoryConfig { @Bean public IntegrationFlow historyFlow() { return IntegrationFlow.from("inputChannel") .transform(...) .enrichHeaders(s -> s.header(MessageHistory.HEADER_NAME, new MessageHistory())) .handle(...) .get(); } }3. 控制总线
@Bean public IntegrationFlow controlBusFlow() { return IntegrationFlow.from("controlBus") .controlBus() .get(); } // 使用控制总线 @Component public class ControlBusService { @Autowired @Qualifier("controlBus") private MessageChannel controlBus; public void stopChannel() { controlBus.send(MessageBuilder.withPayload("@myChannel.stop()").build()); } }七、完整示例:文件处理系统
@SpringBootApplication @EnableIntegration public class FileProcessingApplication { public static void main(String[] args) { SpringApplication.run(FileProcessingApplication.class, args); } } @Configuration public class FileProcessingFlow { private static final Logger log = LoggerFactory.getLogger(FileProcessingFlow.class); // 文件输入目录 @Value("${input.directory:/input}") private String inputDirectory; // 处理成功目录 @Value("${success.directory:/success}") private String successDirectory; // 处理失败目录 @Value("${failed.directory:/failed}") private String failedDirectory; @Bean public IntegrationFlow fileProcessingFlow() { return IntegrationFlow.from( Files.inboundAdapter(new File(inputDirectory)) .patternFilter("*.csv") .preventDuplicates(true) .autoCreateDirectory(true), e -> e.poller(Pollers.fixedDelay(5000) .maxMessagesPerPoll(5) .advice(expressionAdvice())) ) .channel(MessageChannels.queue("processingChannel", 10)) .transform(Files.toStringTransformer()) // 文件转字符串 .split(s -> s.delimiters("\n")) // 按行拆分 .filter(line -> !line.trim().isEmpty()) .transform(line -> parseCsvLine(line)) // 解析CSV .aggregate(aggregatorSpec -> aggregatorSpec .releaseStrategy(new SimpleSequenceSizeReleaseStrategy()) .correlationStrategy(message -> "batch")) .handle(message -> processBatch((List<Map<String, String>>) message.getPayload())) .handle(Files.outboundAdapter(new File(successDirectory)) .autoCreateDirectory(true) .fileNameGenerator(message -> generateFileName(message))) .get(); } // 错误处理:失败的文件移动到失败目录 @Bean public IntegrationFlow errorHandlingFlow() { return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) .handle(message -> { Message<?> failedMessage = (Message<?>) message.getHeaders().get("inputMessage"); File failedFile = (File) failedMessage.getPayload(); FileUtils.moveFileToDirectory(failedFile, new File(failedDirectory), true); log.error("Failed to process file: {}", failedFile.getName()); }) .get(); } private Map<String, String> parseCsvLine(String line) { // CSV解析逻辑 return new HashMap<>(); } private void processBatch(List<Map<String, String>> batch) { // 批量处理逻辑 log.info("Processing batch of {} records", batch.size()); } private String generateFileName(Message<?> message) { return "processed_" + System.currentTimeMillis() + ".json"; } @Bean public Advice expressionAdvice() { return new ExpressionEvaluatingRequestHandlerAdvice(); } }八、最佳实践
合理使用通道类型:DirectChannel 用于同步,QueueChannel 用于缓冲,PublishSubscribeChannel 用于广播
避免阻塞操作:使用 QueueChannel 时注意配置合适的大小和 poller
错误处理:始终配置错误通道,记录异常并适当重试
监控和管理:使用 Spring Boot Actuator 监控集成端点
management: endpoints: web: exposure: include: integration测试:使用 @SpringIntegrationTest 进行集成测试
@SpringBootTest @SpringIntegrationTest(noAutoStartup = {"inputChannel"}) class IntegrationFlowTest { @Test void testFlow() { // 测试逻辑 } }