重复

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

RepeatTemplate

批处理涉及重复操作,可以是简单的优化,也可以是 job 的一部分。为了对重复操作进行策略化和通用化,并提供相当于迭代器的框架,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口的定义如下:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

callback 是一个接口,如下定义所示,可让你插入一些需要重复的业务逻辑:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

callback 被重复执行,直到实现确定迭代应该结束。这些接口的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 枚举向重复操作的调用者传达了是否有剩余工作的信息。一般来说,RepeatOperations 的实现应检查 RepeatStatus,并将其作为结束迭代的决定的一部分。任何 callback 如果希望向调用者发出没有剩余工作的信号,都可以返回 RepeatStatus.FINISHED

RepeatTemplateRepeatOperations 最简单的通用实现:

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,表示还有更多工作要做。回调也可以返回 RepeatStatus.FINISHED,向调用者表明没有剩余工作了。有些迭代可以通过考虑回调中正在进行的工作的内在因素来终止。其他迭代实际上是无限循环(就回调而言),完成与否的决定权委托给外部策略,如上例所示。

RepeatContext

RepeatCallback 的方法参数是 RepeatContext。许多回调都会忽略 context。不过,如果有必要,可以将其用作属性包,在迭代期间存储瞬态数据。iterate 方法返回后,context 将不复存在。

如果正在进行嵌套迭代,RepeatContext 就会有一个 parent context。parent context 有时可用于存储需要在 iterate 调用之间共享的数据。例如,如果你想计算迭代中某个事件的发生次数,并在后续调用中记住它,就属于这种情况。

RepeatStatus

RepeatStatus 是 Spring Batch 用来指示处理是否已完成的枚举。RepeatStatus 有两种可能的值:

Table 1. RepeatStatus 属性

说明

CONTINUABLE

还有工作要处理。

FINISHED

工作处理完毕,不应该再重复了。

你可以使用 RepeatStatus 中的 and() 方法将 RepeatStatus 值与逻辑 AND 运算结合起来。这样做的效果是对可连续标记进行逻辑 AND 运算。换句话说,如果任一状态为 FINISHED,结果就是 FINISHED

完成策略

RepeatTemplate 中,iterate 方法中循环的终止由 CompletionPolicy 决定,而 CompletionPolicy 也是 RepeatContext 的工厂。RepeatTemplate 有责任使用当前策略创建 RepeatContext,并在迭代的每个阶段将其传递给 RepeatCallback。回调完成 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy,要求它更新自己的状态(该状态将存储在 RepeatContext 中)。然后,它会询问策略迭代是否完成。

Spring Batch 提供了一些简单的 CompletionPolicy 通用实现。SimpleCompletionPolicy 允许执行固定次数(RepeatStatus.FINISHED 可随时强制提前完成)。

对于更复杂的决策,用户可能需要执行自己的完成策略。例如,批处理窗口需要自定义策略,以防止线上系统使用时执行批处理作业。

异常处理

如果 RepeatCallback 出现异常,RepeatTemplate 会咨询 ExceptionHandler,由其决定是否重新抛出异常。

下面的列表显示了 ExceptionHandler 接口定义:

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一种常见的用例是计算给定类型的异常数量,并在达到限制时失效。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和略微灵活的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 有一个限制属性和一个异常类型,该异常类型应与当前异常进行比较。所提供类型的所有子类也会被计算在内。给定类型的异常会被忽略,直到达到限制后才会被重新抛出。其他类型的异常总是会被重新抛出。

SimpleLimitExceptionHandler 的一个重要可选属性是名为 useParent 的 boolean 标志。默认情况下,该标志为 false,因此限制只在当前 RepeatContext 中考虑。如果设置为 true,则会在嵌套迭代的同级上下文(如 step 中的一组块)中保留限制。

监听器

通常情况下,在多个不同的迭代中接收额外的回调是非常有用的。为此,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,并在迭代过程中通过 RepeatContextRepeatStatus(如果可用)为它们提供回调。

RepeatListener 接口定义如下

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回调在整个迭代之前和之后进行,而 beforeafteronError 则适用于单个 RepeatCallback 调用。

请注意,当监听器不止一个时,它们是在一个列表中,因此有一个顺序。在这种情况下,openbefore 的调用顺序相同,而 afteronErrorclose 的调用顺序相反。

并行处理

RepeatOperations 的实现并不局限于按顺序执行回调。有些实现能够并行执行回调,这一点非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring TaskExecutor 策略来运行 RepeatCallback。默认情况下使用 SynchronousTaskExecutor,其效果是在同一线程中执行整个迭代(与普通 RepeatTemplate 相同)。

声明式迭代

有时,有些业务处理过程是每次都要重复的。这方面的典型例子就是消息管道的优化。如果一批消息频繁到达,对它们进行处理比为每条消息承担一个单独事务的成本更有效。Spring Batch 为此提供了一种 AOP 拦截器,可将方法调用封装在 RepeatOperations 对象中。RepeatOperationsInterceptor 会执行拦截的方法,并根据提供的 RepeatTemplate 中的 CompletionPolicy 进行重复。

下面的示例展示了声明式迭代,它使用 Spring AOP 命名空间来重复调用名为 processMessage 方法的服务(有关如何配置 AOP 拦截器的更多详情,请参阅 Spring 用户指南):

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

下面的示例使用 Java 配置重复调用名为 processMessage 的方法的服务(有关如何配置 AOP 拦截器的更多详情,请参阅 Spring 用户指南):

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}

前面的示例在拦截器中使用了默认的 RepeatTemplate。要更改策略、监听器和其他细节,可以向拦截器注入 RepeatTemplate 的实例。

如果被拦截的方法返回 void,拦截器总是返回 RepeatStatus.CONTINUABLE(因此如果 CompletionPolicy 没有有限的结束点,可能会导致死循环的危险)。否则,它会一直返回 RepeatStatus.CONTINUABLE,直到被拦截方法的返回值为 null。此时,它会返回 RepeatStatus.FINISHED。因此,目标方法内部的业务逻辑可以通过返回 null 或抛出异常来表示没有更多的工作要做,这个异常会被提供的 RepeatTemplate 中的 ExceptionHandler 重新抛出。