Spring Integration 简介

1、简介

本文将通过一些实际的案例,带你了解 Spring Integration 的核心概念。

Spring Integration 提供了许多功能强大的组件,可大大增强企业架构内系统和流程的互联性。

2、设置

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.1.13.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>5.1.13.RELEASE</version>
</dependency>

你可以从 Maven Central 下载最新版本的 Spring Integration CoreSpring Integration File Support

3、消息传递模式

这个库中的一个基本模式是消息传递(Messaging)。该模式围绕消息展开,消息是离散的数据负载(Discrete Payloads),通过预定义的通道从源系统或进程传递到一个或多个系统或进程。

从历史上看,这种模式是整合多个不同系统的最灵活方式,它可以:

  • 几乎完全解耦集成所涉及的系统
  • 允许集成中的参与系统完全不考虑彼此的底层协议、格式或其他实现细节
  • 鼓励开发和重复使用参与集成的组件

4、消息集成的实际应用

举一个基本例子,将 MPEG 视频文件从指定文件夹复制到另一个配置文件夹:

@Configuration
@EnableIntegration
public class BasicIntegrationConfig{
    public String INPUT_DIR = "the_source_dir";
    public String OUTPUT_DIR = "the_dest_dir";
    public String FILE_PATTERN = "*.mpeg";

    @Bean
    public MessageChannel fileChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource sourceReader= new FileReadingMessageSource();
        sourceReader.setDirectory(new File(INPUT_DIR));
        sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
        return sourceReader;
    }

    @Bean
    @ServiceActivator(inputChannel= "fileChannel")
    public MessageHandler fileWritingMessageHandler() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
        handler.setFileExistsMode(FileExistsMode.REPLACE);
        handler.setExpectReply(false);
        return handler;
    }
}

上面的代码配置了一个 ServiceActivator(服务激活器),一个 MessageChannel(集成通道)和一个 InboundChannelAdapter(入站通道适配器)。

稍后将详细介绍每种组件类型。@EnableIntegration 注解将该类指定为 Spring Integration 配置。

启动 Spring Integration 应用上下文:

public static void main(String... args) {
    AbstractApplicationContext context 
      = new AnnotationConfigApplicationContext(BasicIntegrationConfig.class);
    context.registerShutdownHook();
    
    Scanner scanner = new Scanner(System.in);
    System.out.print("Please enter q and press <enter> to exit the program: ");
    
    while (true) {
       String input = scanner.nextLine();
       if("q".equals(input.trim())) {
          break;
      }
    }
    System.exit(0);
}

上述 main 方法启动了 Integration Context;它还接受从命令行输入的 q 字符,以退出程序。接下来,让我们更详细地研究一下这些组件。

5、Spring Integration 组件

5.1、Message

org.springframework.integration.Message 接口定义了 Spring Message:Spring Integration Context 中的数据传输单元。

public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

它定义了两个关键元素的 get 方法:

  • 消息头,本质上是一个Key/Value容器,可用于传输元数据,由 org.springframework.integration.MessageHeaders 类定义
  • 消息 Payload,即要传输的有价值的实际数据 - 在本例中,视频文件就是 Payload

5.2、Channel

Spring Integration(以及 EAI)中的 Channel 是集成架构中的基本管道。它是一个系统向另一个系统传递消息的管道。

你可以把它想象成一个字面意义上的管道,集成系统或流程可以通过它向其他系统推送信息(或接收来自其他系统的信息)。

Spring Integration 中的 Channel 有很多种,具体取决于你的需求。它们在很大程度上是可配置的,开箱即用,无需任何自定义代码,但如果你有自定义需求,也有一个强大的框架可供使用。

点对点(P2P)Channel 用于在系统或组件之间建立 1 对 1 的通信线路。一个组件在 Channel 上发布信息,另一个组件就能接收到信息。Channel 的每一端只能有一个组件。

如你所见,配置 Channel 就像返回 DirectChannel 实例一样简单:

@Bean
public MessageChannel fileChannel1() {
    return new DirectChannel();
}

@Bean
public MessageChannel fileChannel2() {
    return new DirectChannel();
}

@Bean
public MessageChannel fileChannel3() {
    return new DirectChannel();
}

如上,定义了三个独立的 Channel,它们都由各自的 getter 方法名称标识。

发布-订阅(Pub-Sub)Channel 用于在系统或组件之间建立一对多的通信线路。这样,就可以向之前创建的所有 3 个 DirectChannel 发布信息。

因此,对本例来说,可以用发布-订阅(Pub-Sub)Channel 来取代 P2P Channel:

@Bean
public MessageChannel pubSubFileChannel() {
    return new PublishSubscribeChannel();
}

@Bean
@InboundChannelAdapter(value = "pubSubFileChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

现在,已将入站 Channel 适配器(InboundChannelAdapter)转换为发布到 Pub-Sub Channel;。这样,就可以将从源文件夹读取的文件发送到多个目的地。

5.3、Bridge

Spring Integration 中的 Bridge(桥接器)用于连接两个消息 Channel 或 Adapter,如果它们因某种原因无法直接连接的话。

在本例中,可以使用 Bridge 将发布-订阅 Channel 连接到三个不同的点对点 Channel(因为点对点 Channel 和发布-订阅 Channel 不能直接连接):

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel1() {
    return new DirectChannel();
}

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel2() {
    return new DirectChannel();
}

@Bean
@BridgeFrom(value = "pubSubFileChannel")
public MessageChannel fileChannel3() {
    return new DirectChannel();
}

上述 Bean 配置将 PubSubFileChannel 与三个 P2P(点对点)Channel 桥接起来。@BridgeFrom 注解定义了 Bridge,可应用于需要订阅 Pub-Sub(发布订阅)Channel 的任意数量的 Channel。

可以将上述代码理解为:“创建一个从 pubSubFileChannelfileChannel1fileChannel2fileChannel3 的 Bridge,这样,来自 pubSubFileChannel 的消息就可以同时传送到这三个 Channel”。

5.4、ServiceActivator

Service Activator(服务激活器)是任何在给定方法上定义了 @ServiceActivator 注解的 POJO。这使我们能够在收到入站 Channel 的消息时执行 POJO 上的任何方法,并将消息写入出站 Channel。

在本例中,ServiceActivator 从配置的输入 Channel 接收文件,并将其写入配置的文件夹。

5.5、Adapter

Adapter(适配器)是一种基于企业集成模式的组件,可以 “插入” 系统或数据源。从字面上看,它几乎就像我们从插入墙上插座或电子设备中所熟知的适配器一样。

它允许与数据库、FTP 服务器和消息系统(如 JMS、AMQP 和 Twitter 等社交网络)等 “黑盒” 系统进行可重复使用的连接。连接到这些系统的需求无处不在,这意味着适配器具有很强的可移植性和可重用性(事实上,有一个小型的 适配器目录,任何人都可以免费使用)。

适配器分为 inbound(入站)和 outbound(出站)两大类。

让我们结合示例场景中使用的适配器来看看这些类别:

如你所见,入站适配器(Inbound Adapter)用于从外部系统(这里是文件系统目录)引入信息。

入站适配器配置包括:

  • @InboundChannelAdapter 注解将 Bean 配置标记为适配器 - 配置适配器将向其发送消息的 Channel(在本例中是 MPEG 文件)和 pollerpoller 是一个组件,可帮助适配器以指定的时间间隔轮询配置的文件夹。
  • 一个标准的 Spring Java 配置类,可返回 FileReadingMessageSource,即处理文件系统轮询的 Spring Integration 类实现

出站适配器(Outbound Adapter)用于向外发送消息。Spring Integration 支持多种开箱即用的适配器,适用于各种常见用例。

6、总结

本文通过一个基本的示例来介绍了 Spring Integration,展示了 Spring Integration 库的基于 Java 的配置和可重用性的组件。

Spring Integration 代码既可作为 JavaSE 中的独立项目部署,也可作为 Jakarta EE 环境中更大项目的一部分部署。虽然 Spring Integration 无法与其他以 EAI 为中心的产品和模式(如企业服务总线 ESB)直接竞争,但它是一种可行的轻量级替代方案,可以解决 ESB 所要解决的许多相同问题。


Ref:https://www.baeldung.com/spring-integration