Item 处理

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

ItemReader 和 ItemWriter 接口 对于各自的特定任务都非常有用,但如果要在写入之前插入业务逻辑,该怎么办呢?读取和写入的一种选择是使用复合模式: 创建一个包含另一个 ItemWriterItemWriter 或包含另一个 ItemReaderItemReader。下面的代码就是一个例子:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(Chunk<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

前一个类包含另一个 ItemWriter,它在提供了一些业务逻辑后将其委托给另一个 ItemWriter。这种模式也可以轻松用于 ItemReader,或许可以根据主 ItemReader 提供的输入获取更多参考数据。如果需要自己控制调用 write,这种模式也很有用。不过,如果你只想在实际写入之前 “转换” 传入的要写入的 item,你就不需要自己 write。你只需修改 item 即可。针对这种情况,Spring Batch 提供了 ItemProcessor 接口,如下接口定义所示:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor 非常简单。给定一个对象,将其转换并返回另一个对象。提供的对象可以是同一类型,也可以不是。关键是业务逻辑可以应用于流程中,而创建该逻辑完全取决于开发人员。ItemProcessor 可以直接装配到一个 step 中。例如,假设一个 ItemReader 提供了一个 Foo 类型的类,在写出之前需要将其转换为 Bar 类型。下面的示例显示了一个执行转换的 ItemProcessor

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar> {
    public void write(Chunk<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在前面的示例中,有一个名为 Foo 的类、一个名为 Bar 的类和一个名为 FooProcessor 的类,它们都遵循 ItemProcessor 接口。转换很简单,但任何类型的转换都可以在这里完成。BarWriter 写入 Bar 对象,如果提供了其他类型,则会产生异常。同样,如果提供的不是 FooFooProcessor 也会抛出异常。然后就可以将 FooProcessor 注入到一个 Step 中,如下例所示:

XML Configuration
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>
Java Configuration
@Bean
public Job ioSampleJob(JobRepository jobRepository) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1())
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Bar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}

ItemProcessorItemReaderItemWriter 的区别在于,对于 Step 而言,ItemProcessor 是可选的。

链式项目处理器

执行单个转换在很多情况下都很有用,但如果要将多个 ItemProcessor 实现 “链”在一起呢?你可以使用前面提到的复合模式来实现。更新之前的单一转换示例,Foo 转换为 BarBar 再转换为 Foobar 并写出,如下例所示:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(Chunk<? extends Foobar> items) throws Exception {
        //write items
    }
}

FooProcessorBarProcessor 可以 “链” 在一起,从而产生 Foobar,如下例所示:

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);

与前面的示例一样,你可以将复合处理器配置到 Step 中:

XML Configuration
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>
Java Configuration
@Bean
public Job ioSampleJob(JobRepository jobRepository) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1())
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Foobar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor processor = new CompositeItemProcessor();

	processor.setDelegates(delegates);

	return processor;
}

过滤记录

item 处理器的一个典型用途是在将记录传递给 ItemWriter 之前将其过滤掉。过滤是一种有别于跳过的操作。跳过表示记录无效,而过滤表示记录不应被写入。

例如,考虑一个批处理 job,它读取一个包含三种不同类型记录的文件:插入记录、更新记录和删除记录。如果系统不支持删除记录,我们就不会向 ItemWriter 发送任何可删除的记录。不过,由于这些记录实际上并不是坏记录,我们希望过滤掉它们,而不是跳过它们。因此,ItemWriter 将只接收可插入和可更新的记录。

要过滤记录,可以从 ItemProcessor 返回 null 值。框架会检测到结果为 null,并避免将该 item 添加到交付给 ItemWriter 的记录列表中。ItemProcessor 抛出的异常会导致跳过。

校验输入

ItemReader 和 ItemWriter 章节讨论了解析输入的多种方法。每个主要实现都会在输入不 “完整”时抛出异常。如果缺少数据范围,FixedLengthTokenizer 就会抛出异常。同样,如果试图访问 RowMapperFieldSetMapper 中不存在的索引,或访问的索引格式与预期格式不同,也会抛出异常。所有这些类型的异常都会在 read 返回之前抛出。但是,它们并不能解决返回 item 是否有效的问题。例如,如果其中一个字段是 age,它就不可能是负值。它可能会被正确解析,因为它存在并且是一个数字,但不会导致异常。由于已有大量验证框架,Spring Batch 并未试图提供另一种验证框架。相反,它提供了一个名为 Validator 的简单接口,正如下面的接口定义所示,你可以通过任意数量的框架来实现该接口:

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

其约定是,如果对象无效,validate 方法会抛出异常;如果对象有效,则正常返回。Spring Batch 提供了一个 ValidatingItemProcessor,正如下面的 Bean 定义所示:

XML Configuration
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
	<property name="validator">
		<bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/>
	</property>
</bean>
Java Configuration
@Bean
public ValidatingItemProcessor itemProcessor() {
	ValidatingItemProcessor processor = new ValidatingItemProcessor();

	processor.setValidator(validator());

	return processor;
}

@Bean
public SpringValidator validator() {
	SpringValidator validator = new SpringValidator();

	validator.setValidator(new TradeValidator());

	return validator;
}

你还可以使用 BeanValidatingItemProcessor 验证使用 Bean Validation API(JSR-303)注解,注解的项目。例如,考虑以下 Person 类型:

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

你可以通过在 application context 中声明 BeanValidatingItemProcessor Bean 来验证项目,并将其注册为面向块的 step 中的处理器:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

容错

当一个块回滚时,在读取过程中缓存的 item 可能会被重新处理。如果一个 step 被配置为容错(通常是通过使用跳过或重试处理),那么所使用的任何 ItemProcessor 都应以幂等方式实现。通常情况下,这包括对 ItemProcessor 的输入项不做任何更改,只更新结果实例。