常见的批处理模式

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

一些批处理作业可以完全由 Spring Batch 中的现成组件组装而成。例如,ItemReaderItemWriter 实现可配置为涵盖各种场景。不过,在大多数情况下,必须编写自定义代码。应用程序开发人员的主要 API 入口点是 TaskletItemReaderItemWriter 和各种 listener 接口。大多数简单的批处理作业都可以使用 Spring Batch ItemReader 的现成输入,但在处理和编写过程中经常会出现需要自定义的问题,这就需要开发人员实现 ItemWriterItemProcessor

在本章中,我们将举例说明自定义业务逻辑中的常见模式。这些示例主要以 listener 接口为特色。需要注意的是,如果合适,ItemReaderItemWriter 也可以实现 listener 器接口。

记录 Item 处理和失败

一个常见的用例是需要对 step 中的错误逐项进行特殊处理,也许是记录到一个特殊通道,或者是向数据库中插入一条记录。通过面向块的 Step(由 step 工厂 Bean 创建),用户可以使用一个简单的 ItemReadListenerread 时出错的监听器)和一个 ItemWriteListenerwrite 时出错的监听器)来实现这种用例。下面的代码片段展示了一个同时记录 readwrite 失败的监听器:

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

实现了监听器后,必须将其注册到一个 step 中。

下面的示例展示了如何用 XML 向 step 注册监听器。

XML Configuration
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>

下面的示例展示了如何用 Java 注册 step 监听器。

Java Configuration
@Bean
public Step simpleStep(JobRepository jobRepository) {
	return new StepBuilder("simpleStep", jobRepository)
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}
如果你的 listener 在 onError() 方法中做了任何事情,它必须是在一个将要回滚的事务中。如果需要在 onError() 方法中使用事务资源(如数据库),请考虑为该方法添加一个声明式事务(详见《Spring Core 参考指南》),并将其传播属性值设为 REQUIRES_NEW

因业务原因手动停止 Job

Spring Batch 通过 JobOperator 接口提供了一个 stop() 方法,但该方法实际上是运维而非应用开发人员使用的。有时,在业务逻辑中停止 job 执行会更方便或更有意义。

最简单的做法是抛出 RuntimeException(既不会无限重试,也不会跳过)。例如,可以使用自定义异常类型,如下例所示:

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

另一种阻止 step 执行的简单方法是从 ItemReader 返回 null,如下例所示:

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前面的示例实际上依赖于 CompletionPolicy 策略的默认实现,当要处理的 item 为 null 时,该策略会发出批处理完成的信号。可以通过 SimpleStepFactoryBean 实现更复杂的完成策略并注入到 Step 中。

下面的示例展示了如何在 xml 中将完成策略注入 step:

XML Configuration
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

下面的示例展示了如何在 Java 中将完成策略注入 step:

Java Configuration
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("simpleStep", jobRepository)
				.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
				.reader(reader())
				.writer(writer())
				.build();
}

另一种方法是在 StepExecution 中设置一个标志,由框架中的 Step 实现在 item 处理之间检查该标志。要实现这种替代方法,我们需要访问当前的 StepExecution,这可以通过实现一个 StepListener 并将其注册到 Step 中来实现。下面的示例展示了一个设置标志的 listener:

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

设置该标志后,该 step 的默认行为是抛出 JobInterruptedException。可以通过 StepInterruptionPolicy 控制这种行为。不过,唯一的选择是抛出或不抛出异常,因此这始终是 job 的非正常结束。

添加页脚记录

通常,在向平面文件写入数据时,必须在完成所有处理后在文件末尾添加一条 "页脚" 记录。这可以使用 Spring Batch 提供的 FlatFileFooterCallback 接口来实现。FlatFileFooterCallback(及其对应的 FlatFileHeaderCallback)是 FlatFileItemWriter 的可选属性,可添加到 item writer 中。

下面的示例展示了如何在 XML 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

XML Configuration
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

下面的示例展示了如何在 Java 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

Java Configuration
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

页脚回调(footerCallback)接口只有一个方法,当必须写入页脚时会调用该方法,如下面的接口定义所示:

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

编写摘要页脚

涉及页脚记录的一个常见要求是在输出过程中汇总信息,并将这些信息附加到文件末尾。这种页脚通常用作文件摘要或提供校验和。

例如,如果一个批处理任务要将 Trade 记录写入一个平面文件,并要求将所有 Trades 的总金额放在页脚,那么可以使用下面的 ItemWriter 实现:

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(Chunk<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

TradeItemWriter 会存储一个总金额(totalAmount)值,该值会随着写入的每个 Trade item 的金额增加而增加。处理完最后一个 Trade item 后,框架会调用 writeFooter,将 totalAmount 写入文件。请注意,write 方法使用了临时变量 chunkTotal,该变量存储了该数据块中的 Trade 金额总和。这样做是为了确保在 write 方法中出现跳转时,totalAmount 保持不变。只有在 write 方法结束时,确保没有异常抛出后,我们才会更新 totalAmount

为了调用 writeFooter 方法,必须将 TradeItemWriter(实现了 FlatFileFooterCallback)作为 footerCallback 装配到 FlatFileItemWriter 中。

下面的示例展示了如何在 XML 中装配 TradeItemWriter

XML Configuration
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

下面的示例展示了如何在 Java 中装配 TradeItemWriter

Java Configuration
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

目前 TradeItemWriter 的编写方式只有在 Step 不可重启的情况下才能正常运行。这是因为该类是有状态的(因为它存储了 totalAmount),但 totalAmount 并没有持久化到数据库中。因此,在重新启动时无法检索。为使该类可重启,应实现 ItemStream 接口以及 openupdate 方法,如下例所示:

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update 方法会将最新版本的总金额(totalAmount)存储到执行上下文(ExecutionContext)中,然后再将该对象持久化到数据库中。open 方法会从 ExecutionContext 中检索任何现有的 totalAmount,并将其作为处理的起点,这样 TradeItemWriter 就能在重新启动时从上次运行该 Step 时中断的地方继续处理。

基于 ItemReader 的驱动查询

有关 reader 和 writer 的章节 中,讨论了使用分页的数据库输入。许多数据库供应商(如 DB2)加锁策略是悲观锁,如果在线应用程序的其他部分也需要使用正在读取的表,就会出现问题。此外,在超大数据集上打开游标也会给某些供应商的数据库带来问题。因此,许多项目更倾向于使用 “驱动查询”(Driving Query) 方法来读取数据。这种方法通过迭代 key 而不是需要返回的整个对象来实现,如下图所示:

Driving Query Job
Figure 1. Driving Query Job

如图所示,上图中的示例使用了与基于游标的示例中相同的 'FOO' 表。不过,在 SQL 语句中只 select 了 ID,而不是 select 整行。因此,从 read 返回的不是 FOO 对象,而是一个 Integer。如下图所示,这个 Integer 可以用来查询 'details',即一个完整的 Foo 对象:

Driving Query Example
Figure 2. Driving Query Example

应使用 ItemProcessor 将从驱动查询中获得的 key 转换为完整的 Foo 对象。现有的 DAO 可用于根据 key 查询完整对象。

多行记录

在平面文件中,每条记录通常只限于一行,但在一个文件中,记录跨越多行、格式各异的情况也很常见。下面的文件摘录就是这种排列的一个例子:

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

从以 'HEA' 开头的一行到以 'FOT' 开头的一行之间的所有内容都被视为一条记录。要正确处理这种情况,必须考虑一些因素:

  • ItemReader 必须将多行记录的每一行作为一个组(group)来读取,而不是一次读取一条记录,以便将其完整地传递给 ItemWriter

  • 每种行类型可能需要不同的分词方式。

由于一条记录跨越多行,而且我们可能不知道有多少行,因此 ItemReader 必须小心谨慎,始终读取整条记录。为此,应实现一个自定义的 ItemReader,作为 FlatFileItemReader 的 wrapper。

下面的示例展示了如何用 XML 实现自定义 ItemReader

XML Configuration
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

下面的示例展示了如何用 Java 实现自定义 ItemReader

Java Configuration
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

为了确保每行都被正确分词,这对于固定长度的输入尤为重要,可以在委托的 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。有关更多详细信息,请参 Reader 和 Writer 章节中的 FlatFileItemReader。然后,委托 reader 使用 PassThroughFieldSetMapper 将每行的 FieldSet 传递回封装的 ItemReader

下面的示例展示了如何确保在 XML 中正确分词每一行:

XML Content
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

下面的示例展示了如何确保在 Java 中正确分词每一行:

Java Content
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

该 wrapper 必须能够识别记录的结束,这样才能不断调用委托的 read() 直到结束。每读取一行,wrapper 都应建立要返回的 item。一旦到达页脚,该 item 就可以返回,以便交付给 ItemProcessorItemWriter,如下例所示:

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

执行系统命令

许多批处理作业需要在作业中调用外部命令。这样的进程可以由 scheduler 单独启动,但会失去有关运行的通用元数据的优势。此外,多 step job 也需要拆分成多个 job。

由于这种需求非常普遍,Spring Batch 为调用系统命令提供了一个 Tasklet 实现。

下面的示例展示了如何在 XML 中调用外部命令:

XML Configuration
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

下面的示例展示了如何在 Java 中调用外部命令。

Java Configuration
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

未找到输入时处理 Step Completion

在许多批处理场景中,没有在数据库或文件中找到要处理的行不是异常情况。该 Step 被认为没有找到工作并以读取 0 个 item 的方式完成。Spring Batch 中提供的所有 ItemReader 实现都默认采用这种方法。如果即使存在输入也没有写出任何内容(通常是文件命名错误或出现类似问题),这可能会导致一些困惑。因此,应检查元数据本身以确定框架找到要处理的工作量。但是,如果找不到输入被认为是异常情况怎么办?在这种情况下,通过编程方式检查元数据是否存在未处理的 item 并导致失败是最佳解决方案。由于这是一个常见的用例,Spring Batch 提供了一个具有这种功能的 listener,如 NoWorkFoundStepExecutionListener 类定义所示。

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

StepExecutionListener 会在 "afterStep" 阶段检查 StepExecutionreadCount 属性,以确定是否没有读取任何 item。如果是这样,就会返回退出代码 FAILED,表明该 Step 应该失败。否则,将返回 null ,这不会影响 Step 的状态。

传递数据给后续的 Step

将信息从一个 step 传递到另一个 step 通常很有用。这可以通过 ExecutionContext 来实现。问题在于有两个 ExecutionContext:一个在 Step 级,另一个在 Job 级。Step ExecutionContext 只在 step 中保留,而 Job ExecutionContext 则在整个 Job 中保留。另一方面,Step ExecutionContext 会在 Step 每次提交时更新,而 Job ExecutionContext 只在每个 Step 结束时更新。

这种分离的结果是,在 Step 执行过程中,所有数据都必须放在 Step ExecutionContext 中。这样做可以确保数据在 Step 运行时得到正确存储。如果数据被存储到 Job ExecutionContext 中,那么在 Step 执行期间就不会被持久化。如果 Step 失败,数据就会丢失。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

要将数据在将来的 Step 中可用,必须在 step 完成后将其“提升”到 Job ExecutionContext 中。Spring Batch 为此提供了 ExecutionContextPromotionListener。必须使用与要提升的 ExecutionContext 中的数据相关的 key 来配置该 listener。还可以选择性地使用退出代码模式列表进行配置,以确定何时进行提升(默认为 COMPLETED)。与所有 listener 一样,必须将其注册在 Step 上。

下面的示例显示了如何在 XML 中将一个 step 推广到 Job ExecutionContext

XML Configuration
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

下面的示例展示了如何在 Java 中将一个 step 提升到 Job ExecutionContext

Java Configuration
@Bean
public Job job1(JobRepository jobRepository) {
	return new JobBuilder("job1", jobRepository)
				.start(step1())
				.next(step1())
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

最后,必须从 Job ExecutionContext 中检索保存的值,如下例所示:

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}