配置 Step
本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。 |
正如在 domain 章节 中所讨论的,Step
是一个 domain 对象,它封装了批处理 job 的一个独立的、连续的阶段,并包含定义和控制实际批处理的所有必要信息。这必然是一个模糊的描述,因为任何给定的 Step
的内容都是由编写 Job
的开发者决定的。一个 Step
可以是简单的,也可以是复杂的,正如开发者所希望的那样。一个简单的 Step
可能会将数据从一个文件加载到数据库中,只需要很少或不需要代码(取决于所使用的实现)。一个更复杂的 Step
可能有复杂的业务规则,作为处理的一部分被应用,如下图所示:
面向分块的处理
Spring Batch 在其最常见的实现中使用了 "面向块" 的处理风格。面向块的处理指的是每次读取数据并创建 "块" (chunk),在事务边界内写出。一旦读取的项目数等于提交间隔,整个块就会被 ItemWriter
写出来,然后事务被提交。下面的图片显示了这个过程:
下面的伪代码以一种简化的形式展示了同样的概念:
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
itemWriter.write(items);
你也可以用一个可选的 ItemProcessor
来配置一个面向分块的 step,以便在将项目传递给 ItemWriter
之前对它们进行处理。下面的图片显示了当一个 ItemProcessor
被注册在 step 中时的过程:
下面的伪代码显示了如何以一种简化的形式实现这一点:
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
List processedItems = new Arraylist();
for(Object item: items){
Object processedItem = itemProcessor.process(item);
if (processedItem != null) {
processedItems.add(processedItem);
}
}
itemWriter.write(processedItems);
关于 item processor 及其使用情况的更多细节,请参见 Item processing 部分。
配置 Step
尽管 Step
所需的依赖清单相对较短,但它是一个极其复杂的类,有可能包含许多合作者。
为了方便配置,你可以使用Spring Batch XML命名空间,如下例所示:
<job id="sampleJob" job-repository="jobRepository">
<step id="step1">
<tasklet transaction-manager="transactionManager">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
当使用 Java 配置时,你可以使用 Spring Batch builder,如下面的例子所示:
/**
* Note the JobRepository is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return new JobBuilder("sampleJob", jobRepository)
.start(sampleStep)
.build();
}
/**
* Note the TransactionManager is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
The preceding configuration includes the only required dependencies to create a item-oriented step:
-
reader
: TheItemReader
that provides items for processing. -
writer
: TheItemWriter
that processes the items provided by theItemReader
.
-
transaction-manager
: Spring’sPlatformTransactionManager
that begins and commits transactions during processing.
-
transactionManager
: Spring’sPlatformTransactionManager
that begins and commits transactions during processing.
-
job-repository
: The XML-specific name of theJobRepository
that periodically stores theStepExecution
andExecutionContext
during processing (just before committing). For an in-line<step/>
(one defined within a<job/>
), it is an attribute on the<job/>
element. For a standalone<step/>
, it is defined as an attribute of the<tasklet/>
.
-
repository
: The Java-specific name of theJobRepository
that periodically stores theStepExecution
andExecutionContext
during processing (just before committing).
-
commit-interval
: The XML-specific name of the number of items to be processed before the transaction is committed.
-
chunk
: The Java-specific name of the dependency that indicates that this is an item-based step and the number of items to be processed before the transaction is committed.
Note that job-repository
defaults to jobRepository
and
transaction-manager
defaults to transactionManager
. Also, the ItemProcessor
is
optional, since the item could be directly passed from the reader to the writer.
Note that repository
defaults to jobRepository
(provided through @EnableBatchProcessing
)
and transactionManager
defaults to transactionManager
(provided from the application context).
Also, the ItemProcessor
is optional, since the item could be
directly passed from the reader to the writer.
继承自 Parent Step
如果一组 Step
有类似的配置,那么定义一个 “parent” Step
可能会有帮助,具体的 Step
可以从该 Steps
继承属性。类似于Java中的类继承,“child” Step
将其元素和属性与 parent Step
相结合。child Steps
还可以覆盖 parent Step
的任何内容。
在下面的例子中,Step
,concreteStep1
,继承自 parentStep
。它被实例化为 itemReader
, itemProcessor
, itemWriter
, startLimit=5
以及 allowStartIfComplete=true
。此外, commitInterval
是 5
,因为它被 concreteStep1
Step
重写了,如下面的例子所示:
<step id="parentStep">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep1" parent="parentStep">
<tasklet start-limit="5">
<chunk processor="itemProcessor" commit-interval="5"/>
</tasklet>
</step>
job 元素中的 step 仍然需要 id
属性。这有两个原因:
-
在持久化
StepExecution
时,该id
被用作 step 名称。如果同一个独立的 step 在 job 中被多个 step 引用,就会发生错误。
-
当创建工作流时,如 本章后面 所述,
next
属性应指流程中的 step,而不是独立的 step。
Step
抽象
有时,可能需要定义一个不是完整 Step
配置的 parent Step
。例如,如果 reader
、writer
和 tasklet
属性在 Step
配置中被遗漏了,那么初始化就会失败。如果必须在没有这些属性中的一个或多个的情况下定义 parent,应该使用 abstract
属性。一个 abstract
的 Step
只能被继承,不能被实例化。
在下面的例子中,如果 Step
(abstractParentStep
)没有被声明为 abstract,它就不会被实例化。这个 Step
,(concreteStep2
)有 itemReader
, itemWriter
和 commit-interval=10
。
<step id="abstractParentStep" abstract="true">
<tasklet>
<chunk commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep2" parent="abstractParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"/>
</tasklet>
</step>
合并 List
Step
上的一些可配置元素是list,例如 <listeners/>
元素。如果 parent Step
和 child Step
都声明了一个 <listeners/>
元素,那么 child Step
的列表就会覆盖 parent Step
的列表。为了允许child在parent定义的列表中添加额外的监听器,每个 list 元素都有一个 merge
属性。如果该元素指定 merge="true"
,那么 child 的列表将与 parent 的列表合并,而不是覆盖它。
在下面的例子中,Step
"concreteStep3" 被创建,有两个 listener:listenerOne
和 listenerTwo
:
<step id="listenersParentStep" abstract="true">
<listeners>
<listener ref="listenerOne"/>
<listeners>
</step>
<step id="concreteStep3" parent="listenersParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
</tasklet>
<listeners merge="true">
<listener ref="listenerTwo"/>
<listeners>
</step>
提交间隔
如前所述,一个step读入和写出item,通过使用提供的 PlatformTransactionManager
定期提交。在 commit-interval
为 1
的情况下,它在写完每一个item后都会提交。这在很多情况下是不太理想的,因为开始和提交一个事务是很昂贵的。理想情况下,最好是在每个事务中处理尽可能多的item,这完全取决于被处理的数据类型和与之交互的资源。出于这个原因,你可以配置在一次提交中处理的item数量。
下面的例子显示了一个 step
,其 tasklet
的 commit-interval
值为10,正如它在XML中定义的那样:
<job id="sampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
下面的例子显示了一个 step
,其 tasklet
的 commit-interval
值为10,这是在Java中定义的:
@Bean
public Job sampleJob(JobRepository jobRepository) {
return new JobBuilder("sampleJob", jobRepository)
.start(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
在前面的例子中,每个事务中要处理 10 个 item。在处理的开始,一个事务就开始了。而且,每次在 ItemReader
上调用 read
时,一个计数器会被递增。当它达到10时,汇总的 item 列表被传递给 ItemWriter
,并且事务被提交。
配置重启的 Step
在 “配置和运行 Job” 一节中,讨论了重新启动 Job
的问题。重新启动对 step 有许多影响,因此,可能需要一些特定的配置。
设定启动限制
在很多情况下,你可能想控制一个 Step
的启动次数。例如,你可能需要配置一个特定的 Step
,使其只运行一次,因为它使一些资源失效,必须在再次运行前手动修复。这在 Step
层面上是可以配置的,因为不同的 Step
可能有不同的要求。一个只能执行一次的 Step
可以与一个可以无限运行的 Step
作为同一 Job
的一部分存在。
下面的代码片段显示了XML中启动限制配置的一个例子:
<step id="step1">
<tasklet start-limit="1">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
下面的代码片段显示了Java中启动限制配置的一个例子:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}
前面例子中的 step 只能运行一次。试图再次运行它将导致抛出一个 StartLimitExceededException
。注意,启动限制的默认值是 Integer.MAX_VALUE
。
重新启动一个已完成的 Step
在可重启 job 的情况下,可能有一个或多个 step 应该总是被运行,不管它们在第一次是否成功。一个例子可能是验证 Step
或在处理前清理资源的 step。在重启 job 的正常处理过程中,任何状态为 COMPLETED
(意味着它已经成功完成)的 step 都会被跳过。将 allow-start-if-complete
设置为 true
会覆盖这一点,这样该 step 就会一直运行。
下面的代码片段显示了如何在XML中定义一个可重新启动的 job:
<step id="step1">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
下面的代码片段显示了如何在Java中定义一个可重新启动的 job:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
Step
重启配置示例
下面的XML例子显示了如何配置一个 job,使其具有可重新启动的 step:
<job id="footballJob" restartable="true">
<step id="playerload" next="gameLoad">
<tasklet>
<chunk reader="playerFileItemReader" writer="playerWriter"
commit-interval="10" />
</tasklet>
</step>
<step id="gameLoad" next="playerSummarization">
<tasklet allow-start-if-complete="true">
<chunk reader="gameFileItemReader" writer="gameWriter"
commit-interval="10"/>
</tasklet>
</step>
<step id="playerSummarization">
<tasklet start-limit="2">
<chunk reader="playerSummarizationSource" writer="summaryWriter"
commit-interval="10"/>
</tasklet>
</step>
</job>
下面的Java例子显示了如何配置一个 job,使其具有可重新启动的 step:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
@Bean
public Step playerLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("playerLoad", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(playerFileItemReader())
.writer(playerWriter())
.build();
}
@Bean
public Step gameLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("gameLoad", jobRepository)
.allowStartIfComplete(true)
.<String, String>chunk(10, transactionManager)
.reader(gameFileItemReader())
.writer(gameWriter())
.build();
}
@Bean
public Step playerSummarization(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("playerSummarization", jobRepository)
.startLimit(2)
.<String, String>chunk(10, transactionManager)
.reader(playerSummarizationSource())
.writer(summaryWriter())
.build();
}
前面的配置示例是为一个加载足球比赛信息并总结的工作。它包含三个 step:playerLoad
, gameLoad
和 playerSummarization
。playerLoad
step 从一个平面文件中加载球员信息,而 gameLoad
step 对比赛也做同样的工作。最后一个 step,playerSummarization
,然后根据提供的游戏总结每个玩家的统计数据。假设由 playerLoad
加载的文件必须只加载一次,但 gameLoad
可以加载在特定目录中发现的任何游戏,在它们被成功加载到数据库后删除它们。因此,playerLoad
step 不包含额外的配置。它可以被启动任何次数,如果完成则跳过。然而,gameLoad
step 需要每次都运行,以防在它上次运行后有额外的文件被添加。它的允 allow-start-if-complete
设置为 true
,以便总是被启动。(假设游戏加载的数据库表上有一个进程指示器,以确保新游戏能被总结 step 正确找到)。summarization step 是工作中最重要的,被配置为启动限制为2。这很有用,因为如果该 step 持续失败,就会向控制 job execution 的操作人员返回一个新的退出代码,在人工干预之前,它不能再次启动。
这个 job 为本文档提供了一个例子,与示例项目中的 footballJob 不同。
|
本节的其余部分描述了 footballJob
例子的三次运行中每一次发生的情况。
运行 1:
-
playerLoad
运行并成功完成,向PLAYERS
表添加了400个玩家。 -
gameLoad
运行并处理价值11个文件的游戏数据,将其内容加载到GAMES
表中。 -
playerSummarization
开始处理,5分钟后失败。
运行 2:
-
playerLoad
不运行,因为它已经成功完成,并且allow-start-if-complete
是false
(默认)。 -
gameLoad
再次运行并处理另外两个文件,将它们的内容也加载到GAMES
表中(有一个 process 指示器表明它们还没有被处理)。 -
playerSummarization
开始处理所有剩余的游戏数据(使用 process 指标进行过滤),30分钟后再次失败。
运行 3:
-
playerLoad
不运行,因为它已经成功完成,并且allow-start-if-complete
是false
(默认)。 -
gameLoad
再次运行并处理另外两个文件,将它们的内容也加载到GAMES
表中(有一个 process 指示器表明它们还没有被处理)。 -
playerSummarization
没有被启动,job 被立即杀死,因为这是playerSummarization
的第三次执行,而它的限制只有2
,要么必须提高限制,要么Job
必须作为一个新的JobInstance
被执行。
配置跳过(Skip )逻辑
在很多情况下,处理过程中遇到的错误不应该导致 Step
失败,而是应该跳过。这通常是一个必须由了解数据本身和它的意义的人做出的决定。例如,财务数据可能不能跳过,因为它导致了金钱的转移,这需要完全准确。另一方面,加载一个供应商的列表,可能允许跳过。如果一个供应商因为格式不正确或缺少必要的信息而没有被加载,可能就没有问题了。通常,这些不好的记录也会被记录下来,这在后面讨论 listener 的时候会涉及。
下面的XML例子显示了一个使用跳过限制的例子:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch.item.file.FlatFileParseException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
下面的Java例子显示了一个使用跳过限制的例子:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
在前面的例子中,使用了一个 FlatFileItemReader
。如果在任何时候抛出一个 FlatFileParseException
,这个 item 就会被跳过,并被计入总的跳过限制(10)。被声明的异常(和它们的子类)可能会在块处理的任何阶段(读取、处理或写入)被抛出。在步骤执行过程中,对读、处理和写的跳过进行单独统计,但该限制适用于所有跳过。一旦达到跳过限制,发现的下一个异常将导致该步骤失败。换句话说,第11次跳过会触发异常,而不是第10次。
前面的例子的一个问题是,除了 FlatFileParseException
之外的任何其他异常都会导致 Job
失败。在某些情况下,这可能是正确的行为。然而,在其他情况下,识别哪些异常应该导致失败,并跳过其他一切,可能会更容易。
下面的XML例子显示了一个排除特定异常的例子:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="java.lang.Exception"/>
<exclude class="java.io.FileNotFoundException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
下面的Java例子显示了一个排除特定异常的例子:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
通过将 java.lang.Exception
确定为可跳过的异常类,该配置表明所有的 Exception
都是可跳过的。然而,通过 "排除" java.io.FileNotFoundException
,配置将可跳过的异常类列表细化为除了 FileNotFoundException
之外的所有 Exception
。任何被排除的异常类在遇到时都是致命的(也就是说,它们不会被跳过)。
对于遇到的任何异常,其可跳过性是由类层次结构中最近的超类决定的。任何未分类的异常都被当作 'fatal' 处理。
The order of the <include/>
and <exclude/>
elements does not matter.
The order of the skip
and noSkip
method calls does not matter.
配置重试逻辑
在大多数情况下,你希望一个异常要么导致跳过,要么导致一个 Step
失败。然而,并不是所有的异常都是确定性的。如果在读取过程中遇到了 FlatFileParseException
,它总是会被抛出用于该记录。重置 ItemReader
并没有帮助。然而,对于其他的异常(比如 DeadlockLoserDataAccessException
,它表明当前进程试图更新另一个进程拥有锁的记录),等待并再次尝试可能会导致成功。
在XML中,重试应按以下方式进行配置:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"
commit-interval="2" retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
在Java中,重试应按以下方式进行配置:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
该 Step
允许对单个 item 的重试次数进行限制,以及列出 "可重试" 的异常情况。你可以在 retry 中找到更多关于重试工作的细节。
Controlling Rollback
默认情况下,无论重试还是跳过,从 ItemWriter
抛出的任何异常都会导致由该 Step
控制的事务回滚。如果跳过配置如前所述,从 ItemReader
抛出的异常不会导致回滚。然而,在许多情况下,从 ItemWriter
抛出的异常不应该导致回滚,因为没有发生任何行动来使事务无效。出于这个原因,你可以在 Step
中配置一个不应该导致回滚的异常列表。
在XML中,你可以按以下方式控制回滚:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<no-rollback-exception-classes>
<include class="org.springframework.batch.item.validator.ValidationException"/>
</no-rollback-exception-classes>
</tasklet>
</step>
在Java中,你可以按以下方式控制回滚:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class)
.build();
}
事务 Reader
ItemReader
的基本契约是,它是只向前的。该 step 缓冲了 reader 的输入,因此,在回滚的情况下,item 不需要从 reader 中重新读取。然而,在某些情况下,reader 是建立在一个事务性资源之上的,比如JMS队列。在这种情况下,由于队列与被回滚的事务相联系,已经从队列中拉出的消息会被放回。出于这个原因,你可以配置该 step,使其不缓冲这些 item。
下面的例子显示了如何在xml中创建一个不缓冲 item 的 reader:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"
is-reader-transactional-queue="true"/>
</tasklet>
</step>
下面的例子显示了如何在Java中创建一个不缓冲 item 的 reader:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue()
.build();
}
事务属性
你可以使用事务属性来控制 isolation
、propagation
和 timeout
设置。你可以在 Spring核心文档 中找到更多关于设置事务属性的信息。
下面的例子在XML中设置 isolation
、propagation
和 timeout
事务属性:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<transaction-attributes isolation="DEFAULT"
propagation="REQUIRED"
timeout="30"/>
</tasklet>
</step>
下面的例子在Java中设置 isolation
、propagation
和 timeout
事务属性:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute)
.build();
}
用 Step
注册 ItemStream
该 step 必须在其生命周期的必要点上处理 ItemStream
的回调。(关于 ItemStream
接口的更多信息,见 ItemStream)。如果一个 step 失败了,可能需要重新启动,这是至关重要的,因为 ItemStream
接口是 step 获得它需要的关于 execution 之间的持久状态的信息的地方。
如果 ItemReader
、ItemProcessor
或 ItemWriter
本身实现了 ItemStream
接口,这些将被自动注册。任何其他的流都需要单独注册。这通常是间接依赖的情况,如委托(delegates),被注入到 reader 和 writer 中。你可以通过 stream
元素在 step
中注册一个流。
下面的例子显示了如何在XML中的 step
上注册一个 stream
:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
<streams>
<stream ref="fileItemWriter1"/>
<stream ref="fileItemWriter2"/>
</streams>
</chunk>
</tasklet>
</step>
<beans:bean id="compositeWriter"
class="org.springframework.batch.item.support.CompositeItemWriter">
<beans:property name="delegates">
<beans:list>
<beans:ref bean="fileItemWriter1" />
<beans:ref bean="fileItemWriter2" />
</beans:list>
</beans:property>
</beans:bean>
下面的例子显示了如何在Java中的 step
上注册一个 stream
:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1())
.stream(fileItemWriter2())
.build();
}
/**
* In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
* necessary, but used for an example.
*/
@Bean
public CompositeItemWriter compositeItemWriter() {
List<ItemWriter> writers = new ArrayList<>(2);
writers.add(fileItemWriter1());
writers.add(fileItemWriter2());
CompositeItemWriter itemWriter = new CompositeItemWriter();
itemWriter.setDelegates(writers);
return itemWriter;
}
在前面的例子中,CompositeItemWriter
不是一个 ItemStream
,但它的两个委托(delegate)都是。因此,这两个委托 writer 必须明确地注册为 stream,以便框架正确处理它们。ItemReader
不需要明确地注册为 stream,因为它是 Step
的直接属性。该 step 现在是可重启的,并且在失败的情况下,reader 和 writer 的状态被正确地持久化。
拦截 Step
的执行
就像 Job
一样,在 Step
的执行过程中,有许多事件,用户可能需要执行一些功能。例如,为了写出一个需要页脚的平面文件,ItemWriter
需要在 Step
完成后得到通知,以便可以写出页脚。这可以通过许多 Step
scope 内的监听器(listeners)来实现。
你可以通过 listeners
元素将任何实现 StepListener
的一个扩展的类(但不是那个接口本身,因为它是空的)应用到一个step。listeners
元素在 step、tasklet 或 chunk 的声明中是有效的。我们建议你在其功能适用的级别上声明 listeners,如果它是多功能的(如 StepExecutionListener
和 ItemReadListener
),则在其适用的最细的级别上声明。
下面的例子显示了在 XML 的 chunk level 上应用的 listener:
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
下面的例子显示了在 Java 的 chunk level 上应用的 listener:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
如果使用命名空间 <step>
元素或 *StepFactoryBean
工厂之一,本身实现了 StepListener
接口之一的 ItemReader
、ItemWriter
或 ItemProcessor
会自动注册到 Step
中。这只适用于直接注入到 Step
中的组件。如果 listener 被嵌套在另一个组件中,你需要显式地注册它(如之前在 用 Step
注册 ItemStream
下所描述的)。
除了 StepListener
接口之外,还提供了注解来解决同样的问题。普通的Java对象可以有带有这些注解的方法,然后被转换为相应的 StepListener
类型。对大块组件的自定义实现进行注解也很常见,如 ItemReader
或 ItemWriter
或 Tasklet
。这些注解被XML解析器分析为 <listener/>
元素,以及与 builder 中的 listener
方法一起注册,所以你所需要做的就是使用 XML 命名空间或 builder 将 listener
与 step 一起注册。
StepExecutionListener
StepExecutionListener
代表了 Step
执行的最通用的 listener。它允许在一个 Step
开始之前和结束之后进行通知,无论它是正常结束还是失败,正如下面的例子所示:
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
ExitStatus
有一个 afterStep
的返回类型,以使 listener 有机会修改在完成一个 Step
后返回的退出代码。
与此接口相对应的注释是:
-
@BeforeStep
-
@AfterStep
ChunkListener
一个 “chunk” 被定义为在一个事务范围内处理的 item。在每个提交间隔中,提交一个事务,就提交了一个chunk。你可以使用 ChunkListener
在一个 chunk 开始处理之前或在一个 chunk 成功完成之后执行逻辑,如下面的接口定义所示:
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
beforeChunk
方法是在事务开始后,但在 ItemReader
上开始读取之前被调用。反之,afterChunk
方法是在事务提交后被调用的(如果有回滚,则根本不调用)。
与该接口相对应的注解是:
-
@BeforeChunk
-
@AfterChunk
-
@AfterChunkError
你可以在没有 chunk 声明时应用 ChunkListener
。TaskletStep
负责调用 ChunkListener
,所以它也适用于非面向 item 的 tasklet(它在 tasklet 之前和之后被调用)。
ItemReadListener
在之前讨论跳过逻辑的时候,有人提到记录跳过的记录可能是有益的,这样可以在以后处理。在读取错误的情况下,这可以通过 ItemReaderListener
来完成,正如下面的接口定义所示:
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
beforeRead
方法在每次调用 ItemReader
进行读取之前被调用。afterRead
方法在每次成功调用读取后被调用,并传递给被读取的 item。如果在读取时出现了错误,就会调用 onReadError
方法。遇到的异常会被提供,这样就可以被记录下来。
与该接口相对应的注解是:
-
@BeforeRead
-
@AfterRead
-
@OnReadError
ItemProcessListener
与 ItemReadListener
一样,一个 item 的处理可以被 "监听",正如下面的接口定义所示:
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
beforeProcess
方法在 ItemProcessor
的 process
之前被调用,并被交给要被处理的 item。 afterProcess
方法在 item 被成功处理后被调用。如果在处理过程中出现了错误,就会调用 onProcessError
方法。遇到的异常和试图被处理的 item 将被提供,这样就可以被记录下来。
与该接口相对应的注解是:
-
@BeforeProcess
-
@AfterProcess
-
@OnProcessError
ItemWriteListener
你可以用 ItemWriteListener
来 "监听" 一个 item 的写入,正如下面的接口定义所示:
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
beforeWrite
方法在 ItemWriter
上 write
之前被调用,并被交给要写入的 item 列表。afterWrite
方法在 item 被成功写入后被调用。如果在写的时候出现了错误,onWriteError
方法会被调用。遇到的异常和试图被写入的 item 将被提供,这样它们就可以被记录下来。
与该接口相对应的注解是:
-
@BeforeWrite
-
@AfterWrite
-
@OnWriteError
SkipListener
ItemReadListener
、ItemProcessListener
和 ItemWriteListener
都提供了被通知错误的机制,但没有一个能通知你一个记录确实被跳过了。 例如,onWriteError
,即使一个 item 被重试并成功,也会被调用。由于这个原因,有一个单独的接口来跟踪被跳过的 item,正如下面的接口定义所示:
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
onSkipInRead
当一个 item 在读取时被跳过时被调用。应该注意的是,回滚可能会导致同一个 item 被多次注册为跳过。onSkipInWrite
是在一个 item 被写入时跳过时调用的。因为该 item 已经被成功读取(并且没有被跳过),它也被提供 item 本身作为参数。
与该接口相对应的注解是:
-
@OnSkipInRead
-
@OnSkipInWrite
-
@OnSkipInProcess
TaskletStep
面向块的处理 不是在 Step
中处理的唯一方式。如果一个 Step
必须包括一个存储过程的调用呢?你可以将调用实现为一个 ItemReader
,并在过程结束后返回 null
。然而,这样做有点不自然,因为需要有一个无操作的 ItemWriter
。Spring Batch 为这种情况提供了 TaskletStep
。
Tasklet
接口有一个方法,execute
,它被 TaskletStep
反复调用,直到它返回 RepeatStatus.FINISHED
或抛出一个异常以示失败。对 Tasklet
的每个调用都被封装在一个事务中。Tasklet
实现者可能调用一个存储过程、一个脚本或一个SQL更新语句。
To create a TaskletStep
in XML, the ref
attribute of the <tasklet/>
element should
reference a bean that defines a Tasklet
object. No <chunk/>
element should be used
within the <tasklet/>
. The following example shows a simple tasklet:
<step id="step1">
<tasklet ref="myTasklet"/>
</step>
To create a TaskletStep
in Java, the bean passed to the tasklet
method of the builder
should implement the Tasklet
interface. No call to chunk
should be called when
building a TaskletStep
. The following example shows a simple tasklet:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.tasklet(myTasklet(), transactionManager)
.build();
}
如果它实现了 StepListener 接口,TaskletStep 会自动将该 tasklet 注册为一个 StepListener 。
|
TaskletAdapter
与 ItemReader
和 ItemWriter
接口的其他适配器一样,Tasklet
接口包含一个实现,允许将自己适应于任何预先存在的类: TaskletAdapter
。一个可能有用的例子是一个现有的 DAO,它被用来更新一组记录上的一个标志。你可以使用 TaskletAdapter
来调用这个类,而不需要为 Tasklet
接口写一个适配器。
下面的例子显示了如何在 XML 中定义一个 TaskletAdapter
:
<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
<property name="targetObject">
<bean class="org.mycompany.FooDao"/>
</property>
<property name="targetMethod" value="updateFoo" />
</bean>
下面的例子显示了如何在Java中定义一个 TaskletAdapter
:
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(fooDao());
adapter.setTargetMethod("updateFoo");
return adapter;
}
Tasklet
实现示例
许多批处理作业包含在主要处理开始前必须完成的step,以设置各种资源或在处理完成后清理这些资源。在大量处理文件的作业中,往往需要在某些文件被成功上传到另一个地方后在本地删除这些文件。下面的例子(取自 Spring Batch samples project)是一个具有这样职责的 Tasklet
实现:
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory());
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.state(directory != null, "directory must be set");
}
}
前面的 tasklet
实现删除了一个给定目录中的所有文件。应该注意的是, execute
方法只被调用一次。剩下的就是引用 step
中的 tasklet
。
下面的例子显示了如何在 XML 中引用 step
中的 tasklet
:
<job id="taskletJob">
<step id="deleteFilesInDir">
<tasklet ref="fileDeletingTasklet"/>
</step>
</job>
<beans:bean id="fileDeletingTasklet"
class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
<beans:property name="directoryResource">
<beans:bean id="directory"
class="org.springframework.core.io.FileSystemResource">
<beans:constructor-arg value="target/test-outputs/test-dir" />
</beans:bean>
</beans:property>
</beans:bean>
下面的例子显示了如何在 Java 中引用 step
中的 tasklet
:
@Bean
public Job taskletJob(JobRepository jobRepository) {
return new JobBuilder("taskletJob", jobRepository)
.start(deleteFilesInDir())
.build();
}
@Bean
public Step deleteFilesInDir(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("deleteFilesInDir", jobRepository)
.tasklet(fileDeletingTasklet(), transactionManager)
.build();
}
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
FileDeletingTasklet tasklet = new FileDeletingTasklet();
tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));
return tasklet;
}
控制 Step 流
随着在一个自有 job 内将 step 分组的能力,需要能够控制作业如何从一个 Step
"流向" 另一个 Step
。一个 Step
的失败并不一定意味着 Job
应该失败。此外,可能有不止一种类型的 "成功" 来决定下一步应该执行哪个 Step
。根据一组 Step
的配置方式,某些 Step
甚至可能根本就不被处理。
顺序流
最简单的流程场景是一个所有 step 都按顺序执行的工作,如下图所示:
这可以通过在一个 step
中使用 next
来实现。
下面的例子显示了如何在 XML 中使用 next
属性:
<job id="job">
<step id="stepA" parent="s1" next="stepB" />
<step id="stepB" parent="s2" next="stepC"/>
<step id="stepC" parent="s3" />
</job>
下面的例子显示了如何在 Java 中使用 next()
方法:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.next(stepB())
.next(stepC())
.build();
}
在上述情况下,stepA
首先运行,因为它是列出的第一个 Step
。如果 stepA
正常完成,stepB
运行,以此类推。但是,如果 step A
失败,整个 Job
就会失败,stepB
就不会执行。
对于 Spring Batch XML 命名空间,配置中列出的第一个 step 始终是 Job 运行的第一个 step。其他 step 元素的顺序并不重要,但第一个 step 必须总是出现在 XML 的首位。
|
条件流
在前面的例子中,只有两种可能性:
-
该
step
成功,应执行下一step
。 -
该
step
失败,因此job
也应失败。
在许多情况下,这就足够了。但是,如果一个 step
失败后,应该触发另一个 step
,而不是导致失败,这种情况又该如何处理呢?下图显示了这种流程:
为了处理更复杂的情况,Spring Batch XML 命名空间允许你在 step 元素中定义过渡元素。next
元素就是这样一个过渡元素。与 next
属性一样,next
元素告诉 Job
下一步要执行哪个 Step
。然而,与属性不同的是,在给定 Step
上允许使用任意数量的 next
元素,并且在失败情况下没有默认行为。这意味着,如果使用过渡元素,则必须明确定义 Step
过渡的所有行为。还请注意,一个 Step
不能同时具有 next
属性和过渡元素。
next
元素指定了要匹配的模式和下一步要执行的 step,如下例所示:
<job id="job">
<step id="stepA" parent="s1">
<next on="*" to="stepB" />
<next on="FAILED" to="stepC" />
</step>
<step id="stepB" parent="s2" next="stepC" />
<step id="stepC" parent="s3" />
</job>
Java API 提供了一组 fluent 方法,可让你指定流程以及当某一步失败时该如何处理。下面的示例展示了如何指定一个 step(stepA
),然后根据 stepA 是否成功进入两个不同步骤(stepB
或 stepC
)中的任一 step:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.on("*").to(stepB())
.from(stepA()).on("FAILED").to(stepC())
.end()
.build();
}
当使用 XML 配置时,过渡元素的 on
属性使用简单的模式匹配方案来匹配步骤执行后的 ExitStatus
。
当使用 java 配置时,on()
方法使用简单的模式匹配方案来匹配 Step
执行后的 ExitStatus
。
模式中只允许使用两个特殊字符:
-
*
匹配0个或多个字符。 -
?
完全匹配一个字符。
例如,c*t
匹配 cat
和 count
,而 c?t
匹配 cat
但不匹配 count
。
虽然对一个 Step
上的过渡元素的数量没有限制,但如果 Step
执行的结果是一个元素没有覆盖的 ExitStatus
,框架就会抛出一个异常,Job
就会失败。框架会自动将过渡元素从最特殊到最不特殊排序。这意味着,即使在前面的示例中将 stepA
的排序对调,FAILED
的 ExitStatus
仍然会进入 stepC
。
批处理状态与退出状态
在为条件流配置 Job
时,了解 BatchStatus
和 ExitStatus
之间的区别非常重要。BatchStatus
是一个枚举,是 JobExecution
和 StepExecution
的属性,框架使用它来记录 Job
或 Step
的状态。它可以是以下值之一: Completed
(完成)、STARTING
(开始)、STARTED
(已开始)、STOPPING
(停止)、STOPPED
(已停止)、FAILED
(失败)、ABANDONED
(放弃)或 UNKNOWN
(未知)。其中大部分不言自明: COMPLETED
(完成)是 step 或 job 成功完成时设置的状态,FAILED
(失败)是失败时设置的状态,以此类推。
下面的示例包含了使用XML配置时的 next
元素:
<next on="FAILED" to="stepB" />
下面的示例包含了使用Java配置时的 on
元素:
...
.from(stepA()).on("FAILED").to(stepB())
...
乍看起来,on
引用了所属 Step
的 BatchStatus
。然而,实际上它引用的是 Step
的 ExitStatus
。顾名思义,ExitStatus
表示 Step
执行结束后的状态。
更具体地说,当使用XML配置时,前面的XML配置示例中显示的 next
元素引用 ExitStatus
的退出代码。
当使用 Java 配置时,前面 Java 配置示例中的 on()
方法引用 ExitStatus
的退出代码。
用中文说就是:“如果退出代码为FAILED,则转到步骤B”。默认情况下,退出代码总是与该 Step
的 BatchStatus
相同,这就是前面的条目能够工作的原因。但是,如果退出代码需要不同呢?示例项目中的跳过示例 job 就是一个很好的例子:
下面的示例展示了如何在 XML 中使用不同的退出代码:
<step id="step1" parent="s1">
<end on="FAILED" />
<next on="COMPLETED WITH SKIPS" to="errorPrint1" />
<next on="*" to="step2" />
</step>
下面的示例展示了如何在 Java 中使用不同的退出代码:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("FAILED").end()
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
.from(step1()).on("*").to(step2())
.end()
.build();
}
step1
有三种可能:
-
Step
失败,在这种情况下,job 应该失败。 -
Step
成功完成。 -
Step
成功完成,但退出代码为COMPLETED WITH SKIPS
。在这种情况下,应该运行另一个 step 来处理错误。
上述配置有效。但是,正如下面的示例所示,需要根据跳过记录的执行条件更改退出代码:
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
前面的代码是一个 StepExecutionListener
,它首先检查 Step
是否成功,然后检查 StepExecution
的跳过计数是否大于 0。 如果两个条件都满足,将返回一个新的 ExitStatus
,其退出代码为 COMPLETED WITH SKIPS
。
配置停止
在讨论了 BatchStatus
和 ExitStatus
之后,人们可能想知道如何确定 Job
的 BatchStatus
和 ExitStatus
。 Step
的这些状态由执行的代码决定,而 Job
的状态则根据配置决定。
到目前为止,讨论的所有 job 配置都至少有一个没有过渡的最后 Step
。
在下面的 XML 示例中,step
执行后,Job
结束:
<step id="stepC" parent="s3"/>
在下面的 Java 示例中,step
执行后,Job
结束:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
如果没有为 Step
定义过渡,则 Job
的状态定义如下:
-
如果
Step
结束时的ExitStatus
为FAILED
,则Job
的BatchStatus
和ExitStatus
均为FAILED
。 -
否则,
Job
的BatchStatus
(批处理状态)和ExitStatus
(退出状态)都是COMPLETED
(已完成)。
虽然这种终止批处理 job 的方法对于某些批处理 Job(如简单的顺序 step job)来说已经足够,但可能还需要自定义的 job 停止场景。为此,Spring Batch 提供了三个过渡元素来停止 Job
(除了我们之前讨论过的 next
元素)。每个停止元素都以特定的 BatchStatus
停止 Job
。需要注意的是,停止过渡元素对 Job
中任何 Step
的 BatchStatus
或 ExitStatus
都没有影响。这些元素只影响 Job
的最终状态。例如,Job
中的每个 Step
的状态都可能是 FAILED
(失败),但 job 的状态却是 COMPLETED
(完成)。
以 Step
结束
配置 step 结束指示 Job
在 BatchStatus
为 COMPLETED
时停止。完成状态为 COMPLETED
的 Job
无法重新启动(框架会抛出 JobInstanceAlreadyCompleteException
)。
当使用 XML 配置时,你可以使用 end
元素来完成此任务。end
元素还允许一个可选的 exit-code
属性,你可以使用该属性自定义 Job
的 ExitStatus
。如果没有给出 exit-code
属性,则 ExitStatus
默认为 COMPLETED
,以与 BatchStatus
匹配。
使用 Java 配置时,该任务使用 end
方法。end
方法还允许一个可选的 exitStatus
参数,你可以使用该参数自定义 Job
的 ExitStatus
。如果没有提供 exitStatus
值,则 ExitStatus
默认为 COMPLETED
,以匹配 BatchStatus
。
考虑以下情况:如果 step2
失败,则 Job
停止,BatchStatus
为 COMPLETED
,ExitStatus
为 COMPLETED
,step3
不运行。否则,执行到 step3
。请注意,如果 step2
失败,该 Job
不可重新启动(因为状态是 COMPLETED
)。
下面的示例显示了 XML 中的场景:
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<end on="FAILED"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
下面的示例展示了Java中的场景:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2())
.on("FAILED").end()
.from(step2()).on("*").to(step3())
.end()
.build();
}
失败 Step
将 step 配置为在给定点失败,可指示 Job
以 FAILED
的 BatchStatus
停止。与结束不同的是,Job
的失败不会阻止 Job
的重新启动。
使用 XML 配置时,fail
元素还允许可选的 exit-code
属性,该属性可用于自定义 Job
的 ExitStatus
。如果没有给出 exit-code
属性,则 ExitStatus
默认为 FAILED
,以匹配 BatchStatus
。
考虑以下情况:如果 step2
失败,则 Job
停止,BatchStatus
为 FAILED
,ExitStatus
为 EARLY TERMINATION
,step3
不执行。否则,将执行 step3
。此外,如果 step2
失败并且重新启动了 Job
,执行将从 step2
重新开始。
下面的示例显示了 XML 中的场景:
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<fail on="FAILED" exit-code="EARLY TERMINATION"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
下面的示例显示了Java中的场景:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2()).on("FAILED").fail()
.from(step2()).on("*").to(step3())
.end()
.build();
}
在指定 Step 停止 Job
将 job 配置为在特定 step 停止,可指示 Job
以 STOPPED
(停止)的 BatchStatus
停止。停止 Job
可以暂时中断处理,以便运维在重新启动 Job
前采取一些措施。
当使用 XML 配置时,stop
元素需要一个 restart
属性,该属性指定当重新启动 Job
时应重新开始执行的 step。
当使用 Java 配置时,stopAndRestart
方法需要一个 restart
属性,该属性指定了重新启动 Job
时应重新开始执行的 step。
请考虑以下情况:如果 step1
以 COMPLETE
结束,则 job 停止。重新启动后,开始执行 step2
。
以下列表显示了 XML 中的场景:
<step id="step1" parent="s1">
<stop on="COMPLETED" restart="step2"/>
</step>
<step id="step2" parent="s2"/>
下面的示例显示了Java中的场景:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("COMPLETED").stopAndRestart(step2())
.end()
.build();
}
编程式流程决策
在某些情况下,可能需要比 ExitStatus
更多的信息来决定下一步执行哪个 step。在这种情况下,可以使用 JobExecutionDecider
来帮助做出决定,如下例所示:
public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
在下面的 job 配置示例中,decision
指定了要使用的 decider 以及所有过渡:
<job id="job">
<step id="step1" parent="s1" next="decision" />
<decision id="decision" decider="decider">
<next on="FAILED" to="step2" />
<next on="COMPLETED" to="step3" />
</decision>
<step id="step2" parent="s2" next="step3"/>
<step id="step3" parent="s3" />
</job>
<beans:bean id="decider" class="com.MyDecider"/>
在下面的示例中,当使用 Java 配置时,实现 JobExecutionDecider
的 bean 将直接传递给 next
调用:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
分流
到目前为止,所描述的每种情况都涉及到以线性方式一次执行一个步骤的 Job
。除了这种典型风格,Spring Batch 还允许将 job 配置为并行流。
XML命名空间允许你使用 split
元素。如下面的示例所示,split
元素包含一个或多个 flow
元素,可以在其中定义整个独立的流。split
元素还可以包含前面讨论过的任何过渡元素,如 next
属性或 next
、end
或 fail
元素。
<split id="split1" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
基于 Java 的配置可让你通过提供的 builder 配置 split
。如以下示例所示,split
元素包含一个或多个 flow
元素,可以在其中定义整个独立的流。split
元素还可以包含之前讨论过的任何过渡元素,如 next
属性或 next
、end
或 fail
元素。
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public Job job(Flow flow1, Flow flow2) {
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4())
.end()
.build();
}
外部化 Job 之间的流程定义和依赖关系
job 中的部分流程可以外部化为单独的 bean 定义,然后重复使用。有两种方法可以做到这一点。第一种是将 flow 声明为对其他地方定义的 flow 的引用。
下面的 XML 示例显示了如何将 flow 声明为对其他地方定义的 flow 的引用:
<job id="job">
<flow id="job1.flow1" parent="flow1" next="step3"/>
<step id="step3" parent="s3"/>
</job>
<flow id="flow1">
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
下面的 Java 示例展示了如何将 flow 声明为对其他地方定义的 flow 的引用:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(flow1())
.next(step3())
.end()
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
如上例所示,定义外部流程的效果是将外部流程中的 step 插入到 job 中,就好像它们是内联声明的一样。通过这种方式,许多 job 可以引用相同的模板流,并将这些模板组成不同的逻辑流。这也是分离各个流的集成测试的好方法。
外部化流程的另一种形式是使用 JobStep
。JobStep
与 FlowStep
类似,但实际上是为指定流程中的 step 创建并启动单独的 job 执行。
下面的示例是一个 XML 格式的 JobStep
示例:
<job id="jobStepJob" restartable="true">
<step id="jobStepJob.step1">
<job ref="job" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor"/>
</step>
</job>
<job id="job" restartable="true">...</job>
<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
<property name="keys" value="input.file"/>
</bean>
下面的示例显示了 Java 中的一个 JobStep
示例:
@Bean
public Job jobStepJob(JobRepository jobRepository) {
return new JobBuilder("jobStepJob", jobRepository)
.start(jobStepJobStep1(null))
.build();
}
@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher, JobRepository jobRepository) {
return new StepBuilder("jobStepJobStep1", jobRepository)
.job(job())
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor())
.build();
}
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[]{"input.file"});
return extractor;
}
job 参数提取器是一种策略,用于确定如何将 Step
的 ExecutionContext
转换为运行 Job
的 JobParameters
。当你希望对 job 和 Step
进行监控和报告时,JobStep
非常有用。使用 JobStep
通常也能很好地解决以下问题: "如何在 job 之间创建依赖关系?这是一种将大型系统分解为较小模块并控制作业流的好方法。
Job
和 Step
属性的延迟绑定
前面显示的 XML 和平面文件示例都使用了 Spring Resource
抽象来获取文件。这是因为 Resource
有一个返回 java.io.File
的 getFile
方法。你可以使用标准的 Spring 构造配置 XML 和平面文件资源:
下面的示例显示了 XML 中的延迟绑定:
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource"
value="file://outputs/file.txt" />
</bean>
下面的示例显示了Java中的延迟绑定:
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource("file://outputs/file.txt"))
...
}
上述 Resource
从指定的文件系统位置加载文件。请注意,绝对位置必须以双斜线(//
)开头。在大多数Spring应用程序中,这种解决方案已经足够好了,因为这些资源的名称在编译时已经知道。然而,在批处理场景中,文件名可能需要在运行时作为 job 参数确定。这可以通过使用 -D
参数读取系统属性来解决。
下面的示例展示了如何从XML属性中读取文件名:
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="${input.file.name}" />
</bean>
下面展示了如何从Java属性中读取文件名:
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
该解决方案只需要一个系统参数(如 -Dinput.file.name="file://outputs/file.txt"
)即可。
尽管你可以在这里使用 PropertyPlaceholderConfigurer ,但如果 system property 总是被设置,则没有必要使用,因为 Spring 中的 ResourceEditor 已经对系 system properties 进行了过滤和占位符替换。
|
通常情况下,在批处理设置中,最好在 job 的 JobParameters
中参数化文件名(而不是通过系统属性)并以这种方式访问它们。为了实现这一点,Spring 批处理允许后期绑定各种 Job
和 Step
属性。
下面的示例显示了如何在XML中参数化文件名:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>
下面的示例展示了如何在Java中参数化文件名:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
你可以以同样的方式访问 JobExecution
和 StepExecution
级别的 ExecutionContext
。
下面的示例展示了如何在XML中访问 ExecutionContext
:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>
下面的示例展示了如何在Java中访问 ExecutionContext
:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
任何使用后期绑定的 bean 必须使用 scope="step" 声明。有关详细信息,请参阅 Step Scope。Step bean 不应具有 step scope。如果需要在 step 定义中进行后期绑定,则该 step 的组件(tasklet、item reader 或 writer 等)应被作用域化。
|
如果你使用的是Spring 3.0(或更高版本),step scope Bean 中的表达式使用的是 Spring 表达式语言,这是一种功能强大的通用语言,具有许多有趣的特性。为了提供向后兼容性,如果 Spring Batch 检测到旧版本的Spring 存在,它会使用功能较弱的本地表达式语言,其解析规则略有不同。主要区别在于,上例中的 map key 在 Spring 2.5 中不需要加引号,但在 Spring 3.0 中必须加引号。 |
Step Scope
前面显示的所有后期绑定示例都在 bean 定义中声明了 step
的 scope。
下面的示例显示了在 XML 中绑定到 step
scope 的示例:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>
下面的示例显示了在 Java 中绑定到 step scope 的示例:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
要使用后期绑定,就必须使用 Step
的 scope,因为在 Step
开始之前,Bean 实际上无法实例化,因此无法找到属性。因为默认情况下它不是 Spring 容器的一部分,所以必须通过使用 batch
命名空间、显式包含 StepScope
的 Bean 定义或使用 @EnableBatchProcessing
注解来显式添加 scope。只能使用其中一种方法。下面的示例使用了 batch
命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
下面的示例明确包含了 bean 定义:
<bean class="org.springframework.batch.core.scope.StepScope" />
Job Scope
在 Spring Batch 3.0 中引入的 Job
scope 在配置上类似于 Step
scope,但它是 Job
context 的 scope,因此每个运行中的 job 只有一个这样的Bean实例。此外,通过使用 #{…}
占位符,支持对从 JobContext
访问的引用进行后期绑定。使用此功能,你可以从 job 或 job execution context 和 job parameters 中提取 Bean 属性。
下面的示例显示了在 XML 中绑定到 job scope 的示例:
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>
下面的示例显示了在 Java 中绑定到 job scope 的示例:
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
由于默认情况下该 scope 不是 Spring 容器的一部分,因此必须显式添加该 scope,方法是使用 batch
命名空间、显式包含 JobScope
的 Bean 定义或使用 @EnableBatchProcessing
注解(仅选择一种方法)。下面的示例使用了 batch
命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
下面的示例包含一个显式定义 JobScope
的 Bean:
<bean class="org.springframework.batch.core.scope.JobScope" />
在多线程或分区 step 中使用 job scope Bean 有一些实际限制。Spring Batch 无法控制这些用例中产生的线程,因此无法正确设置这些线程以使用此类Bean。因此,我们不建议在多线程或分区 step 中使用 job scope sBean。 |
Scope ItemStream
组件
当使用 Java 配置样式定义 job 或 step scope 的 ItemStream
Bean 时,Bean 定义方法的返回类型至少应为 ItemStream
。这是必需的,这样 Spring Batch 才能正确创建一个实现该接口的代理,从而通过调用 open
、update
和 close
方法来履行其契约。
建议使此类 Bean 的 bean 定义方法返回最具体的已知实现,如下例所示:
@Bean
@StepScope
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.resource(new FileSystemResource(name))
// set other properties of the item reader
.build();
}