可扩展性和并行处理

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

许多批处理问题都可以通过单线程、单进程 job 来解决,因此在考虑更复杂的实现方法之前,最好先适当检查一下这是否能满足你的需求。首先测量实际 job 的性能,看看最简单的实现是否能满足你的需求。即使使用标准硬件,你也可以在一分钟内读写一个几百兆字节的文件。

当你准备开始执行一项包含某些并行处理的 job 时,Spring Batch 提供了一系列选项,本章将对这些选项进行介绍,尽管某些功能在其他地方也有涉及。从高层次来看,并行处理有两种模式:

  • 单进程、多线程

  • 多进程

它们也分为以下几类:

  • 多线程 Step(单进程)

  • 并行 Step(单进程)

  • 远程分块 Step(多进程)

  • 分区 Step(单进程或多进程)

首先,我们回顾一下单进程选项。然后是多进程选项。

多线程 Step

启动并行处理的最简单方法是在 Step 配置中添加一个 TaskExecutor

例如,你可以为 tasklet 添加一个属性,如下所示:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

使用 Java 配置时,可以在 step 中添加 TaskExecutor,如下例所示:

Java Configuration
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

在此示例中,taskExecutor 是对另一个实现 TaskExecutor 接口的 Bean TaskExecutor 是标准的 Spring 接口,有关可用实现的详细信息,请查阅《Spring 用户指南》。最简单的多线程 TaskExecutorSimpleAsyncTaskExecutor

上述配置的结果是,Step 在一个单独的执行线程中通过读取、处理和写入每个 item 块(每个提交间隔)来执行。需要注意的是,这意味着要处理的项目没有固定顺序,与单线程情况相比,一个数据块可能包含不连续的 item。除了 task executor 设置的限制(如是否由线程池支持)外,tasklet 配置还有一个节流限制(默认值:4)。你可能需要增加该限制,以确保线程池得到充分利用。

例如,你可以增加节流限制(throttle-limit),如下所示:

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

使用 Java 配置时,builder 可提供对节流限制的访问,如下所示:

Java Configuration
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

还需注意的是,step 中使用的任何池化资源(如 DataSource)都可能对并发性设置限制。请确保这些资源池至少与 step 中所需的并发线程数一样大。

在一些常见的批处理用例中,使用多线程 Step 实现有一些实际限制。Step 中的许多参与者(如reader/writer)都是有状态的。如果状态没有按线程隔离,这些组件就无法在多线程 Step 中使用。特别是,Spring Batch 中的大多数 reader/writer 都不是为多线程使用而设计的。不过,我们可以使用无状态或线程安全的 reader/writer, Spring Batch 示例 中有一个示例(名为 parallelJob),展示了如何使用 process indicator(请参阅 防止状态持久化)来跟踪数据库输入表中已处理的 item。

Spring Batch 提供了一些 ItemWriterItemReader 的实现。通常,它们会在 Javadoc 中说明它们是否是线程安全的,或者在并发环境中需要做些什么来避免问题。如果 Javadoc 中没有相关信息,可以检查实现中是否存在任何状态。如果 reader 不是线程安全的,可以使用提供的 SynchronizedItemStreamReader 对其进行装饰,或者在自己的同步 delegator 中使用。你可以同步调用 read(),只要处理和写入是整个分块中最昂贵的部分,你的 step 完成得可能比单线程配置下要快得多。

并行 Step

只要需要并行化的应用程序逻辑可以分割成不同的职责并分配给各个 step,就可以在单个进程中并行化。并行 Step execution 易于配置和使用。

例如,与 step3 并行执行 step (step1step2) 的方法很简单,如下所示:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

使用 Java 配置时,步骤(step1step2)与 step3 并行执行的过程非常简单,如下所示:

Java Configuration
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

可配置的 task executor 用于指定执行各个流程的 TaskExecutor。默认为 SyncTaskExecutor,但需要异步 TaskExecutor 来并行运行各 step。请注意,在汇总退出状态和转换之前,job 会确保拆分中的每个流程都已完成。

更多详情,请参阅 分流 部分。

远程分块

在远程分块中,Step 处理被分割到多个进程中,这些进程通过一些中间件相互通信。下图显示了这种模式:

远程分块
Figure 1. 远程分块

manager 组件是一个进程,而 worker 是多个远程进程。如果 manager 不是瓶颈,那么这种模式的效果最好,因此处理的成本必须高于读取 item 的成本(实际情况往往如此)。

manager 是 Spring Batch Step 的一个实现,其中的 ItemWriter 被一个泛型版本取代,该版本知道如何将 item 块作为消息发送到中间件。Worker 是所使用中间件的标准 listener(例如,对于 JMS,它们是 MesssageListener 实现),其作用是通过 ChunkProcessor 接口,使用标准 ItemWriterItemProcessor 加上 ItemWriter 来处理 item 块。使用这种模式的好处之一是,reader、processor 和 writer 组件都是现成的(与本地执行 step 时使用的组件相同)。item 是动态划分的,工作是通过中间件分担的,因此,如果 listener 都是急切的消费者,负载平衡就会自动实现。

中间件必须是持久的,能够保证交付,并且每条信息只有一个消费者。JMS 显然是最佳选择,但网格计算和共享内存产品领域也有其他选择(如 JavaSpaces)。

分区

Spring Batch 还提供了一个 SPI,用于分割 Step 执行并远程执行。在本例中,远程参与者是 Step 实例,而这些实例本可以很容易地配置并用于本地处理。下图显示了这种模式:

分区 概览
Figure 2. 分区

Job 作为一系列 Step 实例在左侧运行,其中一个 Step 实例被标记为 manager。图中的 Worker 都是一个 Step 的相同实例,它们实际上可以代替 manager,从而为 Job 带来相同的结果。Worker 通常是远程服务,但也可以是本地执行线程。在这种模式下,manager 发送给 Worker 的消息不需要持久性或保证交付。JobRepository 中的 Spring Batch 元数据可确保每个 Worker 只执行一次,而且每次 Job execution 只执行一次。

Spring Batch 中的 SPI 包括一个特殊的 Step 实现(称为 PartitionStep)和两个需要在特定环境中实现的策略接口。这两个策略接口是 PartitionHandlerStepExecutionSplitter,下面的序列图显示了它们的作用:

Partitioning SPI
Figure 3. Partitioning SPI

在本例中,右侧的 Step 是 “remote” worker,因此可能有许多对象或进程在扮演这一角色,而显示的是 PartitionStep 在驱动执行。

下面的示例显示了使用 XML 配置时的 PartitionStep 配置:

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

与多线程 step 的 throttle-limit 属性类似,grid-size 属性可防止 task executor 被来自单个 step 的请求所饱和。

下面的示例显示了使用 Java 配置时的 PartitionStep 配置:

Java Configuration
@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与多线程 step 的 throttleLimit 方法类似,gridSize 方法可防止 task executor 被来自单一 step 的请求挤满。

Spring Batch Samples 的单元测试套件(参见 partition*Job.xml 配置)有一个简单的示例,你可以复制并扩展它。

Spring Batch 为分区创建的 step execution 称为 step1:partition0,以此类推。很多人喜欢将 manager step 称为 step1:manager,以保持一致性。你可以为 step 使用别名(指定 name 属性而不是 id 属性)。

PartitionHandler

PartitionHandler 是了解远程或网格环境结构的组件。它能够向远程 Step 实例发送 StepExecution 请求,并以特定结构格式(如 DTO)封装。它不需要知道如何分割输入数据,也不需要知道如何汇总多个 Step 执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为这些在很多情况下都是结构的特性。无论如何,Spring Batch 始终提供独立于结构的可重启性。失败的 Job 总是可以重新启动,在这种情况下,只有失败的 Step 会被重新执行。

PartitionHandler 接口可以为各种结构类型提供专门的实现,包括简单的 RMI 远程、EJB 远程、自定义 Web 服务、JMS、Java Spaces、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。Spring Batch 不包含任何专有网格或远程结构的实现。

不过,Spring Batch 确实提供了一个有用的 PartitionHandler 实现,它可以使用 Spring 的 TaskExecutor 策略,在独立的执行线程中本地执行 Step 实例。该实现名为 TaskExecutorPartitionHandler

TaskExecutorPartitionHandler 是使用 XML 命名空间配置的 step 的默认设置。你也可以显式配置它,如下所示:

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

你可以使用 Java 配置显式配置 TaskExecutorPartitionHandler,具体如下:

Java Configuration
@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

gridSize 属性决定了要创建的独立 step 执行次数,因此可以与 TaskExecutor 中的线程池大小相匹配。或者,也可以将其设置为大于可用线程数,从而使工作块更小。

TaskExecutorPartitionHandler 对于 IO 密集型 Step 实例非常有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可用于远程执行,方法是提供一个作为远程调用代理的 Step 实现(如使用 Spring Remoting)。

Partitioner

Partitioner 的职责更简单:生成 execution context,作为新 step 执行的输入参数(无需担心重启)。正如下面的接口定义所示,它只有一个方法:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

该方法的返回值会将每个 step 执行的唯一名称(String)与以 ExecutionContext 形式存在的输入参数关联起来。这些名称稍后会在批次元数据中显示为分区 StepExecutions 中的 step 名称。ExecutionContext 只是一袋 name-value 对,因此可能包含一系列主键、行号或输入文件的位置。然后,远程 Step 通常会通过使用 #{…​} 占位符(step scope 中的延迟绑定)与上下文输入绑定,如下节所示。

step execution 的名称(Partitioner 返回的 Map 中的 key)在 Job 的 step execution 中必须是唯一的,但没有其他特殊要求。要做到这一点(并使名称对用户有意义),最简单的方法是使用前缀+后缀的命名约定,其中前缀是正在执行的 step 的名称(该名称本身在 Job 中是唯一的),后缀只是一个计数器。框架中有一个 SimplePartitioner 使用了这种约定。

你可以使用一个名为 PartitionNameProvider 的可选接口,将分区名称与分区本身分开提供。如果 Partitioner 实现了该接口,那么在重启时只需查询名称。如果分区处理的成本较高,这将是一项非常有用的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称一致。

将输入数据绑定到 Step

PartitionHandler 执行的 step 具有相同的配置,并且其输入参数可在运行时从 ExecutionContext 绑定,这样做非常高效。利用 Spring Batch 的 StepScope 功能(在 延迟绑定 一节中将详细介绍)可以轻松做到这一点。例如,如果 Partitioner 创建的 ExecutionContext 实例具有名为 fileName 的 attribute key,并为每个 step 调用指向不同的文件(或目录),那么 Partitioner 的输出可能与下表内容相似:

Table 1. 针对目录处理的 Partitioner 提供的 execution context 的 step execution 名称示例

Step Execution Name (key)

ExecutionContext (value)

filecopy:partition0

fileName=/home/data/one

filecopy:partition1

fileName=/home/data/two

filecopy:partition2

fileName=/home/data/three

然后,文件名可以通过与 execution context 的延迟绑定绑定到 step 中。

下面的示例展示了如何用 XML 定义延迟绑定:

XML Configuration
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>

下面的示例展示了如何在 Java 中定义延迟绑定:

Java Configuration
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}