批处理(Batch)的领域语言(Domain Language)

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

对于任何有经验的批处理架构师来说,Spring Batch 中使用的批处理的整体概念应该是熟悉和舒适的。有 “Job” 和 “Step” 以及开发者提供的处理单元,称为 ItemReaderItemWriter。然而,由于Spring 的模式、操作、template、callback 和习语,可能会会出现以下情况:

  • 在坚持明确区分关注点方面有明显的改善。

  • 明确划分的架构层和作为接口提供的服务。

  • 简单和默认的实现方式,允许快速采用和易于使用的开箱即用。

  • 大大增强了可扩展性。

下图是已经使用了几十年的批处理参考架构的简化版本。它提供了一个构成批处理领域语言的组件的概述。这个架构框架是一个蓝图,通过过去几代平台(大型机上的COBOL、Unix上的C,以及现在任何地方的Java)上几十年的实施,已经得到了验证。JCL和COBOL开发者可能会像C、C#和Java开发者一样对这些概念感到舒服。Spring Batch提供了在强大的、可维护的系统中常见的层、组件和技术服务的物理实现,这些系统被用来解决创建简单到复杂的批处理应用程序,其基础设施和扩展可以解决非常复杂的处理需求。

Figure 2.1: Batch Stereotypes
Figure 1. Batch Stereotypes

前面的图强调了构成 Spring Batch 领域语言的关键概念。一个 Job 有一至多个 step,每个 step 正好有一个 ItemReader,一个 ItemProcessor,和一个 ItemWriter。一个 Job 需要被启动(用 JobLauncher),关于当前运行进程的元数据需要被存储(在 JobRepository)。

Job

本节描述了与批处理 job 的概念有关的 stereotype。一个 Job 是一个实体,它封装了整个批处理过程。与其他Spring项目一样,一个 Job 与 XML 配置文件或基于Java的配置连接在一起。这种配置可以被称为 "job 配置"。然而,Job 只是整个层次结构的顶端,如下图所示:

Job Hierarchy
Figure 2. Job 层次

在 Spring Batch 中,Job 只是一个 Step 实例的容器。它将逻辑上属于一个流程的多个 step 结合在一起,并允许配置所有 step 的全局属性,如重新启动的能力。job 配置包含:

  • job 的名称。

  • Step 实例的定义和排序。

  • job 是否可以重新启动。

For those who use Java configuration, Spring Batch provides a default implementation of the Job interface in the form of the SimpleJob class, which creates some standard functionality on top of Job. When using Java-based configuration, a collection of builders is made available for the instantiation of a Job, as the following example shows:

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

For those who use XML configuration, Spring Batch provides a default implementation of the Job interface in the form of the SimpleJob class, which creates some standard functionality on top of Job. However, the batch namespace abstracts away the need to instantiate it directly. Instead, you can use the <job> element, as the following example shows:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

JobInstance

JobInstance 指的是一个逻辑job运行的概念。考虑一个应该在一天结束时运行一次的批处理 job,如上图中的 EndOfDay Job。有一个 EndOfDay job,但该 Job 的每个单独运行都必须被单独跟踪。在这个 job 的情况下,每天有一个逻辑 JobInstance。例如,有一个1月1日的运行,一个1月2日的运行,以此类推。如果1月1日的运行第一次失败,第二天再次运行,它仍然是1月1日的运行。(通常,这与它所处理的数据也是对应的,也就是说,1月1日的运行处理的是1月1日的数据)。因此,每个 JobInstance 可以有多个执行(JobExecution 将在本章后面详细讨论),并且在一个给定的时间只能运行一个 JobInstance(它对应于一个特定的 JobJobParameters 标识)。

JobInstance 的定义与要加载的数据完全没有关系。完全由 ItemReader 的实现来决定数据的加载方式。例如,在 EndOfDay 方案中,数据上可能有一列表明数据所属的 effective dateschedule date。因此,1月1日的运行将只加载1日的数据,而1月2日的运行将只使用2日的数据。因为这个决定很可能是一个业务决定,所以它由 ItemReader 来决定。然而,使用相同的 JobInstance 决定了是否使用以前执行的 “state”(即 ExecutionContext,本章后面将讨论)。使用一个新的 JobInstance 意味着 "从头开始",而使用一个现有的实例通常意味着 "从你停止的地方开始"。

JobParameters

在讨论了 JobInstance 以及它与 Job 的不同之处之后,自然要问的问题是: "一个 JobInstance 与另一个 JobInstance 是如何区分的?" 答案是: JobParameters。一个 JobParameters 对象持有一组用于批处理job的参数。它们可以用于识别,甚至可以作为运行期间的参考数据,如下图所示:

Job Parameters
Figure 3. Job Parameters

在前面的例子中,有两个实例,一个是1月1日,另一个是1月2日,实际上只有一个 Job,但它有两个 JobParameter 对象:一个是以 01-01-2017 的工作参数启动的,另一个是以 01-02-2017 的参数启动的。因此,可以被定义为 JobInstance = Job + 指定的 JobParameters。这使得开发者可以有效地控制 JobInstance 的定义方式,因为他们可以控制哪些参数被传入。

并非所有的 job parameters 都必须有助于识别一个 JobInstance。在默认情况下,它们会这样做。然而,该框架也允许提交带有不有助于识别 JobInstance 的参数的 Job

JobExecution

一个 JobExecution 指的是运行一个 Job 的单一尝试的技术概念。一个 execution 可能以失败或成功告终,但对应于一个特定执行的 JobInstance 并不被认为是完整的,除非 execution 成功完成。以前面描述的 EndOfDay Job 为例,考虑一个 01-01-2017 的 JobInstance 在第一次运行时失败。如果它以与第一次运行(01-01-2017)相同的 identifying job parameters 再次运行,就会创建一个新的 JobExecution。然而,仍然只有一个 JobInstance

一个 Job 定义了什么是 job 以及如何执行工作,而 JobInstance 是一个纯粹的组织对象,将执行工作分组,主要是为了实现正确的重启语义。然而,JobExecution 是运行过程中实际发生的事情的主要存储机制,它包含了许多必须被控制和持久化的属性,如下表所示:

Table 1. JobExecution 属性

属性

说明

Status

一个指示执行状态的 BatchStatus 对象。在运行时,它是 BatchStatus#STARTED。如果它失败了,它是 BatchStatus#FAILED。如果它成功完成,它是 BatchStatus#COMPLETED

startTime

一个 java.time.LocalDateTime,代表执行开始时的当前系统时间。如果 job 还没有开始,这个字段是空的。

endTime

一个 java.time.LocalDateTime,代表执行结束时的当前系统时间,不管它是否成功。如果 job 还没有完成,该字段为空。

exitStatus

ExitStatus,表示运行的结果。它是最重要的,因为它包含一个返回给调用者的退出代码(exit code)。更多细节见第5章。如果 job 还没有完成,该字段为空。

createTime

一个 java.time.LocalDateTime,代表 JobExecution 第一次被持久化时的当前系统时间。job 可能还没有开始(因此没有开始时间),但它总是有一个 createTime,这是框架管理 job 级 ExecutionContexts 所需要的。

lastUpdated

一个 java.time.LocalDateTime,代表 JobExecution 最后被持久化的时间。如果 job 还没有开始,这个字段是空的。

executionContext

包含任何需要在 execution 之间持续存在的用户数据的 "属性包"。

failureExceptions

Job 执行过程中遇到的异常情况的列表。如果在 Job 失败过程中遇到一个以上的异常,这些就很有用。

这些属性很重要,因为它们被持久化了,可以用来完全确定一个执行的状态。例如,如果01-01的 EndOfDay job 在晚上9:00执行,在9:30失败,那么在批处理元数据表中会有以下条目:

Table 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

Table 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

Table 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

为清晰和格式化起见,列名可能被缩写或省略。

现在 job 已经失败了,假设整个晚上才确定问题,所以 "批处理窗口" 现在已经关闭。进一步假设该窗口在晚上9:00开始,Job 再次启动01-01,从它离开的地方开始,在9:30成功完成。因为现在是第二天,所以01-02的工作也必须运行,它紧接着在9:31被启动,并在10:30的正常一小时时间内完成。没有要求一个 JobInstance 在另一个之后被启动,除非这两个 Job 有可能试图访问相同的数据,导致数据库级别的锁问题。完全由调度器来决定何时应该运行一个 Job。由于它们是独立的 JobInstances,Spring Batch不会试图阻止它们同时运行。(当另一个 JobInstance 已经在运行时,试图运行同一个 JobInstance 会导致抛出 JobExecutionAlreadyRunningException)。现在在 JobInstanceJobParameters 表中都应该有一个额外的条目,在 JobExecution 表中也有两个额外的条目,如下表所示:

Table 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

Table 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

Table 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

COMPLETED

3

2

2017-01-02 21:31

2017-01-02 22:29

COMPLETED

为清晰和格式化起见,列名可能被缩写或省略。

Step

Step 是一个 domain 对象,它封装了批处理 Job 的一个独立、连续的阶段。因此,每个 Job 完全由一个或多个步骤(step)组成。一个 Step 包含定义和控制实际批处理的所有必要信息。这必然是一个模糊的描述,因为任何给定的 Step 的内容都由编写 Job 的开发人员决定。一个 Step 可以是简单的,也可以是复杂的,正如开发者所希望的那样。一个简单的 Step 可能从文件中加载数据到数据库中,只需要很少或没有代码(取决于使用的实现)。一个更复杂的 Step 可能有复杂的业务规则,作为处理的一部分被应用。与 Job 一样,一个 Step 有一个单独的 StepExecution,与一个独特的 JobExecution 相关,如下图所示:

Figure 2.1: Job Hierarchy With Steps
Figure 4. Job 层次与 Step

StepExecution

一个 StepExecution 表示执行一个 Step 的单一尝试。每次运行一个 Step 都会创建一个新的 StepExecution,与 JobExecution 类似。然而,如果一个 Step 因为之前的 Step 失败而无法执行,则不会为其持续执行。一个 StepExecution 只有在其 Step 实际启动时才被创建。

Step 的执行由 StepExecution 类的对象表示。每个执行都包含对其相应 step 和 JobExecution 的引用,以及与事务相关的数据,如提交和回滚计数以及开始和结束时间。此外,每个 step 的执行都包含一个 ExecutionContext,它包含了开发者需要跨批处理运行的任何数据,例如重新启动所需的统计数据或状态信息。下表列出了 StepExecution 的属性:

Table 8. StepExecution 属性

属性

定义

Status

一个指示执行状态的 BatchStatus 对象。在运行时,状态为 BatchStatus.STARTED。如果失败,状态为 BatchStatus.FAILED。如果它成功完成,状态是 BatchStatus.COMPLETED

startTime

一个 java.time.LocalDateTime,代表执行开始时的当前系统时间。如果该 step 尚未开始,该字段为空。

endTime

一个 java.time.LocalDateTime,代表执行结束时的当前系统时间,无论是否成功。如果该 step 还没有退出,该字段为空。

exitStatus

ExitStatus 表示执行的结果。它是最重要的,因为它包含一个返回给调用者的退出代码(exit code)。更多细节见第5章。如果 job 还没有退出,这个字段是空的。

executionContext

包含任何需要在 execution 之间持续存在的用户数据的 "属性包"。

readCount

已经成功读取的 item 数量。

writeCount

已成功写入的 item 数量。

commitCount

本次 execution 中已提交的事务数量。

rollbackCount

由该 Step 控制的业务事务被回滚的次数。

readSkipCount

read 失败的次数,导致跳过 item。

processSkipCount

process 失败的次数,导致跳过 item。

filterCount

ItemProcessor "过滤" 过的项目的数量。

writeSkipCount

write 失败的次数,导致跳过 item。

ExecutionContext

一个 ExecutionContext 代表了一个键/值对的集合,这些键/值对被持久化并由框架控制,为开发者提供了一个存储持久化状态的地方,这些持久化状态的 scope 是 StepExecution 对象或 JobExecution 对象。(对于那些熟悉Quartz的人来说,它与 JobDataMap 非常相似。)最好的使用例子是方便重新启动。以平面文件输入为例,在处理个别行时,框架会定期在提交点持久化 ExecutionContext。这样做可以让 ItemReader 存储其状态,以防在运行过程中发生致命错误,甚至停电。正如下面的例子所示,只需要将当前读取的行数放入上下文中,剩下的就由框架来完成:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

Job stereotype 部分的 EndOfDay 例子为例,假设有一个 step,loadData,将一个文件加载到数据库。在第一次运行失败后,元数据表将看起来像下面的例子:

Table 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

Table 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

Table 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

Table 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

Table 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在前面的案例中,该 Step 运行了30分钟,处理了40,321个 "件",在这种情况下,这代表了文件中的行。这个值在每次提交前由框架更新,可以包含与 ExecutionContext 中的条目对应的多行。在提交前得到通知需要各种 StepListener 实现之一(或一个 ItemStream),本指南后面将详细讨论这些。与前面的例子一样,我们假设 Job 在第二天重新启动。当它被重新启动时,上次运行的 ExecutionContext 中的值将从数据库中重新构建。当 ItemReader 被打开时,它可以检查它在上下文中是否有任何存储的状态,并从那里初始化自己,正如下面的例子所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,在前面的代码运行后,当前行是40,322,让 Step 从它离开的地方重新开始。你也可以将 ExecutionContext 用于需要持久化的关于运行本身的统计数据。例如,如果一个平面文件包含存在于多行的处理订单,可能需要存储有多少订单被处理了(这与读取的行数有很大不同),这样就可以在 Step 结束时发送一封电子邮件,在正文中写明处理的订单总数。该框架为开发者处理存储这个问题,以正确地将其与单个 JobInstance 联系起来。要知道是否应该使用现有的 ExecutionContext 是非常困难的。例如,使用上面的 EndOfDay 例子,当01-01运行第二次启动时,框架认识到这是同一个 JobInstance,并在单个 Step 的基础上,将 ExecutionContext 从数据库中取出,并将其(作为 StepExecution 的一部分)交给 Step 本身。相反,对于01-02的运行,框架认识到这是一个不同的实例,所以必须把一个空的上下文交给 Step。框架为开发者做了许多这种类型的判断,以确保在正确的时间将状态交给他们。同样重要的是要注意,在任何时候,每个 StepExecution 只存在一个 ExecutionContextExecutionContext 的客户端应该小心,因为这创造了一个共享的 keyspace。因此,在把值放进去时应该小心,以确保没有数据被覆盖。然而,Step 在上下文中绝对不存储任何数据,所以没有办法对框架产生不利影响。

注意,每个 JobExecution 至少有一个 ExecutionContext,每个 StepExecution 也有一个。例如,考虑下面的代码片段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

正如注释中指出的,ecStep 不等于 ecJob。它们是两个不同的 ExecutionContexts。适用于 Step 的是在 Step 的每个提交点保存的,而适用于 Job 的是在每个 Step 执行之间保存的。

JobRepository

JobRepository 是前面提到的所有 stereotype 的持久化机制。它为 JobLauncherJobStep 的实现提供 CRUD 操作。当一个 Job 第一次被启动时,一个 JobExecution 会从 repository 中获得。另外,在执行过程中,StepExecutionJobExecution 的实现通过传递给 repository 而被保存。

Spring Batch XML 命名空间提供了对用 <job-repository> 标签配置 JobRepository 实例的支持,如下例所示:

<job-repository id="jobRepository"/>

当使用 Java 配置时,@EnableBatchProcessing 注解提供了一个 JobRepository 作为自动配置的组件之一。

JobLauncher

JobLauncher 代表了一个简单的接口,用于用一组给定的 JobParameters 启动一个 Job,如下面的例子所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

我们期望实现从 JobRepository 获取有效的 JobExecution 并执行该 Job

ItemReader

ItemReader 是一个抽象概念,它代表了对一个 Step 的输入的检索,一次一个 item。当 ItemReader 用完了它能提供的 item 时,它会通过返回 null 来表示这一点。你可以在Reader 和 Writer 中找到更多关于 ItemReader 接口及其各种实现的细节。

ItemWriter

ItemWriter 是一个抽象概念,它代表了一个 Step 的输出,每次都是一批或一大批的item。一般来说,ItemWriter 不知道它接下来应该收到的输入,只知道在其当前调用中传递的 item。你可以在 Reader 和 Writer 中找到更多关于 ItemWriter 接口及其各种实现的细节。

ItemProcessor

ItemProcessor 是一个抽象,它代表一个 item 的业务处理。当 ItemReader 读取一个item,而 ItemWriter 写入一个 item 时,ItemProcessor 提供了一个访问点来转换或应用其他业务处理。如果在处理该 item 时,确定该项目是无效的,返回 null 表示该 item 不应该被写出。你可以在 Reader 和 Writer 中找到更多关于 ItemProcessor 接口的细节。

Batch 命名空间

前面列出的许多 domain 概念都需要在Spring ApplicationContext 中进行配置。虽然有上述接口的实现,你可以在标准的 Bean 定义中使用,但为了便于配置,我们提供了一个命名空间,如下例所示:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要批处理命名空间已被声明,它的任何元素都可以被使用。你可以 在配置和运行一个 Job 中找到更多关于配置一个 Job 的信息。你可以在 配置 Step 中找到关于配置 Step 的更多信息。