当前位置: 首页 > article >正文

Message Processing With Spring Integration高级应用:自定义消息通道与端点

一、Spring Integration 简介

Spring Integration 是 Spring 框架的扩展,支持企业集成模式(EIP),提供轻量级的消息处理功能,帮助开发者构建可维护、可测试的企业集成解决方案。

核心目标:
  1. 提供简单的模型来实现复杂的企业集成。
  2. 支持与外部系统的集成。
  3. 提供模块化、松耦合的消息处理架构。

二、Spring Integration 核心组件

1. 消息(Message)
  • 定义:消息是 Spring Integration 的核心,包含 payload(负载)和 header(头部)。
  • 创建消息:通过 MessageBuilder 创建消息。

代码示例

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

Message<String> message = MessageBuilder
    .withPayload("Message Payload")
    .setHeader("Message_Header1", "Header1_Value")
    .setHeader("Message_Header2", "Header2_Value")
    .build();

2. 消息通道(Message Channel)
  • 定义:消息通道是消息传递的管道,连接消息的生产者和消费者。
  • 类型
    • 点对点(Point-to-Point):每条消息最多被一个消费者接收。
    • 发布/订阅(Publish/Subscribe):每条消息可以被多个订阅者接收。
  • 常见实现
    • DirectChannel:默认点对点通道。
    • NullChannel:虚拟通道,用于测试和调试。
    • 其他:PublishSubscribeChannelQueueChannelPriorityChannel 等。

3. 消息端点(Message Endpoint)

消息端点是应用程序代码与消息基础设施之间的桥梁,主要类型包括:

  • Transformer:转换消息内容或结构。
  • Filter:过滤不符合条件的消息。
  • Router:根据条件将消息路由到不同的通道。
  • Splitter:将消息拆分为多个子消息。
  • Aggregator:将多个消息聚合为一个消息。
  • Service Activator:连接服务实例到消息系统。
  • Channel Adapter:连接消息通道与外部系统。

三、货物处理系统示例

1. 需求

实现一个货物处理系统,功能包括:

  1. 接收货物消息。
  2. 拆分货物列表为单个货物消息。
  3. 基于重量过滤货物。
  4. 根据运输类型(国内/国际)路由货物。
  5. 转换货物消息。
  6. 最终处理并记录货物信息。

2. 项目环境
  • JDK:1.8
  • Spring:4.1.2
  • Spring Integration:4.1.0
  • Maven:3.2.2
  • 操作系统:Ubuntu 14.04

3. 完整代码实现
Step 1:添加依赖

pom.xml 中添加 Spring 和 Spring Integration 的依赖:

<properties>
    <spring.version>4.1.2.RELEASE</spring.version>
    <spring.integration.version>4.1.0.RELEASE</spring.integration.version>
</properties>

<dependencies>
    <!-- Spring 核心依赖 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- Spring Integration 核心依赖 -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>${spring.integration.version}</version>
    </dependency>
</dependencies>

Step 2:配置类

创建 AppConfiguration 类,配置消息通道和启用 Spring Integration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {

    @Bean
    public MessageChannel cargoGWDefaultRequestChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel cargoSplitterOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel cargoFilterOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel cargoTransformerOutputChannel() {
        return new DirectChannel();
    }
}

Step 3:消息网关

定义 CargoGateway 接口,作为消息系统的入口:

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

import java.util.List;

@MessagingGateway
public interface CargoGateway {

    @Gateway(requestChannel = "cargoGWDefaultRequestChannel")
    void processCargoRequest(Message<List<Cargo>> message);
}

Step 4:消息拆分器

实现 CargoSplitter,将货物列表拆分为单个货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;

import java.util.List;

@MessageEndpoint
public class CargoSplitter {

    @Splitter(inputChannel = "cargoGWDefaultRequestChannel", outputChannel = "cargoSplitterOutputChannel")
    public List<Cargo> splitCargoList(Message<List<Cargo>> message) {
        return message.getPayload();
    }
}

Step 5:消息过滤器

实现 CargoFilter,过滤重量超过限制的货物:

import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;

@MessageEndpoint
public class CargoFilter {

    private static final double CARGO_WEIGHT_LIMIT = 1000.0;

    @Filter(inputChannel = "cargoSplitterOutputChannel", outputChannel = "cargoFilterOutputChannel", discardChannel = "cargoFilterDiscardChannel")
    public boolean filterCargo(Cargo cargo) {
        return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;
    }
}

Step 6:服务激活器

实现 CargoServiceActivator,处理最终的货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;

@MessageEndpoint
public class CargoServiceActivator {

    @ServiceActivator(inputChannel = "cargoTransformerOutputChannel")
    public void processCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {
        System.out.println("Processed Cargo: " + cargo + " in Batch: " + batchId);
    }
}

Step 7:运行主程序

创建 Application 类,初始化 Spring 容器并发送货物请求:

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.List;

public class Application {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(AppConfiguration.class);
        CargoGateway gateway = context.getBean(CargoGateway.class);

        List<Cargo> cargos = Arrays.asList(
            new Cargo(1, "Receiver1", "Address1", 500, "Domestic"),
            new Cargo(2, "Receiver2", "Address2", 1500, "International")
        );

        gateway.processCargoRequest(MessageBuilder.withPayload(cargos).build());
    }
}

四、运行过程

  1. 启动 Application 类。
  2. 系统会根据配置:
    • 拆分货物列表。
    • 过滤重量超过限制的货物。
    • 路由货物到不同的通道。
    • 最终处理并记录货物信息。
  3. 控制台输出处理结果。

五、适用场景

Spring Integration 非常适合以下场景:

  1. 企业系统集成:如 ERP、CRM、供应链系统之间的数据交换。
  2. 消息驱动架构:如基于事件的微服务通信。
  3. 复杂消息处理:如批量处理、过滤、路由、转换等。
  4. 与外部系统交互:如文件系统、消息队列(RabbitMQ、Kafka)、数据库等。

通过 Spring Integration,可以轻松实现复杂的企业集成需求,同时保持代码的可维护性和扩展性。
参考链接:https://dzone.com/articles/message-processing-spring


http://www.kler.cn/a/444515.html

相关文章:

  • 部署开源大模型的硬件配置全面指南
  • 【Apache Doris】周FAQ集锦:第 26 期
  • 武汉市电子信息与通信工程职称公示了
  • python web app开发
  • git使用和gitlab部署
  • python:用 sklearn 构建线性回归模型,并评价
  • 期末复习-数据库原理(全英教材)
  • Oracle 三个生产案例问题分析
  • 5G 模组 RG200U常用AT命令
  • 华为OD E卷(100分)23-连续字母长度
  • 期魔方量化投研平台
  • 嵌入式面试知识点总结 -- 面试篇
  • MySQL简单命令演示
  • Spring框架学习笔记
  • android源码下载
  • Java全栈项目 - 校园招聘信息平台
  • nodejs搭配express网站开发后端接口设计需要注意事项
  • 【go语言】regexp包,正则表达式
  • 解决新版本Spring中lombok失效的问题
  • 与运动序列学习相关的眼动功能连接
  • Docker监控新纪元:Prometheus引领高效容器监控革命
  • 从零开始:PHP基础教程系列-第12篇:数据库基础与PDO使用
  • 魏裕雄的JAVA学习总结
  • SQL 多表查询实战:使用 WHERE 实现表关联
  • 深入了解V-model实现数据双向绑定
  • pdf操作组件aspose的无限制使用方法