Spring Batch 整合

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

许多 Spring Batch 用户可能会遇到 Spring Batch 范围之外的需求,但使用 Spring Integration 可以高效简洁地实现这些需求。相反,Spring Integration 用户可能会遇到 Spring Batch 的需求,需要一种方法来有效地整合这两个框架。在这种情况下,会出现几种模式和用例,Spring Batch Integration 可满足这些需求。

Spring Batch 和 Spring Integration 之间的界限并不总是很清晰,但有两条建议可以提供帮助: 考虑粒度和应用常见模式。本节将介绍其中一些常见模式。

将 MQ 添加到批处理流程中可以实现操作自动化,还能将关键问题分开并制定策略。例如,一条消息可能会触发一项作业的执行,然后可以通过多种方式发送消息。另外,当 job 完成或失败时,该事件可能会触发消息的发送,而这些消息的消费者可能会关注与应用程序本身无关的操作问题。消息传递也可以嵌入到 job 中(例如,通过 channel 读取或写入要处理的项目)。远程分区和远程分块提供了将工作负载分配给多个 worker 的方法。

本节包括以下主要概念:

名称空间的支持

Spring Batch Integration 在 1.3 版中添加了专用 XML 命名空间支持,目的是提供更简便的配置体验。要使用命名空间,请在 Spring XML Application Context 文件中添加以下命名空间声明:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    ...

</beans>

下面的示例显示了 Spring Batch Integration 所有配置的 Spring XML application context 文件:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/batch
    https://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd">

    ...

</beans>

在引用的 XSD 文件中附加版本号也是允许的。不过,由于无版本声明始终使用最新 schema,我们一般不建议在 XSD 名称中添加版本号。添加版本号可能会在更新 Spring Batch Integration 依赖项时产生问题,因为这些依赖项可能需要最新版本的 XML schema。

通过 Message 启动批处理作业

使用核心 Spring Batch API 启动批处理作业时,基本上有两种选择:

  • 从命令行使用 CommandLineJobRunner

  • 编程式,通过 JobOperator.start()JobLauncher.run()

例如,在使用 shell 脚本调用批处理作业时,你可能需要使用 CommandLineJobRunner。或者,你也可以直接使用 JobOperator(例如,在将 Spring Batch 作为 Web 应用程序的一部分使用时)。然而,更复杂的用例怎么办?也许你需要轮询远程 (S)FTP 服务器以检索批处理作业(Batch Job)的数据,或者你的应用程序必须同时支持多个不同的数据源。例如,你可能不仅要接收来自 web 的数据文件,还要接收来自 FTP 和其他来源的数据文件。在调用 Spring Batch 之前,可能还需要对输入文件进行额外的转换。

因此,使用 Spring Integration 及其众多 adapter 来执行批处理作业会强大得多。例如,你可以使用 File Inbound Channel Adapter 来监控文件系统中的目录,并在输入文件到达时立即启动批处理作业。此外,你还可以创建使用多个不同 adapter 的 Spring Integration 流,只需配置即可轻松地同时从多个来源为批处理作业摄取数据。使用 Spring Integration 可以轻松实现所有这些方案,因为它允许对 JobLauncher 执行解耦和事件驱动。

Spring Batch Integration 提供了 JobLaunchingMessageHandler 类,可用于启动批处理作业。JobLaunchingMessageHandler 的输入由 Spring Integration message 提供,该 message 的 payload 为 JobLaunchRequest 类型。该类是对要启动的 Job 和启动 Batch job 所需的 JobParameters 的封装。

下图显示了启动批处理作业所需的典型 Spring Integration 消息流。 EIP(企业集成模式)网站 全面介绍了消息传递图标及其说明。

Launch Batch Job
Figure 1. Launch Batch Job

将 File 转换为 JobLaunchRequest

下面的示例将 file 转换为 JobLaunchRequest

package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 响应

当批处理作业正在执行时,会返回一个 JobExecution 实例。你可以使用该实例来确定执行状态。如果能够成功创建 JobExecution,则无论实际执行是否成功,都会返回该它。

如何返回 JobExecution 实例的具体行为取决于所提供的 TaskExecutor。如果使用同步(单线程)TaskExecutor 实现,则只有在 job 完成后才会返回 JobExecution 响应。如果使用异步 TaskExecutor,则会立即返回 JobExecution 实例。然后,你可以获取 JobExecution 实例的 id(使用 JobExecution.getJobId()),并使用 JobExplorer 查询 JobRepository 中 job 的更新状态。有关详细信息,请参阅 查询 Repository

Spring Batch 整合配置

假设有人需要创建一个 inbound-channel-adapter 来监听所提供目录中的 CSV 文件,将其交给转换器(FileMessageToJobRequest),通过作业启动网关(job launching gateway)启动作业,并使用 logging-channel-adapter 记录 JobExecution 的输出。

下面的示例展示了如何在 XML 中配置这种常见情况:

XML Configuration
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

下面的示例展示了如何在 Java 中配置这种常见情况:

Java Configuration
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

ItemReader 配置示例

现在我们正在轮询文件并启动作业,我们需要配置 Spring Batch ItemReader(例如),以便使用在作业参数(jobparameter) "input.file.name" 定义的位置找到的文件,如下所示的 bean 配置:

下面的 XML 示例显示了必要的 Bean 配置:

XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

下面的 Java 示例显示了必要的 Bean 配置:

Java Configuration
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

前面示例中的要点是注入 #{jobParameters['input.file.name']} 的值作为 Resource 属性值,以及将 ItemReader Bean 设置为具有 step scope。将 Bean 设置为具有 step scope 可利用后期绑定支持,从而允许访问 jobParameters 变量。

Job-Launching Gateway 的可用属性

job-launching gateway 具有以下属性,你可以设置这些属性来控制 job:

  • id:标识底层 Spring Bean 定义,它是以下任一种 Spring Bean 的实例:

    • EventDrivenConsumer

    • PollingConsumer(具体实现方式取决于组件的输入 channel 是 SubscribableChannel 还是 PollableChannel)。

  • auto-startup:布尔标志,用于指示端点是否应在启动时自动启动。默认值为 true

  • request-channel:该端点的输入 MessageChannel

  • reply-channelMessageChannel,由此产生的 JobExecution payload 被发送到该消息通道。

  • reply-timeout:让你指定网关在抛出异常之前等待回复信息成功发送到回复通道的时间(以毫秒为单位)。该属性仅适用于通道可能阻塞的情况(例如,使用当前已满的有界队列 channel)。另外,请记住,向 DirectChannel 发送时,调用发生在发送者的线程中。因此,发送操作失败可能是由下游的其他组件造成的。reply-timeout 属性映射到底层 MessagingTemplate 实例的 sendTimeout 属性。如果未指定,该属性默认为 -1,这意味着 Gateway 在默认情况下会无限期等待。

  • job-launcher: 可选。接受自定义 JobLauncher Bean 引用。如果未指定,adapter 将重新使用以 jobLauncher id 注册的实例。如果不存在默认实例,则会出现异常。

  • order: 指定当该端点作为订阅者(subscriber)连接到 SubscribableChannel 时的调用顺序。

子元素

当该 Gateway 接收来自 PollableChannel 的信息时,你必须提供全局默认 Poller 或向 Job Launching Gateway 提供一个 poller 子元素。

下面的示例展示了如何用 XML 提供 poller:

XML Configuration
<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>

下面的示例展示了如何用 Java 提供 poller:

Java Configuration
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}

提供 Messages 反馈

由于 Spring 批处理作业可以运行很长时间,因此提供进度信息往往至关重要。例如,你可能希望在批处理作业的某些或所有部分失败时得到通知。Spring Batch 支持通过以下方式收集这些信息:

  • 主动轮询

  • 事件驱动监听器

异步启动 Spring Batch 作业时(例如,通过使用作业启动网关),会返回一个 JobExecution 实例。因此,你可以使用 JobExecution.getJobId(),通过使用 JobExplorerJobRepository 获取更新的 JobExecution 实例,从而持续轮询状态更新。不过,这种方法并不理想,最好还是采用事件驱动的方法。

因此,Spring Batch 提供了监听器,包括三种最常用的监听器:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在下图所示示例中,Spring Batch 作业已配置了 StepExecutionListener。因此,Spring Integration 会接收并处理事件前后的任何 step。例如,你可以使用 Router 检查接收到的 StepExecution。根据检查结果,会发生各种情况(例如将消息路由到 mail outbound channel adapter),从而根据某些条件发送电子邮件通知。

Handling Informational Messages
Figure 2. Handling Informational Messages

下面由两部分组成的示例展示了如何配置 listener,以便向 Gateway 发送 StepExecution 事件消息,并将其输出记录到 logging-channel-adapter

首先,创建通知 notification integration beans。

下面的示例展示了如何用 XML 创建 notification integration bean。

XML Configuration
<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

下面的示例展示了如何用 Java 创建 notification integration bean:

Java Configuration
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
你需要在配置中添加 @IntegrationComponentScan 注解。

其次,修改 job 以添加 step 级 listener。

下面的示例显示了如何在 XML 中添加 step 级 listener:

XML Configuration
<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

下面的示例展示了如何在 Java 中添加 step 级 listener:

Java Configuration
public Job importPaymentsJob(JobRepository jobRepository) {
    return new JobBuilder("importPayments", jobRepository)
        .start(stepBuilderFactory.get("step1")
                .chunk(200)
                .listener(notificationExecutionsListener())
                ...
              )
}

异步处理器

异步处理器可帮助你扩展项目处理能力。在异步处理器用例中,AsyncItemProcessor 充当调度器,在新线程上执行 ItemProcessor 的项目逻辑。一旦项目完成,Future 就会传递给 AsynchItemWriter 进行写入。

因此,你可以通过使用异步项处理来提高性能,基本上可以实现 fork-join 场景。AsyncItemWriter 会收集结果,并在所有结果可用时立即写回数据块。

下面的示例展示了如何在 XML 中配置 AsyncItemProcessor

XML Configuration
<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

下面的示例展示了如何在 Java 中配置 AsyncItemProcessor

Java Configuration
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}

delegate 属性指的是你的 ItemProcessor Bean,而 taskExecutor 属性指的是你选择的 TaskExecutor

下面的示例展示了如何用 XML 配置 AsyncItemWriter

XML Configuration
<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>

下面的示例展示了如何在 Java 中配置 AsyncItemWriter

Java Configuration
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}

同样,delegate 属性实际上是对 ItemWriter Bean 的引用。

批处理流程执行外部化

到目前为止讨论的集成方法都是 Spring Integration 像外壳一样包裹 Spring Batch 的用例。不过,Spring Batch 也可以在内部使用 Spring Integration。通过使用这种方法,Spring Batch 用户可以将 item 甚至块的处理委托给外部进程。这样就可以卸载复杂的处理过程。Spring Batch Integration 为以下方面提供专门支持:

  • 远程分块

  • 远程分区

远程分块

下图显示了在使用 Spring Batch 和 Spring Integration 时远程分块的一种工作方式:

Remote Chunking
Figure 3. Remote Chunking

更进一步,你还可以通过使用 ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)将分块处理外部化,它可以发送项目并收集结果。一旦发送完毕,Spring Batch 将继续读取并分组项目,而无需等待结果。相反,ChunkMessageChannelItemWriter 负责收集结果并将其整合回 Spring Batch 流程。

通过 Spring Integration,你可以完全控制进程的并发性(例如,使用 QueueChannel 而不是 DirectChannel)。此外,依靠 Spring Integration 丰富的 channel adapter 集合(如 JMS 和 AMQP),你可以将批处理作业的分块分发到外部系统进行处理。

一个具有远程分块 step 的任务可能具有类似于以下 XML 配置:

XML Configuration
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

一个具有远程分块 step 的作业可能具有类似于 Java 中以下的配置:

Java Configuration
public Job chunkJob(JobRepository jobRepository) {
     return new JobBuilder("personJob", jobRepository)
             .start(stepBuilderFactory.get("step1")
                     .<Person, Person>chunk(200)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }

ItemReader 引用指向用于读取管理器数据的 Bean。ItemWriter 引用指向一个特殊的 ItemWriter(称为 ChunkMessageChannelItemWriter),如前所述。处理器(如有)不在管理器配置中,因为它是在 Worker 上配置的。在执行用例时,应检查任何其他组件属性,如节流限制等。

以下 XML 配置提供了基本的管理器设置:

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

下面的 Java 配置提供了基本的管理器设置:

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

前面的配置为我们提供了一些 Bean。我们通过使用 ActiveMQ 和 Spring Integration 提供的入站和出站 JMS adapter 来配置消息中间件。如图所示,job step 引用的 itemWriter Bean 使用 ChunkMessageChannelItemWriter 通过配置的中间件写入块。

现在我们可以开始 worker 配置,如下例所示。

下面的示例显示了 XML 格式的 Worker 配置。

XML Configuration
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

下面的示例显示了 Java 中的 Worker 配置。

Java Configuration
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

这些配置项大多与管理器配置相似。Worker 既不需要访问 Spring Batch JobRepository,也不需要访问实际的 job 配置文件。主要的 bean 是 ChunkProcessorChunkHandlerChunkProcessorChunkHandlerchunkProcessor 属性接收一个已配置的 SimpleChunkProcessor,你可以在其中提供对 ItemWriter(以及可选的 ItemProcessor)的引用,当 Worker 收到来自管理器的数据块时,该引用将在 Worker 上运行。

更多信息,请参阅 "可扩展性" 一章中有关 远程分块 的章节。

从 4.1 版开始,Spring Batch Integration 引入了 @EnableBatchIntegration 注解,可用于简化远程分块设置。该注解提供了两个可以在 application context 中自动装配的 Bean:

  • RemoteChunkingManagerStepBuilderFactory:配置 manager step。

  • RemoteChunkingWorkerBuilder: 配置远程 worker 整合流程。

如下图所示,这些 API 负责配置多个组件:

Remote Chunking Configuration
Figure 4. Remote Chunking Configuration

在管理器(manager)方面,RemoteChunkingManagerStepBuilderFactory 可以让你通过声明来配置 manager step:

  • 读取 item 并将其发送给 worker 的 item reader。

  • 向 worker 发送请求的输出 channel("发送请求")。

  • 输入 channel("收到的回复"),用于接收来自 worker 的回复。

你无需显式配置 ChunkMessageChannelItemWriterMessagingTemplate。(如果有必要,你仍可以显式配置它们)。

在 Worker 方面,RemoteChunkingWorkerBuilder 可让你配置 Worker,以便:

  • 监听 manager 在输入 channel 上发送的请求("接收到的请求")。

  • 使用配置的 ItemProcessorItemWriter,为每个请求调用 ChunkProcessorChunkHandlerhandleChunk 方法。

  • 通过输出 channel("发送回复")向 manager 发送回复。

你无需显式配置 SimpleChunkProcessorChunkProcessorChunkHandler。(如果有必要,你仍可以显式配置它们)。

下面的示例展示了如何使用这些 API:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}

你可以在 这里 找到完整的远程分块 job 示例。

远程分区

下图显示了典型的远程分区情况:

Remote Partitioning
Figure 5. Remote Partitioning

另一方面,当瓶颈不是项目的处理,而是相关的 I/O 时,远程分区就非常有用。通过远程分区,你可以将工作发送给执行完整 Spring Batch step 的 Worker。因此,每个 Worker 都有自己的 ItemReaderItemProcessorItemWriter。为此,Spring Batch Integration 提供了 MessageChannelPartitionHandler

PartitionHandler 接口的实现使用 MessageChannel 实例向远程 Worker 发送指令并接收其响应。这为用于与远程 worker 通信的传输(如 JMS 和 AMQP)提供了一个很好的抽象。

“可扩展性”一章中涉及 远程分区 的部分概述了配置远程分区所需的概念和组件,并举例说明了如何使用默认的 TaskExecutorPartitionHandler 在独立的本地执行线程中进行分区。要对多个 JVM 进行远程分区,还需要另外两个组件:

  • 远程 fabric 或 grid 环境。

  • 支持所需远程 fabric 或 grid 环境的 PartitionHandler 实现。

与远程分块类似,你也可以使用 JMS 作为 “remoting fabric”。在这种情况下,如前所述,使用 MessageChannelPartitionHandler 实例作为 PartitionHandler 实现。

下面的示例假定有一个现有的分区 job,重点是 XML 中的 MessageChannelPartitionHandler 和 JMS 配置:

XML Configuration
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

下面的示例假定有一个现有的分区 job,重点是 Java 中的 MessageChannelPartitionHandler 和 JMS 配置:

Java Configuration
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlow.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlow.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

你还必须确保分区 handler 属性映射到 partitionHandler Bean。

下面的示例将分区 handler 属性映射到 XML 中的 partitionHandler

XML Configuration
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

下面的示例将分区 handler 属性映射到 Java 中的 partitionHandler

Java Configuration
	public Job personJob(JobRepository jobRepository) {
		return new JobBuilder("personJob", jobRepository)
				.start(stepBuilderFactory.get("step1.manager")
						.partitioner("step1.worker", partitioner())
						.partitionHandler(partitionHandler())
						.build())
				.build();
	}

你可以在 这里 找到一个完整的远程分区任务示例。

你可以使用 @EnableBatchIntegration 注解来简化远程分区设置。该注解提供了两个对远程分区非常有用的 bean:

  • RemotePartitioningManagerStepBuilderFactory:配置 manager step。

  • RemotePartitioningWorkerStepBuilderFactory:配置 worker step。

如下图所示,这些 API 负责配置多个组件:

Remote Partitioning Configuration (with job repository polling)
Figure 6. Remote Partitioning Configuration (with job repository polling)
Remote Partitioning Configuration (with replies aggregation)
Figure 7. Remote Partitioning Configuration (with replies aggregation)

在 manager 方面,RemotePartitioningManagerStepBuilderFactory 可以让你通过声明来配置 manager step:

  • 用于数据分区的 Partitioner

  • 向 worker 发送请求的输出 channel("发送请求")。

  • 输入 channel("收到的回复"),用于接收来自 worker 的回复(配置回复聚合时)。

  • 轮询间隔和超时参数(配置 job repository 轮询时)。

你无需显式配置 MessageChannelPartitionHandlerMessagingTemplate。(如果有必要,你仍然可以显式配置它们)。

在 worker 方面,RemotePartitioningWorkerStepBuilderFactory 可让你配置 worker,以便

  • 监听 manager 在输入 channel 上发送的请求("接收到的请求")。

  • 为每个请求调用 StepExecutionRequestHandlerhandle 方法。

  • 通过输出 channel("发送回复")向 manager 发送回复。

你无需明确配置 StepExecutionRequestHandler。(如果有必要,可以明确配置)。

下面的示例展示了如何使用这些 API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
                 return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // 中间件 bean 的设置被省略

    }

}