配置 Step

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

正如在 domain 章节 中所讨论的,Step 是一个 domain 对象,它封装了批处理 job 的一个独立的、连续的阶段,并包含定义和控制实际批处理的所有必要信息。这必然是一个模糊的描述,因为任何给定的 Step 的内容都是由编写 Job 的开发者决定的。一个 Step 可以是简单的,也可以是复杂的,正如开发者所希望的那样。一个简单的 Step 可能会将数据从一个文件加载到数据库中,只需要很少或不需要代码(取决于所使用的实现)。一个更复杂的 Step 可能有复杂的业务规则,作为处理的一部分被应用,如下图所示:

Step
Figure 1. Step

面向分块的处理

Spring Batch 在其最常见的实现中使用了 "面向块" 的处理风格。面向块的处理指的是每次读取数据并创建 "块" (chunk),在事务边界内写出。一旦读取的项目数等于提交间隔,整个块就会被 ItemWriter 写出来,然后事务被提交。下面的图片显示了这个过程:

面向分块的处理
Figure 2. 面向分块的处理

下面的伪代码以一种简化的形式展示了同样的概念:

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read();
    if (item != null) {
        items.add(item);
    }
}
itemWriter.write(items);

你也可以用一个可选的 ItemProcessor 来配置一个面向分块的 step,以便在将项目传递给 ItemWriter 之前对它们进行处理。下面的图片显示了当一个 ItemProcessor 被注册在 step 中时的过程:

使用 Item Processor 进行面向块的处理
Figure 3. 使用 Item Processor 进行面向块的处理

下面的伪代码显示了如何以一种简化的形式实现这一点:

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read();
    if (item != null) {
        items.add(item);
    }
}

List processedItems = new Arraylist();
for(Object item: items){
    Object processedItem = itemProcessor.process(item);
    if (processedItem != null) {
        processedItems.add(processedItem);
    }
}

itemWriter.write(processedItems);

关于 item processor 及其使用情况的更多细节,请参见 Item processing 部分。

配置 Step

尽管 Step 所需的依赖清单相对较短,但它是一个极其复杂的类,有可能包含许多合作者。

为了方便配置,你可以使用Spring Batch XML命名空间,如下例所示:

XML 配置
<job id="sampleJob" job-repository="jobRepository">
    <step id="step1">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

当使用 Java 配置时,你可以使用 Spring Batch builder,如下面的例子所示:

Java 配置
/**
 * Note the JobRepository is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return new JobBuilder("sampleJob", jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * Note the TransactionManager is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

The preceding configuration includes the only required dependencies to create a item-oriented step:

  • reader: The ItemReader that provides items for processing.

  • writer: The ItemWriter that processes the items provided by the ItemReader.

  • transaction-manager: Spring’s PlatformTransactionManager that begins and commits transactions during processing.

  • transactionManager: Spring’s PlatformTransactionManager that begins and commits transactions during processing.

  • job-repository: The XML-specific name of the JobRepository that periodically stores the StepExecution and ExecutionContext during processing (just before committing). For an in-line <step/> (one defined within a <job/>), it is an attribute on the <job/> element. For a standalone <step/>, it is defined as an attribute of the <tasklet/>.

  • repository: The Java-specific name of the JobRepository that periodically stores the StepExecution and ExecutionContext during processing (just before committing).

  • commit-interval: The XML-specific name of the number of items to be processed before the transaction is committed.

  • chunk: The Java-specific name of the dependency that indicates that this is an item-based step and the number of items to be processed before the transaction is committed.

Note that job-repository defaults to jobRepository and transaction-manager defaults to transactionManager. Also, the ItemProcessor is optional, since the item could be directly passed from the reader to the writer.

Note that repository defaults to jobRepository (provided through @EnableBatchProcessing) and transactionManager defaults to transactionManager (provided from the application context). Also, the ItemProcessor is optional, since the item could be directly passed from the reader to the writer.

继承自 Parent Step

如果一组 Step 有类似的配置,那么定义一个 “parent” Step 可能会有帮助,具体的 Step 可以从该 Steps 继承属性。类似于Java中的类继承,“child” Step 将其元素和属性与 parent Step 相结合。child Steps 还可以覆盖 parent Step 的任何内容。

在下面的例子中,StepconcreteStep1,继承自 parentStep。它被实例化为 itemReader, itemProcessor, itemWriter, startLimit=5 以及 allowStartIfComplete=true。此外, commitInterval5,因为它被 concreteStep1 Step 重写了,如下面的例子所示:

<step id="parentStep">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep1" parent="parentStep">
    <tasklet start-limit="5">
        <chunk processor="itemProcessor" commit-interval="5"/>
    </tasklet>
</step>

job 元素中的 step 仍然需要 id 属性。这有两个原因:

  • 在持久化 StepExecution 时,该 id 被用作 step 名称。如果同一个独立的 step 在 job 中被多个 step 引用,就会发生错误。

  • 当创建工作流时,如 本章后面 所述,next 属性应指流程中的 step,而不是独立的 step。

Step 抽象

有时,可能需要定义一个不是完整 Step 配置的 parent Step。例如,如果 readerwritertasklet 属性在 Step 配置中被遗漏了,那么初始化就会失败。如果必须在没有这些属性中的一个或多个的情况下定义 parent,应该使用 abstract 属性。一个 abstractStep 只能被继承,不能被实例化。

在下面的例子中,如果 StepabstractParentStep)没有被声明为 abstract,它就不会被实例化。这个 Step,(concreteStep2)有 itemReader, itemWritercommit-interval=10

<step id="abstractParentStep" abstract="true">
    <tasklet>
        <chunk commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep2" parent="abstractParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter"/>
    </tasklet>
</step>
合并 List

Step 上的一些可配置元素是list,例如 <listeners/> 元素。如果 parent Step 和 child Step 都声明了一个 <listeners/> 元素,那么 child Step 的列表就会覆盖 parent Step 的列表。为了允许child在parent定义的列表中添加额外的监听器,每个 list 元素都有一个 merge 属性。如果该元素指定 merge="true",那么 child 的列表将与 parent 的列表合并,而不是覆盖它。

在下面的例子中,Step "concreteStep3" 被创建,有两个 listener:listenerOnelistenerTwo

<step id="listenersParentStep" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    <listeners>
</step>

<step id="concreteStep3" parent="listenersParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
    </tasklet>
    <listeners merge="true">
        <listener ref="listenerTwo"/>
    <listeners>
</step>

提交间隔

如前所述,一个step读入和写出item,通过使用提供的 PlatformTransactionManager 定期提交。在 commit-interval1 的情况下,它在写完每一个item后都会提交。这在很多情况下是不太理想的,因为开始和提交一个事务是很昂贵的。理想情况下,最好是在每个事务中处理尽可能多的item,这完全取决于被处理的数据类型和与之交互的资源。出于这个原因,你可以配置在一次提交中处理的item数量。

下面的例子显示了一个 step,其 taskletcommit-interval 值为10,正如它在XML中定义的那样:

XML 配置
<job id="sampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

下面的例子显示了一个 step,其 taskletcommit-interval 值为10,这是在Java中定义的:

Java Configuration
@Bean
public Job sampleJob(JobRepository jobRepository) {
    return new JobBuilder("sampleJob", jobRepository)
                     .start(step1())
                     .build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

在前面的例子中,每个事务中要处理 10 个 item。在处理的开始,一个事务就开始了。而且,每次在 ItemReader 上调用 read 时,一个计数器会被递增。当它达到10时,汇总的 item 列表被传递给 ItemWriter,并且事务被提交。

配置重启的 Step

在 “配置和运行 Job” 一节中,讨论了重新启动 Job 的问题。重新启动对 step 有许多影响,因此,可能需要一些特定的配置。

设定启动限制

在很多情况下,你可能想控制一个 Step 的启动次数。例如,你可能需要配置一个特定的 Step,使其只运行一次,因为它使一些资源失效,必须在再次运行前手动修复。这在 Step 层面上是可以配置的,因为不同的 Step 可能有不同的要求。一个只能执行一次的 Step 可以与一个可以无限运行的 Step 作为同一 Job 的一部分存在。

下面的代码片段显示了XML中启动限制配置的一个例子:

XML Configuration
<step id="step1">
    <tasklet start-limit="1">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

下面的代码片段显示了Java中启动限制配置的一个例子:

Java Configuration
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.startLimit(1)
				.build();
}

前面例子中的 step 只能运行一次。试图再次运行它将导致抛出一个 StartLimitExceededException。注意,启动限制的默认值是 Integer.MAX_VALUE

重新启动一个已完成的 Step

在可重启 job 的情况下,可能有一个或多个 step 应该总是被运行,不管它们在第一次是否成功。一个例子可能是验证 Step 或在处理前清理资源的 step。在重启 job 的正常处理过程中,任何状态为 COMPLETED(意味着它已经成功完成)的 step 都会被跳过。将 allow-start-if-complete 设置为 true 会覆盖这一点,这样该 step 就会一直运行。

下面的代码片段显示了如何在XML中定义一个可重新启动的 job:

XML Configuration
<step id="step1">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

下面的代码片段显示了如何在Java中定义一个可重新启动的 job:

Java Configuration
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}
Step 重启配置示例

下面的XML例子显示了如何配置一个 job,使其具有可重新启动的 step:

XML 配置
<job id="footballJob" restartable="true">
    <step id="playerload" next="gameLoad">
        <tasklet>
            <chunk reader="playerFileItemReader" writer="playerWriter"
                   commit-interval="10" />
        </tasklet>
    </step>
    <step id="gameLoad" next="playerSummarization">
        <tasklet allow-start-if-complete="true">
            <chunk reader="gameFileItemReader" writer="gameWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
    <step id="playerSummarization">
        <tasklet start-limit="2">
            <chunk reader="playerSummarizationSource" writer="summaryWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
</job>

下面的Java例子显示了如何配置一个 job,使其具有可重新启动的 step:

Java 配置
@Bean
public Job footballJob(JobRepository jobRepository) {
	return new JobBuilder("footballJob", jobRepository)
				.start(playerLoad())
				.next(gameLoad())
				.next(playerSummarization())
				.build();
}

@Bean
public Step playerLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerLoad", jobRepository)
			.<String, String>chunk(10, transactionManager)
			.reader(playerFileItemReader())
			.writer(playerWriter())
			.build();
}

@Bean
public Step gameLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("gameLoad", jobRepository)
			.allowStartIfComplete(true)
			.<String, String>chunk(10, transactionManager)
			.reader(gameFileItemReader())
			.writer(gameWriter())
			.build();
}

@Bean
public Step playerSummarization(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerSummarization", jobRepository)
			.startLimit(2)
			.<String, String>chunk(10, transactionManager)
			.reader(playerSummarizationSource())
			.writer(summaryWriter())
			.build();
}

前面的配置示例是为一个加载足球比赛信息并总结的工作。它包含三个 step:playerLoad, gameLoadplayerSummarizationplayerLoad step 从一个平面文件中加载球员信息,而 gameLoad step 对比赛也做同样的工作。最后一个 step,playerSummarization,然后根据提供的游戏总结每个玩家的统计数据。假设由 playerLoad 加载的文件必须只加载一次,但 gameLoad 可以加载在特定目录中发现的任何游戏,在它们被成功加载到数据库后删除它们。因此,playerLoad step 不包含额外的配置。它可以被启动任何次数,如果完成则跳过。然而,gameLoad step 需要每次都运行,以防在它上次运行后有额外的文件被添加。它的允 allow-start-if-complete 设置为 true,以便总是被启动。(假设游戏加载的数据库表上有一个进程指示器,以确保新游戏能被总结 step 正确找到)。summarization step 是工作中最重要的,被配置为启动限制为2。这很有用,因为如果该 step 持续失败,就会向控制 job execution 的操作人员返回一个新的退出代码,在人工干预之前,它不能再次启动。

这个 job 为本文档提供了一个例子,与示例项目中的 footballJob 不同。

本节的其余部分描述了 footballJob 例子的三次运行中每一次发生的情况。

运行 1:

  1. playerLoad 运行并成功完成,向 PLAYERS 表添加了400个玩家。

  2. gameLoad 运行并处理价值11个文件的游戏数据,将其内容加载到 GAMES 表中。

  3. playerSummarization 开始处理,5分钟后失败。

运行 2:

  1. playerLoad 不运行,因为它已经成功完成,并且 allow-start-if-completefalse(默认)。

  2. gameLoad 再次运行并处理另外两个文件,将它们的内容也加载到 GAMES 表中(有一个 process 指示器表明它们还没有被处理)。

  3. playerSummarization 开始处理所有剩余的游戏数据(使用 process 指标进行过滤),30分钟后再次失败。

运行 3:

  1. playerLoad 不运行,因为它已经成功完成,并且 allow-start-if-completefalse(默认)。

  2. gameLoad 再次运行并处理另外两个文件,将它们的内容也加载到 GAMES 表中(有一个 process 指示器表明它们还没有被处理)。

  3. playerSummarization 没有被启动,job 被立即杀死,因为这是 playerSummarization 的第三次执行,而它的限制只有 2,要么必须提高限制,要么 Job 必须作为一个新的 JobInstance 被执行。

配置跳过(Skip )逻辑

在很多情况下,处理过程中遇到的错误不应该导致 Step 失败,而是应该跳过。这通常是一个必须由了解数据本身和它的意义的人做出的决定。例如,财务数据可能不能跳过,因为它导致了金钱的转移,这需要完全准确。另一方面,加载一个供应商的列表,可能允许跳过。如果一个供应商因为格式不正确或缺少必要的信息而没有被加载,可能就没有问题了。通常,这些不好的记录也会被记录下来,这在后面讨论 listener 的时候会涉及。

下面的XML例子显示了一个使用跳过限制的例子:

XML 配置
<step id="step1">
   <tasklet>
      <chunk reader="flatFileItemReader" writer="itemWriter"
             commit-interval="10" skip-limit="10">
         <skippable-exception-classes>
            <include class="org.springframework.batch.item.file.FlatFileParseException"/>
         </skippable-exception-classes>
      </chunk>
   </tasklet>
</step>

下面的Java例子显示了一个使用跳过限制的例子:

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(FlatFileParseException.class)
				.build();
}

在前面的例子中,使用了一个 FlatFileItemReader。如果在任何时候抛出一个 FlatFileParseException,这个 item 就会被跳过,并被计入总的跳过限制(10)。被声明的异常(和它们的子类)可能会在块处理的任何阶段(读取、处理或写入)被抛出。在步骤执行过程中,对读、处理和写的跳过进行单独统计,但该限制适用于所有跳过。一旦达到跳过限制,发现的下一个异常将导致该步骤失败。换句话说,第11次跳过会触发异常,而不是第10次。

前面的例子的一个问题是,除了 FlatFileParseException 之外的任何其他异常都会导致 Job 失败。在某些情况下,这可能是正确的行为。然而,在其他情况下,识别哪些异常应该导致失败,并跳过其他一切,可能会更容易。

下面的XML例子显示了一个排除特定异常的例子:

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="flatFileItemReader" writer="itemWriter"
               commit-interval="10" skip-limit="10">
            <skippable-exception-classes>
                <include class="java.lang.Exception"/>
                <exclude class="java.io.FileNotFoundException"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>

下面的Java例子显示了一个排除特定异常的例子:

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(Exception.class)
				.noSkip(FileNotFoundException.class)
				.build();
}

通过将 java.lang.Exception 确定为可跳过的异常类,该配置表明所有的 Exception 都是可跳过的。然而,通过 "排除" java.io.FileNotFoundException,配置将可跳过的异常类列表细化为除了 FileNotFoundException 之外的所有 Exception。任何被排除的异常类在遇到时都是致命的(也就是说,它们不会被跳过)。

对于遇到的任何异常,其可跳过性是由类层次结构中最近的超类决定的。任何未分类的异常都被当作 'fatal' 处理。

The order of the <include/> and <exclude/> elements does not matter.

The order of the skip and noSkip method calls does not matter.

配置重试逻辑

在大多数情况下,你希望一个异常要么导致跳过,要么导致一个 Step 失败。然而,并不是所有的异常都是确定性的。如果在读取过程中遇到了 FlatFileParseException,它总是会被抛出用于该记录。重置 ItemReader 并没有帮助。然而,对于其他的异常(比如 DeadlockLoserDataAccessException,它表明当前进程试图更新另一个进程拥有锁的记录),等待并再次尝试可能会导致成功。

在XML中,重试应按以下方式进行配置:

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter"
             commit-interval="2" retry-limit="3">
         <retryable-exception-classes>
            <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
         </retryable-exception-classes>
      </chunk>
   </tasklet>
</step>

在Java中,重试应按以下方式进行配置:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.retryLimit(3)
				.retry(DeadlockLoserDataAccessException.class)
				.build();
}

Step 允许对单个 item 的重试次数进行限制,以及列出 "可重试" 的异常情况。你可以在 retry 中找到更多关于重试工作的细节。

Controlling Rollback

默认情况下,无论重试还是跳过,从 ItemWriter 抛出的任何异常都会导致由该 Step 控制的事务回滚。如果跳过配置如前所述,从 ItemReader 抛出的异常不会导致回滚。然而,在许多情况下,从 ItemWriter 抛出的异常不应该导致回滚,因为没有发生任何行动来使事务无效。出于这个原因,你可以在 Step 中配置一个不应该导致回滚的异常列表。

在XML中,你可以按以下方式控制回滚:

XML 配置
<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
      <no-rollback-exception-classes>
         <include class="org.springframework.batch.item.validator.ValidationException"/>
      </no-rollback-exception-classes>
   </tasklet>
</step>

在Java中,你可以按以下方式控制回滚:

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.noRollback(ValidationException.class)
				.build();
}
事务 Reader

ItemReader 的基本契约是,它是只向前的。该 step 缓冲了 reader 的输入,因此,在回滚的情况下,item 不需要从 reader 中重新读取。然而,在某些情况下,reader 是建立在一个事务性资源之上的,比如JMS队列。在这种情况下,由于队列与被回滚的事务相联系,已经从队列中拉出的消息会被放回。出于这个原因,你可以配置该 step,使其不缓冲这些 item。

下面的例子显示了如何在xml中创建一个不缓冲 item 的 reader:

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"
               is-reader-transactional-queue="true"/>
    </tasklet>
</step>

下面的例子显示了如何在Java中创建一个不缓冲 item 的 reader:

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.readerIsTransactionalQueue()
				.build();
}

事务属性

你可以使用事务属性来控制 isolationpropagationtimeout 设置。你可以在 Spring核心文档 中找到更多关于设置事务属性的信息。

下面的例子在XML中设置 isolationpropagationtimeout 事务属性:

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        <transaction-attributes isolation="DEFAULT"
                                propagation="REQUIRED"
                                timeout="30"/>
    </tasklet>
</step>

下面的例子在Java中设置 isolationpropagationtimeout 事务属性:

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
	attribute.setPropagationBehavior(Propagation.REQUIRED.value());
	attribute.setIsolationLevel(Isolation.DEFAULT.value());
	attribute.setTimeout(30);

	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.transactionAttribute(attribute)
				.build();
}

Step 注册 ItemStream

该 step 必须在其生命周期的必要点上处理 ItemStream 的回调。(关于 ItemStream 接口的更多信息,见 ItemStream)。如果一个 step 失败了,可能需要重新启动,这是至关重要的,因为 ItemStream 接口是 step 获得它需要的关于 execution 之间的持久状态的信息的地方。

如果 ItemReaderItemProcessorItemWriter 本身实现了 ItemStream 接口,这些将被自动注册。任何其他的流都需要单独注册。这通常是间接依赖的情况,如委托(delegates),被注入到 reader 和 writer 中。你可以通过 stream 元素在 step 中注册一个流。

下面的例子显示了如何在XML中的 step 上注册一个 stream

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
            <streams>
                <stream ref="fileItemWriter1"/>
                <stream ref="fileItemWriter2"/>
            </streams>
        </chunk>
    </tasklet>
</step>

<beans:bean id="compositeWriter"
            class="org.springframework.batch.item.support.CompositeItemWriter">
    <beans:property name="delegates">
        <beans:list>
            <beans:ref bean="fileItemWriter1" />
            <beans:ref bean="fileItemWriter2" />
        </beans:list>
    </beans:property>
</beans:bean>

下面的例子显示了如何在Java中的 step 上注册一个 stream

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(compositeItemWriter())
				.stream(fileItemWriter1())
				.stream(fileItemWriter2())
				.build();
}

/**
 * In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
 * necessary, but used for an example.
 */
@Bean
public CompositeItemWriter compositeItemWriter() {
	List<ItemWriter> writers = new ArrayList<>(2);
	writers.add(fileItemWriter1());
	writers.add(fileItemWriter2());

	CompositeItemWriter itemWriter = new CompositeItemWriter();

	itemWriter.setDelegates(writers);

	return itemWriter;
}

在前面的例子中,CompositeItemWriter 不是一个 ItemStream,但它的两个委托(delegate)都是。因此,这两个委托 writer 必须明确地注册为 stream,以便框架正确处理它们。ItemReader 不需要明确地注册为 stream,因为它是 Step 的直接属性。该 step 现在是可重启的,并且在失败的情况下,reader 和 writer 的状态被正确地持久化。

拦截 Step 的执行

就像 Job 一样,在 Step 的执行过程中,有许多事件,用户可能需要执行一些功能。例如,为了写出一个需要页脚的平面文件,ItemWriter 需要在 Step 完成后得到通知,以便可以写出页脚。这可以通过许多 Step scope 内的监听器(listeners)来实现。

你可以通过 listeners 元素将任何实现 StepListener 的一个扩展的类(但不是那个接口本身,因为它是空的)应用到一个step。listeners 元素在 step、tasklet 或 chunk 的声明中是有效的。我们建议你在其功能适用的级别上声明 listeners,如果它是多功能的(如 StepExecutionListenerItemReadListener),则在其适用的最细的级别上声明。

下面的例子显示了在 XML 的 chunk level 上应用的 listener:

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

下面的例子显示了在 Java 的 chunk level 上应用的 listener:

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

如果使用命名空间 <step> 元素或 *StepFactoryBean 工厂之一,本身实现了 StepListener 接口之一的 ItemReaderItemWriterItemProcessor 会自动注册到 Step 中。这只适用于直接注入到 Step 中的组件。如果 listener 被嵌套在另一个组件中,你需要显式地注册它(如之前在 Step 注册 ItemStream 下所描述的)。

除了 StepListener 接口之外,还提供了注解来解决同样的问题。普通的Java对象可以有带有这些注解的方法,然后被转换为相应的 StepListener 类型。对大块组件的自定义实现进行注解也很常见,如 ItemReaderItemWriterTasklet。这些注解被XML解析器分析为 <listener/> 元素,以及与 builder 中的 listener 方法一起注册,所以你所需要做的就是使用 XML 命名空间或 builder 将 listener 与 step 一起注册。

StepExecutionListener

StepExecutionListener 代表了 Step 执行的最通用的 listener。它允许在一个 Step 开始之前和结束之后进行通知,无论它是正常结束还是失败,正如下面的例子所示:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus 有一个 afterStep 的返回类型,以使 listener 有机会修改在完成一个 Step 后返回的退出代码。

与此接口相对应的注释是:

  • @BeforeStep

  • @AfterStep

ChunkListener

一个 “chunk” 被定义为在一个事务范围内处理的 item。在每个提交间隔中,提交一个事务,就提交了一个chunk。你可以使用 ChunkListener 在一个 chunk 开始处理之前或在一个 chunk 成功完成之后执行逻辑,如下面的接口定义所示:

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk 方法是在事务开始后,但在 ItemReader 上开始读取之前被调用。反之,afterChunk 方法是在事务提交后被调用的(如果有回滚,则根本不调用)。

与该接口相对应的注解是:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

你可以在没有 chunk 声明时应用 ChunkListenerTaskletStep 负责调用 ChunkListener,所以它也适用于非面向 item 的 tasklet(它在 tasklet 之前和之后被调用)。

ItemReadListener

在之前讨论跳过逻辑的时候,有人提到记录跳过的记录可能是有益的,这样可以在以后处理。在读取错误的情况下,这可以通过 ItemReaderListener 来完成,正如下面的接口定义所示:

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法在每次调用 ItemReader 进行读取之前被调用。afterRead 方法在每次成功调用读取后被调用,并传递给被读取的 item。如果在读取时出现了错误,就会调用 onReadError 方法。遇到的异常会被提供,这样就可以被记录下来。

与该接口相对应的注解是:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 一样,一个 item 的处理可以被 "监听",正如下面的接口定义所示:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法在 ItemProcessorprocess 之前被调用,并被交给要被处理的 item。 afterProcess 方法在 item 被成功处理后被调用。如果在处理过程中出现了错误,就会调用 onProcessError 方法。遇到的异常和试图被处理的 item 将被提供,这样就可以被记录下来。

与该接口相对应的注解是:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

你可以用 ItemWriteListener 来 "监听" 一个 item 的写入,正如下面的接口定义所示:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在 ItemWriterwrite 之前被调用,并被交给要写入的 item 列表。afterWrite 方法在 item 被成功写入后被调用。如果在写的时候出现了错误,onWriteError 方法会被调用。遇到的异常和试图被写入的 item 将被提供,这样它们就可以被记录下来。

与该接口相对应的注解是:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了被通知错误的机制,但没有一个能通知你一个记录确实被跳过了。 例如,onWriteError,即使一个 item 被重试并成功,也会被调用。由于这个原因,有一个单独的接口来跟踪被跳过的 item,正如下面的接口定义所示:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead 当一个 item 在读取时被跳过时被调用。应该注意的是,回滚可能会导致同一个 item 被多次注册为跳过。onSkipInWrite 是在一个 item 被写入时跳过时调用的。因为该 item 已经被成功读取(并且没有被跳过),它也被提供 item 本身作为参数。

与该接口相对应的注解是:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners 和 Transactions

SkipListener 最常见的用例之一是注销一个跳过的 item,这样另一个批处理甚至是人工处理就可以用来评估和修复导致跳过的问题。因为有很多情况下,原始事务可能会被回滚,所以 Spring Batch 做了两个保证:

  • 适当的跳过方法(取决于错误发生的时间)对每个 item 只调用一次。

  • SkipListener 总是在事务被提交之前被调用。这是为了确保监听器调用的任何事务性资源不会因为 ItemWriter 的失败而回滚。

TaskletStep

面向块的处理 不是在 Step 中处理的唯一方式。如果一个 Step 必须包括一个存储过程的调用呢?你可以将调用实现为一个 ItemReader,并在过程结束后返回 null。然而,这样做有点不自然,因为需要有一个无操作的 ItemWriter。Spring Batch 为这种情况提供了 TaskletStep

Tasklet 接口有一个方法,execute,它被 TaskletStep 反复调用,直到它返回 RepeatStatus.FINISHED 或抛出一个异常以示失败。对 Tasklet 的每个调用都被封装在一个事务中。Tasklet 实现者可能调用一个存储过程、一个脚本或一个SQL更新语句。

To create a TaskletStep in XML, the ref attribute of the <tasklet/> element should reference a bean that defines a Tasklet object. No <chunk/> element should be used within the <tasklet/>. The following example shows a simple tasklet:

<step id="step1">
    <tasklet ref="myTasklet"/>
</step>

To create a TaskletStep in Java, the bean passed to the tasklet method of the builder should implement the Tasklet interface. No call to chunk should be called when building a TaskletStep. The following example shows a simple tasklet:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
    			.tasklet(myTasklet(), transactionManager)
    			.build();
}
如果它实现了 StepListener 接口,TaskletStep 会自动将该 tasklet 注册为一个 StepListener

TaskletAdapter

ItemReaderItemWriter 接口的其他适配器一样,Tasklet 接口包含一个实现,允许将自己适应于任何预先存在的类: TaskletAdapter。一个可能有用的例子是一个现有的 DAO,它被用来更新一组记录上的一个标志。你可以使用 TaskletAdapter 来调用这个类,而不需要为 Tasklet 接口写一个适配器。

下面的例子显示了如何在 XML 中定义一个 TaskletAdapter

XML 配置
<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
    <property name="targetObject">
        <bean class="org.mycompany.FooDao"/>
    </property>
    <property name="targetMethod" value="updateFoo" />
</bean>

下面的例子显示了如何在Java中定义一个 TaskletAdapter

Java 配置
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
	MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

	adapter.setTargetObject(fooDao());
	adapter.setTargetMethod("updateFoo");

	return adapter;
}

Tasklet 实现示例

许多批处理作业包含在主要处理开始前必须完成的step,以设置各种资源或在处理完成后清理这些资源。在大量处理文件的作业中,往往需要在某些文件被成功上传到另一个地方后在本地删除这些文件。下面的例子(取自 Spring Batch samples project)是一个具有这样职责的 Tasklet 实现:

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.state(directory != null, "directory must be set");
    }
}

前面的 tasklet 实现删除了一个给定目录中的所有文件。应该注意的是, execute 方法只被调用一次。剩下的就是引用 step 中的 tasklet

下面的例子显示了如何在 XML 中引用 step 中的 tasklet

XML 配置
<job id="taskletJob">
    <step id="deleteFilesInDir">
       <tasklet ref="fileDeletingTasklet"/>
    </step>
</job>

<beans:bean id="fileDeletingTasklet"
            class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
    <beans:property name="directoryResource">
        <beans:bean id="directory"
                    class="org.springframework.core.io.FileSystemResource">
            <beans:constructor-arg value="target/test-outputs/test-dir" />
        </beans:bean>
    </beans:property>
</beans:bean>

下面的例子显示了如何在 Java 中引用 step 中的 tasklet

Java 配置
@Bean
public Job taskletJob(JobRepository jobRepository) {
	return new JobBuilder("taskletJob", jobRepository)
				.start(deleteFilesInDir())
				.build();
}

@Bean
public Step deleteFilesInDir(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("deleteFilesInDir", jobRepository)
				.tasklet(fileDeletingTasklet(), transactionManager)
				.build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
	FileDeletingTasklet tasklet = new FileDeletingTasklet();

	tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));

	return tasklet;
}

控制 Step 流

随着在一个自有 job 内将 step 分组的能力,需要能够控制作业如何从一个 Step "流向" 另一个 Step。一个 Step 的失败并不一定意味着 Job 应该失败。此外,可能有不止一种类型的 "成功" 来决定下一步应该执行哪个 Step。根据一组 Step 的配置方式,某些 Step 甚至可能根本就不被处理。

顺序流

最简单的流程场景是一个所有 step 都按顺序执行的工作,如下图所示:

顺序流
Figure 4. 顺序流

这可以通过在一个 step 中使用 next 来实现。

下面的例子显示了如何在 XML 中使用 next 属性:

XML 配置
<job id="job">
    <step id="stepA" parent="s1" next="stepB" />
    <step id="stepB" parent="s2" next="stepC"/>
    <step id="stepC" parent="s3" />
</job>

下面的例子显示了如何在 Java 中使用 next() 方法:

Java 配置
@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
				.start(stepA())
				.next(stepB())
				.next(stepC())
				.build();
}

在上述情况下,stepA 首先运行,因为它是列出的第一个 Step。如果 stepA 正常完成,stepB 运行,以此类推。但是,如果 step A 失败,整个 Job 就会失败,stepB 就不会执行。

对于 Spring Batch XML 命名空间,配置中列出的第一个 step 始终是 Job 运行的第一个 step。其他 step 元素的顺序并不重要,但第一个 step 必须总是出现在 XML 的首位。

条件流

在前面的例子中,只有两种可能性:

  1. step 成功,应执行下一 step

  2. step 失败,因此 job 也应失败。

在许多情况下,这就足够了。但是,如果一个 step 失败后,应该触发另一个 step,而不是导致失败,这种情况又该如何处理呢?下图显示了这种流程:

Conditional Flow
Figure 5. Conditional Flow

为了处理更复杂的情况,Spring Batch XML 命名空间允许你在 step 元素中定义过渡元素。next 元素就是这样一个过渡元素。与 next 属性一样,next 元素告诉 Job 下一步要执行哪个 Step。然而,与属性不同的是,在给定 Step 上允许使用任意数量的 next 元素,并且在失败情况下没有默认行为。这意味着,如果使用过渡元素,则必须明确定义 Step 过渡的所有行为。还请注意,一个 Step 不能同时具有 next 属性和过渡元素。

next 元素指定了要匹配的模式和下一步要执行的 step,如下例所示:

XML Configuration
<job id="job">
    <step id="stepA" parent="s1">
        <next on="*" to="stepB" />
        <next on="FAILED" to="stepC" />
    </step>
    <step id="stepB" parent="s2" next="stepC" />
    <step id="stepC" parent="s3" />
</job>

Java API 提供了一组 fluent 方法,可让你指定流程以及当某一步失败时该如何处理。下面的示例展示了如何指定一个 step(stepA ),然后根据 stepA 是否成功进入两个不同步骤(stepBstepC)中的任一 step:

Java Configuration
@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
				.start(stepA())
				.on("*").to(stepB())
				.from(stepA()).on("FAILED").to(stepC())
				.end()
				.build();
}

当使用 XML 配置时,过渡元素的 on 属性使用简单的模式匹配方案来匹配步骤执行后的 ExitStatus

当使用 java 配置时,on() 方法使用简单的模式匹配方案来匹配 Step 执行后的 ExitStatus

模式中只允许使用两个特殊字符:

  • * 匹配0个或多个字符。

  • ? 完全匹配一个字符。

例如,c*t 匹配 catcount,而 c?t 匹配 cat 但不匹配 count

虽然对一个 Step 上的过渡元素的数量没有限制,但如果 Step 执行的结果是一个元素没有覆盖的 ExitStatus,框架就会抛出一个异常,Job 就会失败。框架会自动将过渡元素从最特殊到最不特殊排序。这意味着,即使在前面的示例中将 stepA 的排序对调,FAILEDExitStatus 仍然会进入 stepC

批处理状态与退出状态

在为条件流配置 Job 时,了解 BatchStatusExitStatus 之间的区别非常重要。BatchStatus 是一个枚举,是 JobExecutionStepExecution 的属性,框架使用它来记录 JobStep 的状态。它可以是以下值之一: Completed(完成)、STARTING(开始)、STARTED(已开始)、STOPPING(停止)、STOPPED(已停止)、FAILED(失败)、ABANDONED(放弃)或 UNKNOWN(未知)。其中大部分不言自明: COMPLETED(完成)是 step 或 job 成功完成时设置的状态,FAILED(失败)是失败时设置的状态,以此类推。

下面的示例包含了使用XML配置时的 next 元素:

<next on="FAILED" to="stepB" />

下面的示例包含了使用Java配置时的 on 元素:

...
.from(stepA()).on("FAILED").to(stepB())
...

乍看起来,on 引用了所属 StepBatchStatus。然而,实际上它引用的是 StepExitStatus。顾名思义,ExitStatus 表示 Step 执行结束后的状态。

更具体地说,当使用XML配置时,前面的XML配置示例中显示的 next 元素引用 ExitStatus 的退出代码。

当使用 Java 配置时,前面 Java 配置示例中的 on() 方法引用 ExitStatus 的退出代码。

用中文说就是:“如果退出代码为FAILED,则转到步骤B”。默认情况下,退出代码总是与该 StepBatchStatus 相同,这就是前面的条目能够工作的原因。但是,如果退出代码需要不同呢?示例项目中的跳过示例 job 就是一个很好的例子:

下面的示例展示了如何在 XML 中使用不同的退出代码:

XML Configuration
<step id="step1" parent="s1">
    <end on="FAILED" />
    <next on="COMPLETED WITH SKIPS" to="errorPrint1" />
    <next on="*" to="step2" />
</step>

下面的示例展示了如何在 Java 中使用不同的退出代码:

Java Configuration
@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
			.start(step1()).on("FAILED").end()
			.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
			.from(step1()).on("*").to(step2())
			.end()
			.build();
}

step1 有三种可能:

  • Step 失败,在这种情况下,job 应该失败。

  • Step 成功完成。

  • Step 成功完成,但退出代码为 COMPLETED WITH SKIPS。在这种情况下,应该运行另一个 step 来处理错误。

上述配置有效。但是,正如下面的示例所示,需要根据跳过记录的执行条件更改退出代码:

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

前面的代码是一个 StepExecutionListener,它首先检查 Step 是否成功,然后检查 StepExecution 的跳过计数是否大于 0。 如果两个条件都满足,将返回一个新的 ExitStatus,其退出代码为 COMPLETED WITH SKIPS

配置停止

在讨论了 BatchStatusExitStatus 之后,人们可能想知道如何确定 JobBatchStatusExitStatusStep 的这些状态由执行的代码决定,而 Job 的状态则根据配置决定。

到目前为止,讨论的所有 job 配置都至少有一个没有过渡的最后 Step

在下面的 XML 示例中,step 执行后,Job 结束:

<step id="stepC" parent="s3"/>

在下面的 Java 示例中,step 执行后,Job 结束:

@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
				.start(step1())
				.build();
}

如果没有为 Step 定义过渡,则 Job 的状态定义如下:

  • 如果 Step 结束时的 ExitStatusFAILED,则 JobBatchStatusExitStatus 均为 FAILED

  • 否则,JobBatchStatus(批处理状态)和 ExitStatus(退出状态)都是 COMPLETED(已完成)。

虽然这种终止批处理 job 的方法对于某些批处理 Job(如简单的顺序 step job)来说已经足够,但可能还需要自定义的 job 停止场景。为此,Spring Batch 提供了三个过渡元素来停止 Job(除了我们之前讨论过的 next 元素)。每个停止元素都以特定的 BatchStatus 停止 Job。需要注意的是,停止过渡元素对 Job 中任何 StepBatchStatusExitStatus 都没有影响。这些元素只影响 Job 的最终状态。例如,Job 中的每个 Step 的状态都可能是 FAILED(失败),但 job 的状态却是 COMPLETED(完成)。

Step 结束

配置 step 结束指示 JobBatchStatusCOMPLETED 时停止。完成状态为 COMPLETEDJob 无法重新启动(框架会抛出 JobInstanceAlreadyCompleteException)。

当使用 XML 配置时,你可以使用 end 元素来完成此任务。end 元素还允许一个可选的 exit-code 属性,你可以使用该属性自定义 JobExitStatus。如果没有给出 exit-code 属性,则 ExitStatus 默认为 COMPLETED,以与 BatchStatus 匹配。

使用 Java 配置时,该任务使用 end 方法。end 方法还允许一个可选的 exitStatus 参数,你可以使用该参数自定义 JobExitStatus。如果没有提供 exitStatus 值,则 ExitStatus 默认为 COMPLETED,以匹配 BatchStatus

考虑以下情况:如果 step2 失败,则 Job 停止,BatchStatusCOMPLETEDExitStatusCOMPLETEDstep3 不运行。否则,执行到 step3。请注意,如果 step2 失败,该 Job 不可重新启动(因为状态是 COMPLETED)。

下面的示例显示了 XML 中的场景:

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <end on="FAILED"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

下面的示例展示了Java中的场景:

@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
				.start(step1())
				.next(step2())
				.on("FAILED").end()
				.from(step2()).on("*").to(step3())
				.end()
				.build();
}
失败 Step

将 step 配置为在给定点失败,可指示 JobFAILEDBatchStatus 停止。与结束不同的是,Job 的失败不会阻止 Job 的重新启动。

使用 XML 配置时,fail 元素还允许可选的 exit-code 属性,该属性可用于自定义 JobExitStatus。如果没有给出 exit-code 属性,则 ExitStatus 默认为 FAILED,以匹配 BatchStatus

考虑以下情况:如果 step2 失败,则 Job 停止,BatchStatusFAILEDExitStatusEARLY TERMINATIONstep3 不执行。否则,将执行 step3。此外,如果 step2 失败并且重新启动了 Job,执行将从 step2 重新开始。

下面的示例显示了 XML 中的场景:

XML Configuration
<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <fail on="FAILED" exit-code="EARLY TERMINATION"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

下面的示例显示了Java中的场景:

Java Configuration
@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
			.start(step1())
			.next(step2()).on("FAILED").fail()
			.from(step2()).on("*").to(step3())
			.end()
			.build();
}
在指定 Step 停止 Job

将 job 配置为在特定 step 停止,可指示 JobSTOPPED(停止)的 BatchStatus 停止。停止 Job 可以暂时中断处理,以便运维在重新启动 Job 前采取一些措施。

当使用 XML 配置时,stop 元素需要一个 restart 属性,该属性指定当重新启动 Job 时应重新开始执行的 step。

当使用 Java 配置时,stopAndRestart 方法需要一个 restart 属性,该属性指定了重新启动 Job 时应重新开始执行的 step。

请考虑以下情况:如果 step1COMPLETE 结束,则 job 停止。重新启动后,开始执行 step2

以下列表显示了 XML 中的场景:

<step id="step1" parent="s1">
    <stop on="COMPLETED" restart="step2"/>
</step>

<step id="step2" parent="s2"/>

下面的示例显示了Java中的场景:

@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
			.start(step1()).on("COMPLETED").stopAndRestart(step2())
			.end()
			.build();
}

编程式流程决策

在某些情况下,可能需要比 ExitStatus 更多的信息来决定下一步执行哪个 step。在这种情况下,可以使用 JobExecutionDecider 来帮助做出决定,如下例所示:

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String status;
        if (someCondition()) {
            status = "FAILED";
        }
        else {
            status = "COMPLETED";
        }
        return new FlowExecutionStatus(status);
    }
}

在下面的 job 配置示例中,decision 指定了要使用的 decider 以及所有过渡:

XML Configuration
<job id="job">
    <step id="step1" parent="s1" next="decision" />

    <decision id="decision" decider="decider">
        <next on="FAILED" to="step2" />
        <next on="COMPLETED" to="step3" />
    </decision>

    <step id="step2" parent="s2" next="step3"/>
    <step id="step3" parent="s3" />
</job>

<beans:bean id="decider" class="com.MyDecider"/>

在下面的示例中,当使用 Java 配置时,实现 JobExecutionDecider 的 bean 将直接传递给 next 调用:

Java Configuration
@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
			.start(step1())
			.next(decider()).on("FAILED").to(step2())
			.from(decider()).on("COMPLETED").to(step3())
			.end()
			.build();
}

分流

到目前为止,所描述的每种情况都涉及到以线性方式一次执行一个步骤的 Job。除了这种典型风格,Spring Batch 还允许将 job 配置为并行流。

XML命名空间允许你使用 split 元素。如下面的示例所示,split 元素包含一个或多个 flow 元素,可以在其中定义整个独立的流。split 元素还可以包含前面讨论过的任何过渡元素,如 next 属性或 nextendfail 元素。

<split id="split1" next="step4">
    <flow>
        <step id="step1" parent="s1" next="step2"/>
        <step id="step2" parent="s2"/>
    </flow>
    <flow>
        <step id="step3" parent="s3"/>
    </flow>
</split>
<step id="step4" parent="s4"/>

基于 Java 的配置可让你通过提供的 builder 配置 split。如以下示例所示,split 元素包含一个或多个 flow 元素,可以在其中定义整个独立的流。split 元素还可以包含之前讨论过的任何过渡元素,如 next 属性或 nextendfail 元素。

@Bean
public Flow flow1() {
	return new FlowBuilder<SimpleFlow>("flow1")
			.start(step1())
			.next(step2())
			.build();
}

@Bean
public Flow flow2() {
	return new FlowBuilder<SimpleFlow>("flow2")
			.start(step3())
			.build();
}

@Bean
public Job job(Flow flow1, Flow flow2) {
	return this.jobBuilderFactory.get("job")
				.start(flow1)
				.split(new SimpleAsyncTaskExecutor())
				.add(flow2)
				.next(step4())
				.end()
				.build();
}

外部化 Job 之间的流程定义和依赖关系

job 中的部分流程可以外部化为单独的 bean 定义,然后重复使用。有两种方法可以做到这一点。第一种是将 flow 声明为对其他地方定义的 flow 的引用。

下面的 XML 示例显示了如何将 flow 声明为对其他地方定义的 flow 的引用:

XML Configuration
<job id="job">
    <flow id="job1.flow1" parent="flow1" next="step3"/>
    <step id="step3" parent="s3"/>
</job>

<flow id="flow1">
    <step id="step1" parent="s1" next="step2"/>
    <step id="step2" parent="s2"/>
</flow>

下面的 Java 示例展示了如何将 flow 声明为对其他地方定义的 flow 的引用:

Java Confguration
@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
				.start(flow1())
				.next(step3())
				.end()
				.build();
}

@Bean
public Flow flow1() {
	return new FlowBuilder<SimpleFlow>("flow1")
			.start(step1())
			.next(step2())
			.build();
}

如上例所示,定义外部流程的效果是将外部流程中的 step 插入到 job 中,就好像它们是内联声明的一样。通过这种方式,许多 job 可以引用相同的模板流,并将这些模板组成不同的逻辑流。这也是分离各个流的集成测试的好方法。

外部化流程的另一种形式是使用 JobStepJobStepFlowStep 类似,但实际上是为指定流程中的 step 创建并启动单独的 job 执行。

下面的示例是一个 XML 格式的 JobStep 示例:

XML Configuration
<job id="jobStepJob" restartable="true">
   <step id="jobStepJob.step1">
      <job ref="job" job-launcher="jobLauncher"
          job-parameters-extractor="jobParametersExtractor"/>
   </step>
</job>

<job id="job" restartable="true">...</job>

<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
   <property name="keys" value="input.file"/>
</bean>

下面的示例显示了 Java 中的一个 JobStep 示例:

Java Configuration
@Bean
public Job jobStepJob(JobRepository jobRepository) {
	return new JobBuilder("jobStepJob", jobRepository)
				.start(jobStepJobStep1(null))
				.build();
}

@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher, JobRepository jobRepository) {
	return new StepBuilder("jobStepJobStep1", jobRepository)
				.job(job())
				.launcher(jobLauncher)
				.parametersExtractor(jobParametersExtractor())
				.build();
}

@Bean
public Job job(JobRepository jobRepository) {
	return new JobBuilder("job", jobRepository)
				.start(step1())
				.build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
	DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

	extractor.setKeys(new String[]{"input.file"});

	return extractor;
}

job 参数提取器是一种策略,用于确定如何将 StepExecutionContext 转换为运行 JobJobParameters。当你希望对 job 和 Step 进行监控和报告时,JobStep 非常有用。使用 JobStep 通常也能很好地解决以下问题: "如何在 job 之间创建依赖关系?这是一种将大型系统分解为较小模块并控制作业流的好方法。

JobStep 属性的延迟绑定

前面显示的 XML 和平面文件示例都使用了 Spring Resource 抽象来获取文件。这是因为 Resource 有一个返回 java.io.FilegetFile 方法。你可以使用标准的 Spring 构造配置 XML 和平面文件资源:

下面的示例显示了 XML 中的延迟绑定:

XML Configuration
<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource"
              value="file://outputs/file.txt" />
</bean>

下面的示例显示了Java中的延迟绑定:

Java Configuration
@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource("file://outputs/file.txt"))
			...
}

上述 Resource 从指定的文件系统位置加载文件。请注意,绝对位置必须以双斜线(//)开头。在大多数Spring应用程序中,这种解决方案已经足够好了,因为这些资源的名称在编译时已经知道。然而,在批处理场景中,文件名可能需要在运行时作为 job 参数确定。这可以通过使用 -D 参数读取系统属性来解决。

下面的示例展示了如何从XML属性中读取文件名:

XML Configuration
<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="${input.file.name}" />
</bean>

下面展示了如何从Java属性中读取文件名:

Java Configuration
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

该解决方案只需要一个系统参数(如 -Dinput.file.name="file://outputs/file.txt")即可。

尽管你可以在这里使用 PropertyPlaceholderConfigurer,但如果 system property 总是被设置,则没有必要使用,因为 Spring 中的 ResourceEditor 已经对系 system properties 进行了过滤和占位符替换。

通常情况下,在批处理设置中,最好在 job 的 JobParameters 中参数化文件名(而不是通过系统属性)并以这种方式访问它们。为了实现这一点,Spring 批处理允许后期绑定各种 JobStep 属性。

下面的示例显示了如何在XML中参数化文件名:

XML Configuration
<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>

下面的示例展示了如何在Java中参数化文件名:

Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

你可以以同样的方式访问 JobExecutionStepExecution 级别的 ExecutionContext

下面的示例展示了如何在XML中访问 ExecutionContext

XML Configuration
<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
XML Configuration
<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>

下面的示例展示了如何在Java中访问 ExecutionContext

Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}
Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}
任何使用后期绑定的 bean 必须使用 scope="step" 声明。有关详细信息,请参阅 Step ScopeStep bean 不应具有 step scope。如果需要在 step 定义中进行后期绑定,则该 step 的组件(tasklet、item reader 或 writer 等)应被作用域化。
如果你使用的是Spring 3.0(或更高版本),step scope Bean 中的表达式使用的是 Spring 表达式语言,这是一种功能强大的通用语言,具有许多有趣的特性。为了提供向后兼容性,如果 Spring Batch 检测到旧版本的Spring 存在,它会使用功能较弱的本地表达式语言,其解析规则略有不同。主要区别在于,上例中的 map key 在 Spring 2.5 中不需要加引号,但在 Spring 3.0 中必须加引号。

Step Scope

前面显示的所有后期绑定示例都在 bean 定义中声明了 step 的 scope。

下面的示例显示了在 XML 中绑定到 step scope 的示例:

XML Configuration
<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>

下面的示例显示了在 Java 中绑定到 step scope 的示例:

Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

要使用后期绑定,就必须使用 Step 的 scope,因为在 Step 开始之前,Bean 实际上无法实例化,因此无法找到属性。因为默认情况下它不是 Spring 容器的一部分,所以必须通过使用 batch 命名空间、显式包含 StepScope 的 Bean 定义或使用 @EnableBatchProcessing 注解来显式添加 scope。只能使用其中一种方法。下面的示例使用了 batch 命名空间:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

下面的示例明确包含了 bean 定义:

<bean class="org.springframework.batch.core.scope.StepScope" />

Job Scope

在 Spring Batch 3.0 中引入的 Job scope 在配置上类似于 Step scope,但它是 Job context 的 scope,因此每个运行中的 job 只有一个这样的Bean实例。此外,通过使用 #{…​} 占位符,支持对从 JobContext 访问的引用进行后期绑定。使用此功能,你可以从 job 或 job execution context 和 job parameters 中提取 Bean 属性。

下面的示例显示了在 XML 中绑定到 job scope 的示例:

XML Configuration
<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobParameters[input]}" />
</bean>
XML Configuration
<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>

下面的示例显示了在 Java 中绑定到 job scope 的示例:

Java Configuration
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}
Java Configuration
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

由于默认情况下该 scope 不是 Spring 容器的一部分,因此必须显式添加该 scope,方法是使用 batch 命名空间、显式包含 JobScope 的 Bean 定义或使用 @EnableBatchProcessing 注解(仅选择一种方法)。下面的示例使用了 batch 命名空间:

<beans xmlns="http://www.springframework.org/schema/beans"
		  xmlns:batch="http://www.springframework.org/schema/batch"
		  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		  xsi:schemaLocation="...">

<batch:job .../>
...
</beans>

下面的示例包含一个显式定义 JobScope 的 Bean:

<bean class="org.springframework.batch.core.scope.JobScope" />
在多线程或分区 step 中使用 job scope Bean 有一些实际限制。Spring Batch 无法控制这些用例中产生的线程,因此无法正确设置这些线程以使用此类Bean。因此,我们不建议在多线程或分区 step 中使用 job scope sBean。

Scope ItemStream 组件

当使用 Java 配置样式定义 job 或 step scope 的 ItemStream Bean 时,Bean 定义方法的返回类型至少应为 ItemStream。这是必需的,这样 Spring Batch 才能正确创建一个实现该接口的代理,从而通过调用 openupdateclose 方法来履行其契约。

建议使此类 Bean 的 bean 定义方法返回最具体的已知实现,如下例所示:

Define a step-scoped bean with the most specific return type
@Bean
@StepScope
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.resource(new FileSystemResource(name))
			// set other properties of the item reader
			.build();
}