重复
本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。 |
RepeatTemplate
批处理涉及重复操作,可以是简单的优化,也可以是 job 的一部分。为了对重复操作进行策略化和通用化,并提供相当于迭代器的框架,Spring Batch 提供了 RepeatOperations
接口。RepeatOperations
接口的定义如下:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
callback 是一个接口,如下定义所示,可让你插入一些需要重复的业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
callback 被重复执行,直到实现确定迭代应该结束。这些接口的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLE
或 RepeatStatus.FINISHED
。RepeatStatus
枚举向重复操作的调用者传达了是否有剩余工作的信息。一般来说,RepeatOperations
的实现应检查 RepeatStatus
,并将其作为结束迭代的决定的一部分。任何 callback 如果希望向调用者发出没有剩余工作的信号,都可以返回 RepeatStatus.FINISHED
。
RepeatTemplate
是 RepeatOperations
最简单的通用实现:
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的示例中,我们返回 RepeatStatus.CONTINUABLE
,表示还有更多工作要做。回调也可以返回 RepeatStatus.FINISHED
,向调用者表明没有剩余工作了。有些迭代可以通过考虑回调中正在进行的工作的内在因素来终止。其他迭代实际上是无限循环(就回调而言),完成与否的决定权委托给外部策略,如上例所示。
完成策略
在 RepeatTemplate
中,iterate
方法中循环的终止由 CompletionPolicy
决定,而 CompletionPolicy
也是 RepeatContext
的工厂。RepeatTemplate
有责任使用当前策略创建 RepeatContext
,并在迭代的每个阶段将其传递给 RepeatCallback
。回调完成 doInIteration
后,RepeatTemplate
必须调用 CompletionPolicy
,要求它更新自己的状态(该状态将存储在 RepeatContext
中)。然后,它会询问策略迭代是否完成。
Spring Batch 提供了一些简单的 CompletionPolicy
通用实现。SimpleCompletionPolicy
允许执行固定次数(RepeatStatus.FINISHED
可随时强制提前完成)。
对于更复杂的决策,用户可能需要执行自己的完成策略。例如,批处理窗口需要自定义策略,以防止线上系统使用时执行批处理作业。
异常处理
如果 RepeatCallback
出现异常,RepeatTemplate
会咨询 ExceptionHandler
,由其决定是否重新抛出异常。
下面的列表显示了 ExceptionHandler
接口定义:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一种常见的用例是计算给定类型的异常数量,并在达到限制时失效。为此,Spring Batch 提供了 SimpleLimitExceptionHandler
和略微灵活的 RethrowOnThresholdExceptionHandler
。SimpleLimitExceptionHandler
有一个限制属性和一个异常类型,该异常类型应与当前异常进行比较。所提供类型的所有子类也会被计算在内。给定类型的异常会被忽略,直到达到限制后才会被重新抛出。其他类型的异常总是会被重新抛出。
SimpleLimitExceptionHandler
的一个重要可选属性是名为 useParent
的 boolean 标志。默认情况下,该标志为 false
,因此限制只在当前 RepeatContext
中考虑。如果设置为 true
,则会在嵌套迭代的同级上下文(如 step 中的一组块)中保留限制。
监听器
通常情况下,在多个不同的迭代中接收额外的回调是非常有用的。为此,Spring Batch 提供了 RepeatListener
接口。RepeatTemplate
允许用户注册 RepeatListener
实现,并在迭代过程中通过 RepeatContext
和 RepeatStatus
(如果可用)为它们提供回调。
RepeatListener
接口定义如下
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open
和 close
回调在整个迭代之前和之后进行,而 before
、after
和 onError
则适用于单个 RepeatCallback
调用。
请注意,当监听器不止一个时,它们是在一个列表中,因此有一个顺序。在这种情况下,open
和 before
的调用顺序相同,而 after
、onError
和 close
的调用顺序相反。
并行处理
RepeatOperations
的实现并不局限于按顺序执行回调。有些实现能够并行执行回调,这一点非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate
,它使用 Spring TaskExecutor
策略来运行 RepeatCallback
。默认情况下使用 SynchronousTaskExecutor
,其效果是在同一线程中执行整个迭代(与普通 RepeatTemplate
相同)。
声明式迭代
有时,有些业务处理过程是每次都要重复的。这方面的典型例子就是消息管道的优化。如果一批消息频繁到达,对它们进行处理比为每条消息承担一个单独事务的成本更有效。Spring Batch 为此提供了一种 AOP 拦截器,可将方法调用封装在 RepeatOperations
对象中。RepeatOperationsInterceptor
会执行拦截的方法,并根据提供的 RepeatTemplate
中的 CompletionPolicy
进行重复。
下面的示例展示了声明式迭代,它使用 Spring AOP 命名空间来重复调用名为 processMessage
方法的服务(有关如何配置 AOP 拦截器的更多详情,请参阅 Spring 用户指南):
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
下面的示例使用 Java 配置重复调用名为 processMessage
的方法的服务(有关如何配置 AOP 拦截器的更多详情,请参阅 Spring 用户指南):
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
前面的示例在拦截器中使用了默认的 RepeatTemplate
。要更改策略、监听器和其他细节,可以向拦截器注入 RepeatTemplate
的实例。
如果被拦截的方法返回 void
,拦截器总是返回 RepeatStatus.CONTINUABLE
(因此如果 CompletionPolicy
没有有限的结束点,可能会导致死循环的危险)。否则,它会一直返回 RepeatStatus.CONTINUABLE
,直到被拦截方法的返回值为 null
。此时,它会返回 RepeatStatus.FINISHED
。因此,目标方法内部的业务逻辑可以通过返回 null
或抛出异常来表示没有更多的工作要做,这个异常会被提供的 RepeatTemplate
中的 ExceptionHandler
重新抛出。