批处理(Batch)的领域语言(Domain Language)
本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。 |
对于任何有经验的批处理架构师来说,Spring Batch 中使用的批处理的整体概念应该是熟悉和舒适的。有 “Job” 和 “Step” 以及开发者提供的处理单元,称为 ItemReader
和 ItemWriter
。然而,由于Spring 的模式、操作、template、callback 和习语,可能会会出现以下情况:
-
在坚持明确区分关注点方面有明显的改善。
-
明确划分的架构层和作为接口提供的服务。
-
简单和默认的实现方式,允许快速采用和易于使用的开箱即用。
-
大大增强了可扩展性。
下图是已经使用了几十年的批处理参考架构的简化版本。它提供了一个构成批处理领域语言的组件的概述。这个架构框架是一个蓝图,通过过去几代平台(大型机上的COBOL、Unix上的C,以及现在任何地方的Java)上几十年的实施,已经得到了验证。JCL和COBOL开发者可能会像C、C#和Java开发者一样对这些概念感到舒服。Spring Batch提供了在强大的、可维护的系统中常见的层、组件和技术服务的物理实现,这些系统被用来解决创建简单到复杂的批处理应用程序,其基础设施和扩展可以解决非常复杂的处理需求。

前面的图强调了构成 Spring Batch 领域语言的关键概念。一个 Job
有一至多个 step,每个 step 正好有一个 ItemReader
,一个 ItemProcessor
,和一个 ItemWriter
。一个 Job
需要被启动(用 JobLauncher
),关于当前运行进程的元数据需要被存储(在 JobRepository
)。
Job
本节描述了与批处理 job 的概念有关的 stereotype。一个 Job
是一个实体,它封装了整个批处理过程。与其他Spring项目一样,一个 Job
与 XML 配置文件或基于Java的配置连接在一起。这种配置可以被称为 "job 配置"。然而,Job
只是整个层次结构的顶端,如下图所示:

在 Spring Batch 中,Job
只是一个 Step
实例的容器。它将逻辑上属于一个流程的多个 step 结合在一起,并允许配置所有 step 的全局属性,如重新启动的能力。job 配置包含:
-
job 的名称。
-
Step
实例的定义和排序。 -
job 是否可以重新启动。
For those who use Java configuration, Spring Batch provides a default implementation of
the Job
interface in the form of the SimpleJob
class, which creates some standard
functionality on top of Job
. When using Java-based configuration, a collection of
builders is made available for the instantiation of a Job
, as the following
example shows:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
For those who use XML configuration, Spring Batch provides a default implementation of the
Job
interface in the form of the SimpleJob
class, which creates some standard
functionality on top of Job
. However, the batch namespace abstracts away the need to
instantiate it directly. Instead, you can use the <job>
element, as the
following example shows:
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance
JobInstance
指的是一个逻辑job运行的概念。考虑一个应该在一天结束时运行一次的批处理 job,如上图中的 EndOfDay
Job
。有一个 EndOfDay
job,但该 Job
的每个单独运行都必须被单独跟踪。在这个 job 的情况下,每天有一个逻辑 JobInstance
。例如,有一个1月1日的运行,一个1月2日的运行,以此类推。如果1月1日的运行第一次失败,第二天再次运行,它仍然是1月1日的运行。(通常,这与它所处理的数据也是对应的,也就是说,1月1日的运行处理的是1月1日的数据)。因此,每个 JobInstance
可以有多个执行(JobExecution
将在本章后面详细讨论),并且在一个给定的时间只能运行一个 JobInstance
(它对应于一个特定的 Job
和 JobParameters
标识)。
JobInstance
的定义与要加载的数据完全没有关系。完全由 ItemReader
的实现来决定数据的加载方式。例如,在 EndOfDay
方案中,数据上可能有一列表明数据所属的 effective date
或 schedule date
。因此,1月1日的运行将只加载1日的数据,而1月2日的运行将只使用2日的数据。因为这个决定很可能是一个业务决定,所以它由 ItemReader
来决定。然而,使用相同的 JobInstance
决定了是否使用以前执行的 “state”(即 ExecutionContext
,本章后面将讨论)。使用一个新的 JobInstance
意味着 "从头开始",而使用一个现有的实例通常意味着 "从你停止的地方开始"。
JobParameters
在讨论了 JobInstance
以及它与 Job
的不同之处之后,自然要问的问题是: "一个 JobInstance
与另一个 JobInstance
是如何区分的?" 答案是: JobParameters
。一个 JobParameters
对象持有一组用于批处理job的参数。它们可以用于识别,甚至可以作为运行期间的参考数据,如下图所示:

在前面的例子中,有两个实例,一个是1月1日,另一个是1月2日,实际上只有一个 Job
,但它有两个 JobParameter
对象:一个是以 01-01-2017 的工作参数启动的,另一个是以 01-02-2017 的参数启动的。因此,可以被定义为 JobInstance
= Job
+ 指定的 JobParameters
。这使得开发者可以有效地控制 JobInstance
的定义方式,因为他们可以控制哪些参数被传入。
并非所有的 job parameters 都必须有助于识别一个 JobInstance 。在默认情况下,它们会这样做。然而,该框架也允许提交带有不有助于识别 JobInstance 的参数的 Job 。
|
JobExecution
一个 JobExecution
指的是运行一个 Job 的单一尝试的技术概念。一个 execution 可能以失败或成功告终,但对应于一个特定执行的 JobInstance
并不被认为是完整的,除非 execution 成功完成。以前面描述的 EndOfDay
Job
为例,考虑一个 01-01-2017 的 JobInstance
在第一次运行时失败。如果它以与第一次运行(01-01-2017)相同的 identifying job parameters 再次运行,就会创建一个新的 JobExecution
。然而,仍然只有一个 JobInstance
。
一个 Job
定义了什么是 job 以及如何执行工作,而 JobInstance
是一个纯粹的组织对象,将执行工作分组,主要是为了实现正确的重启语义。然而,JobExecution
是运行过程中实际发生的事情的主要存储机制,它包含了许多必须被控制和持久化的属性,如下表所示:
属性 |
说明 |
|
一个指示执行状态的 |
|
一个 |
|
一个 |
|
|
|
一个 |
|
一个 |
|
包含任何需要在 execution 之间持续存在的用户数据的 "属性包"。 |
|
在 |
这些属性很重要,因为它们被持久化了,可以用来完全确定一个执行的状态。例如,如果01-01的 EndOfDay
job 在晚上9:00执行,在9:30失败,那么在批处理元数据表中会有以下条目:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
为清晰和格式化起见,列名可能被缩写或省略。 |
现在 job 已经失败了,假设整个晚上才确定问题,所以 "批处理窗口" 现在已经关闭。进一步假设该窗口在晚上9:00开始,Job
再次启动01-01,从它离开的地方开始,在9:30成功完成。因为现在是第二天,所以01-02的工作也必须运行,它紧接着在9:31被启动,并在10:30的正常一小时时间内完成。没有要求一个 JobInstance
在另一个之后被启动,除非这两个 Job
有可能试图访问相同的数据,导致数据库级别的锁问题。完全由调度器来决定何时应该运行一个 Job
。由于它们是独立的 JobInstances
,Spring Batch不会试图阻止它们同时运行。(当另一个 JobInstance
已经在运行时,试图运行同一个 JobInstance
会导致抛出 JobExecutionAlreadyRunningException
)。现在在 JobInstance
和 JobParameters
表中都应该有一个额外的条目,在 JobExecution
表中也有两个额外的条目,如下表所示:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
COMPLETED |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
COMPLETED |
为清晰和格式化起见,列名可能被缩写或省略。 |
Step
Step
是一个 domain 对象,它封装了批处理 Job
的一个独立、连续的阶段。因此,每个 Job
完全由一个或多个步骤(step)组成。一个 Step
包含定义和控制实际批处理的所有必要信息。这必然是一个模糊的描述,因为任何给定的 Step
的内容都由编写 Job
的开发人员决定。一个 Step
可以是简单的,也可以是复杂的,正如开发者所希望的那样。一个简单的 Step
可能从文件中加载数据到数据库中,只需要很少或没有代码(取决于使用的实现)。一个更复杂的 Step
可能有复杂的业务规则,作为处理的一部分被应用。与 Job
一样,一个 Step
有一个单独的 StepExecution
,与一个独特的 JobExecution
相关,如下图所示:

StepExecution
一个 StepExecution
表示执行一个 Step
的单一尝试。每次运行一个 Step
都会创建一个新的 StepExecution
,与 JobExecution
类似。然而,如果一个 Step
因为之前的 Step
失败而无法执行,则不会为其持续执行。一个 StepExecution
只有在其 Step
实际启动时才被创建。
Step
的执行由 StepExecution
类的对象表示。每个执行都包含对其相应 step 和 JobExecution
的引用,以及与事务相关的数据,如提交和回滚计数以及开始和结束时间。此外,每个 step 的执行都包含一个 ExecutionContext
,它包含了开发者需要跨批处理运行的任何数据,例如重新启动所需的统计数据或状态信息。下表列出了 StepExecution
的属性:
属性 |
定义 |
|
一个指示执行状态的 |
|
一个 |
|
一个 |
|
|
|
包含任何需要在 execution 之间持续存在的用户数据的 "属性包"。 |
|
已经成功读取的 item 数量。 |
|
已成功写入的 item 数量。 |
|
本次 execution 中已提交的事务数量。 |
|
由该 |
|
|
|
|
|
被 |
|
|
ExecutionContext
一个 ExecutionContext
代表了一个键/值对的集合,这些键/值对被持久化并由框架控制,为开发者提供了一个存储持久化状态的地方,这些持久化状态的 scope 是 StepExecution
对象或 JobExecution
对象。(对于那些熟悉Quartz的人来说,它与 JobDataMap
非常相似。)最好的使用例子是方便重新启动。以平面文件输入为例,在处理个别行时,框架会定期在提交点持久化 ExecutionContext
。这样做可以让 ItemReader
存储其状态,以防在运行过程中发生致命错误,甚至停电。正如下面的例子所示,只需要将当前读取的行数放入上下文中,剩下的就由框架来完成:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以 Job
stereotype 部分的 EndOfDay
例子为例,假设有一个 step,loadData
,将一个文件加载到数据库。在第一次运行失败后,元数据表将看起来像下面的例子:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在前面的案例中,该 Step
运行了30分钟,处理了40,321个 "件",在这种情况下,这代表了文件中的行。这个值在每次提交前由框架更新,可以包含与 ExecutionContext
中的条目对应的多行。在提交前得到通知需要各种 StepListener
实现之一(或一个 ItemStream
),本指南后面将详细讨论这些。与前面的例子一样,我们假设 Job
在第二天重新启动。当它被重新启动时,上次运行的 ExecutionContext
中的值将从数据库中重新构建。当 ItemReader
被打开时,它可以检查它在上下文中是否有任何存储的状态,并从那里初始化自己,正如下面的例子所示:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在前面的代码运行后,当前行是40,322,让 Step
从它离开的地方重新开始。你也可以将 ExecutionContext
用于需要持久化的关于运行本身的统计数据。例如,如果一个平面文件包含存在于多行的处理订单,可能需要存储有多少订单被处理了(这与读取的行数有很大不同),这样就可以在 Step
结束时发送一封电子邮件,在正文中写明处理的订单总数。该框架为开发者处理存储这个问题,以正确地将其与单个 JobInstance
联系起来。要知道是否应该使用现有的 ExecutionContext
是非常困难的。例如,使用上面的 EndOfDay
例子,当01-01运行第二次启动时,框架认识到这是同一个 JobInstance
,并在单个 Step
的基础上,将 ExecutionContext
从数据库中取出,并将其(作为 StepExecution
的一部分)交给 Step
本身。相反,对于01-02的运行,框架认识到这是一个不同的实例,所以必须把一个空的上下文交给 Step
。框架为开发者做了许多这种类型的判断,以确保在正确的时间将状态交给他们。同样重要的是要注意,在任何时候,每个 StepExecution
只存在一个 ExecutionContext
。 ExecutionContext
的客户端应该小心,因为这创造了一个共享的 keyspace。因此,在把值放进去时应该小心,以确保没有数据被覆盖。然而,Step
在上下文中绝对不存储任何数据,所以没有办法对框架产生不利影响。
注意,每个 JobExecution
至少有一个 ExecutionContext
,每个 StepExecution
也有一个。例如,考虑下面的代码片段:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
正如注释中指出的,ecStep
不等于 ecJob
。它们是两个不同的 ExecutionContexts
。适用于 Step
的是在 Step
的每个提交点保存的,而适用于 Job
的是在每个 Step
执行之间保存的。
JobRepository
JobRepository
是前面提到的所有 stereotype 的持久化机制。它为 JobLauncher
、Job
和 Step
的实现提供 CRUD 操作。当一个 Job
第一次被启动时,一个 JobExecution
会从 repository 中获得。另外,在执行过程中,StepExecution
和 JobExecution
的实现通过传递给 repository 而被保存。
Spring Batch XML 命名空间提供了对用 <job-repository>
标签配置 JobRepository
实例的支持,如下例所示:
<job-repository id="jobRepository"/>
当使用 Java 配置时,@EnableBatchProcessing
注解提供了一个 JobRepository
作为自动配置的组件之一。
JobLauncher
JobLauncher
代表了一个简单的接口,用于用一组给定的 JobParameters
启动一个 Job
,如下面的例子所示:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
我们期望实现从 JobRepository
获取有效的 JobExecution
并执行该 Job
。
ItemReader
ItemReader
是一个抽象概念,它代表了对一个 Step
的输入的检索,一次一个 item。当 ItemReader
用完了它能提供的 item 时,它会通过返回 null
来表示这一点。你可以在Reader 和 Writer 中找到更多关于 ItemReader
接口及其各种实现的细节。
ItemWriter
ItemWriter
是一个抽象概念,它代表了一个 Step
的输出,每次都是一批或一大批的item。一般来说,ItemWriter
不知道它接下来应该收到的输入,只知道在其当前调用中传递的 item。你可以在 Reader 和 Writer 中找到更多关于 ItemWriter
接口及其各种实现的细节。
ItemProcessor
ItemProcessor
是一个抽象,它代表一个 item 的业务处理。当 ItemReader
读取一个item,而 ItemWriter
写入一个 item 时,ItemProcessor
提供了一个访问点来转换或应用其他业务处理。如果在处理该 item 时,确定该项目是无效的,返回 null
表示该 item 不应该被写出。你可以在 Reader 和 Writer 中找到更多关于 ItemProcessor
接口的细节。
Batch 命名空间
前面列出的许多 domain 概念都需要在Spring ApplicationContext
中进行配置。虽然有上述接口的实现,你可以在标准的 Bean 定义中使用,但为了便于配置,我们提供了一个命名空间,如下例所示:
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>
只要批处理命名空间已被声明,它的任何元素都可以被使用。你可以 在配置和运行一个 Job 中找到更多关于配置一个 Job 的信息。你可以在 配置 Step 中找到关于配置 Step
的更多信息。