Message Processing With Spring Integration高级应用:自定义消息通道与端点
一、Spring Integration 简介
Spring Integration 是 Spring 框架的扩展,支持企业集成模式(EIP),提供轻量级的消息处理功能,帮助开发者构建可维护、可测试的企业集成解决方案。
核心目标:
- 提供简单的模型来实现复杂的企业集成。
- 支持与外部系统的集成。
- 提供模块化、松耦合的消息处理架构。
二、Spring Integration 核心组件
1. 消息(Message)
- 定义:消息是 Spring Integration 的核心,包含
payload
(负载)和header
(头部)。 - 创建消息:通过
MessageBuilder
创建消息。
代码示例:
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
Message<String> message = MessageBuilder
.withPayload("Message Payload")
.setHeader("Message_Header1", "Header1_Value")
.setHeader("Message_Header2", "Header2_Value")
.build();
2. 消息通道(Message Channel)
- 定义:消息通道是消息传递的管道,连接消息的生产者和消费者。
- 类型:
- 点对点(Point-to-Point):每条消息最多被一个消费者接收。
- 发布/订阅(Publish/Subscribe):每条消息可以被多个订阅者接收。
- 常见实现:
DirectChannel
:默认点对点通道。NullChannel
:虚拟通道,用于测试和调试。- 其他:
PublishSubscribeChannel
、QueueChannel
、PriorityChannel
等。
3. 消息端点(Message Endpoint)
消息端点是应用程序代码与消息基础设施之间的桥梁,主要类型包括:
- Transformer:转换消息内容或结构。
- Filter:过滤不符合条件的消息。
- Router:根据条件将消息路由到不同的通道。
- Splitter:将消息拆分为多个子消息。
- Aggregator:将多个消息聚合为一个消息。
- Service Activator:连接服务实例到消息系统。
- Channel Adapter:连接消息通道与外部系统。
三、货物处理系统示例
1. 需求
实现一个货物处理系统,功能包括:
- 接收货物消息。
- 拆分货物列表为单个货物消息。
- 基于重量过滤货物。
- 根据运输类型(国内/国际)路由货物。
- 转换货物消息。
- 最终处理并记录货物信息。
2. 项目环境
- JDK:1.8
- Spring:4.1.2
- Spring Integration:4.1.0
- Maven:3.2.2
- 操作系统:Ubuntu 14.04
3. 完整代码实现
Step 1:添加依赖
在 pom.xml
中添加 Spring 和 Spring Integration 的依赖:
<properties>
<spring.version>4.1.2.RELEASE</spring.version>
<spring.integration.version>4.1.0.RELEASE</spring.integration.version>
</properties>
<dependencies>
<!-- Spring 核心依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- Spring Integration 核心依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>${spring.integration.version}</version>
</dependency>
</dependencies>
Step 2:配置类
创建 AppConfiguration
类,配置消息通道和启用 Spring Integration:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {
@Bean
public MessageChannel cargoGWDefaultRequestChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel cargoSplitterOutputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel cargoFilterOutputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel cargoTransformerOutputChannel() {
return new DirectChannel();
}
}
Step 3:消息网关
定义 CargoGateway
接口,作为消息系统的入口:
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;
import java.util.List;
@MessagingGateway
public interface CargoGateway {
@Gateway(requestChannel = "cargoGWDefaultRequestChannel")
void processCargoRequest(Message<List<Cargo>> message);
}
Step 4:消息拆分器
实现 CargoSplitter
,将货物列表拆分为单个货物消息:
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;
import java.util.List;
@MessageEndpoint
public class CargoSplitter {
@Splitter(inputChannel = "cargoGWDefaultRequestChannel", outputChannel = "cargoSplitterOutputChannel")
public List<Cargo> splitCargoList(Message<List<Cargo>> message) {
return message.getPayload();
}
}
Step 5:消息过滤器
实现 CargoFilter
,过滤重量超过限制的货物:
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;
@MessageEndpoint
public class CargoFilter {
private static final double CARGO_WEIGHT_LIMIT = 1000.0;
@Filter(inputChannel = "cargoSplitterOutputChannel", outputChannel = "cargoFilterOutputChannel", discardChannel = "cargoFilterDiscardChannel")
public boolean filterCargo(Cargo cargo) {
return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;
}
}
Step 6:服务激活器
实现 CargoServiceActivator
,处理最终的货物消息:
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;
@MessageEndpoint
public class CargoServiceActivator {
@ServiceActivator(inputChannel = "cargoTransformerOutputChannel")
public void processCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {
System.out.println("Processed Cargo: " + cargo + " in Batch: " + batchId);
}
}
Step 7:运行主程序
创建 Application
类,初始化 Spring 容器并发送货物请求:
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;
import java.util.Arrays;
import java.util.List;
public class Application {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(AppConfiguration.class);
CargoGateway gateway = context.getBean(CargoGateway.class);
List<Cargo> cargos = Arrays.asList(
new Cargo(1, "Receiver1", "Address1", 500, "Domestic"),
new Cargo(2, "Receiver2", "Address2", 1500, "International")
);
gateway.processCargoRequest(MessageBuilder.withPayload(cargos).build());
}
}
四、运行过程
- 启动
Application
类。 - 系统会根据配置:
- 拆分货物列表。
- 过滤重量超过限制的货物。
- 路由货物到不同的通道。
- 最终处理并记录货物信息。
- 控制台输出处理结果。
五、适用场景
Spring Integration 非常适合以下场景:
- 企业系统集成:如 ERP、CRM、供应链系统之间的数据交换。
- 消息驱动架构:如基于事件的微服务通信。
- 复杂消息处理:如批量处理、过滤、路由、转换等。
- 与外部系统交互:如文件系统、消息队列(RabbitMQ、Kafka)、数据库等。
通过 Spring Integration,可以轻松实现复杂的企业集成需求,同时保持代码的可维护性和扩展性。
参考链接:https://dzone.com/articles/message-processing-spring