ItemReader 和 ItemWriter

本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。

所有批处理的最简单形式就是读入大量数据,执行某种类型的计算或转换,然后将结果写出。Spring Batch 提供了三个关键接口来帮助执行批量读写: ItemReaderItemProcessorItemWriter

ItemReader

尽管只是一个简单的概念,但 ItemReader 却能从多种不同类型的输入中提供数据。最常见的例子包括

  • 平面文件:Flat-file item reader 从平面文件中读取数据行,该文件通常描述由文件中固定位置定义的数据字段或由某些特殊字符(如逗号)分隔的记录。

  • XML: XML ItemReader 可独立于用于解析、映射和验证对象的技术处理 XML。输入数据可根据 XSD schema 验证 XML 文件。

  • Database: 访问数据库资源可返回结果集,这些结果集可映射到对象进行处理。默认的 SQL ItemReader 实现会调用一个 RowMapper 来返回对象、跟踪当前行(如果需要重启)、存储基本统计信息,并提供一些事务增强功能(稍后解释)。

还有更多可能性,但本章将重点讨论基本的可能性。附录 A 列出了所有可用的 ItemReader 实现的完整列表。

ItemReader 是通用输入操作的基本接口,如下接口定义所示:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

read 方法定义了 ItemReader 最基本的约定。调用该方法将返回一个 item,如果没有其他 item,则返回 null 值。item 可能代表文件中的一行、数据库中的一行或 XML 文件中的一个元素。一般来说,我们希望这些 item 能被映射到一个可用的 domain 对象(如 TradeFoo 或其他),但约定中并没有这样的要求。

预计 ItemReader 接口的实现只能是前向的。但是,如果底层资源是事务性的(如 JMS 队列),那么在回滚情况下,调用 read 可能会在后续调用中返回相同的逻辑项。还值得注意的是,ItemReader 缺乏可处理的 item 不会导致异常抛出。例如,数据库 ItemReader 配置的查询结果为 0,则在第一次调用 read 时返回 null 值。

ItemWriter

ItemWriter 的功能与 ItemReader 相似,但操作相反。资源仍然需要定位、打开和关闭,但它们的不同之处在于,ItemWriter 会写入,而不是读入。对于数据库或队列,这些操作可能是插入、更新或发送。输出的序列化格式取决于每个 batch job。

ItemReader 一样,ItemWriter 也是一个相当通用的接口,如下接口定义所示:

public interface ItemWriter<T> {

    void write(Chunk<? extends T> items) throws Exception;

}

ItemReader 上的 read 一样,write 提供了 ItemWriter 的基本约定。只要打开,它就会尝试写出传入的 item 列表。由于通常情况下,item 会被 "批量" 写入一个大块,然后输出,因此该接口接受的是一个 item 列表,而不是一个 item 本身。在写出列表后,可以在从 write 方法返回之前执行任何必要的刷出。例如,如果向 Hibernate DAO 写入数据,可以多次调用 write 方法,每个 item 调用一次。然后,writer 可以在返回之前在 Hibernate session 上调用 flush

ItemStream

ItemReadersItemWriters 都能很好地实现各自的目的,但它们之间有一个共同的问题,需要另一个接口。一般来说,作 为batch job 范围的一部分,reader 和 writer 需要打开、关闭,并需要一种机制来持久化状态。如下例所示,ItemStream 接口就可以实现这一目的:

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

在介绍每种方法之前,我们都要提到 ExecutionContext。同时实现 ItemStreamItemReader 客户端应在调用 read 之前调用 open,以便打开任何资源(如文件)或获取连接。类似的限制也适用于实现 ItemStreamItemWriter。正如第 2 章所述,如果在 ExecutionContext 中发现了预期数据,则可将其用于在初始状态以外的位置启动 ItemReaderItemWriter。相反,调用 close 方法是为了确保安全释放打开过程中分配的任何资源。调用 update 方法主要是为了确保将当前持有的任何状态加载到所提供的 ExecutionContext 中。该方法在提交前被调用,以确保当前状态在提交前被持久化到数据库中。

ItemStream 的客户端是 Step(来自 Spring Batch Core)的特殊情况下,会为每个 StepExecution 创建 ExecutionContext,以便用户存储特定执行的状态,并期望在再次启动相同 JobInstance 时返回该状态。对于熟悉 Quartz 的人来说,其语义与 Quartz JobDataMap 非常相似。

委托模式和 Step 注册

请注意,CompositeItemWriter 是委托模式的一个示例,这在 Spring Batch 中很常见。委托本身可能实现回调接口,如 StepListener。如果它们实现了回调接口,并且作为 JobStep 的一部分与 Spring Batch Core 结合使用,那么它们几乎肯定需要与 Step 一起手动注册。直接连接到 Step 的 reader、writer 或处理器(processor)如果实现了 ItemStreamStepListener 接口,则会自动注册。但是,由于 Step 不知道委托,因此需要将它们注入为 listener 或 stream(或两者,如果合适)。

下面的示例展示了如何在 XML 中把委托注入为 Stream:

XML Configuration
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
                   commit-interval="2">
                <streams>
                    <stream ref="barWriter" />
                </streams>
            </chunk>
        </tasklet>
    </step>
</job>

<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
    <property name="delegate" ref="barWriter" />
</bean>

<bean id="barWriter" class="...BarWriter" />

下面的示例展示了如何在 Java 中把委托注入为 Stream:

Java Configuration
@Bean
public Job ioSampleJob(JobRepository jobRepository) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1())
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(compositeItemWriter())
				.stream(barWriter())
				.build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

	CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

	writer.setDelegate(barWriter());

	return writer;
}

@Bean
public BarWriter barWriter() {
	return new BarWriter();
}

平面文件

交换批量数据最常用的机制之一一直是平面文件。与 XML 不同的是,XML 有一个约定俗成的标准来定义其结构(XSD),而阅读平面文件的任何人都必须提前了解该文件的确切结构。一般来说,所有平面文件都分为两种类型:分隔型和固定长度型。分隔文件是指字段由逗号等分隔符分隔的文件。固定长度文件的字段长度是固定的。

FieldSet

在 Spring Batch 中处理平面文件时,无论是输入还是输出,最重要的类之一就是 FieldSet。许多架构和库都包含帮助你从文件中读取数据的抽象,但它们通常返回一个 StringString 对象数组。这实际上只能帮你做到一半。FieldSet 是 Spring Batch 从文件资源中绑定字段的抽象。它允许开发人员像处理数据库输入一样处理文件输入。FieldSet 在概念上类似于 JDBC ResultSetFieldSet 只需要一个参数:一个 token String 数组。你还可以选择配置字段名称,这样就可以通过索引或名称访问字段,就像下面示例中的 ResultSet 一样:

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSet 接口上还有更多选项,如 DatelongBigDecimal 等。FieldSet 的最大优势在于,它能对平面文件输入进行一致的解析。无论是在处理格式异常导致的错误时,还是在进行简单的数据转换时,FieldSet 都能保持一致,而不是每个批处理任务都以潜在的意外方式进行不同的解析。

FlatFileItemReader

平面文件是指最多包含二维(表格)数据的任何类型的文件。在 Spring Batch 框架中,FlatFileItemReader 类为读取和解析平面文件提供了基本功能。FlatFileItemReader 的两个最重要的必需依赖项是 ResourceLineMapperLineMapper 接口将在接下来的章节中详细介绍。resource 属性表示 Spring Core Resource。有关如何创建此类 Bean 的文档,请参阅 Spring Framework 第 5 章 - Resources。因此,除了展示以下简单示例外,本指南将不再详述创建 Resource 对象的细节:

Resource resource = new FileSystemResource("resources/trades.csv");

在复杂的批处理环境中,目录结构通常由企业应用集成(EAI)基础架构管理,其中为外部接口建立了下拉区,以便将文件从 FTP 位置移动到批处理位置,反之亦然。文件移动程序超出了 Spring Batch 架构的范围,但批处理作业流中包含文件移动程序作为作业流中的步骤并不罕见。批处理架构只需要知道如何定位要处理的文件。Spring Batch 从这个起点开始将数据送入管道。然而,Spring Integration 提供了许多此类服务。

FlatFileItemReader 中的其他属性可以让你进一步指定数据的解释方式,如下表所述:

Table 1. FlatFileItemReader Properties
属性 类型 说明

comments

String[]

指定用于表示注释行的行前缀。

encoding

String

指定要使用的文本编码。默认值为 UTF-8

lineMapper

LineMapper

String 转换成表示 item 的 Object

linesToSkip

int

文件顶部忽略的行数。

recordSeparatorPolicy

RecordSeparatorPolicy

用于确定行结束符的位置,并在引号字符串内的行结束符上执行续行等操作。

resource

Resource

读取的资源。

skippedLinesCallback

LineCallbackHandler

用于传递文件中要跳过的行的原始行内容的接口。如果 linesToSkip 设置为 2,则该接口会被调用两次。

strict

boolean

在严格模式下,如果输入资源不存在,reader 会在 ExecutionContext 上抛出异常。否则,它会记录问题并继续。

LineMapper

正如 RowMapper 采用 ResultSet 等底层结构并返回 Object 一样,平面文件处理也需要相同的结构来将 String 行转换为 Object,如下接口定义所示:

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

其基本约定是,在给定当前行及其关联行号的情况下,mapper 应返回一个结果 domain 对象。这与 RowMapper 类似,因为每一行都与其行号相关联,就像 ResultSet 中的每一行都与其行号相关联一样。这样,行号就可以与生成的 domain 对象绑定,以便进行 id 比较或记录更多信息。然而,与 RowMapper 不同的是,LineMapper 给出的是原始行,如上文所述,这只能实现一半的目标。必须将该行标记化为一个 FieldSet,然后将其映射到一个对象(如本文档稍后所述)。

LineTokenizer

将一行输入转化为 FieldSet 的抽象是必要的,因为可能有多种格式的平面文件数据需要转换为 FieldSet。在 Spring Batch 中,这个接口就是 LineTokenizer

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

LineTokenizer 的约定是这样的:给定一行输入(理论上,String 可以包含多行),返回一个代表该行的 FieldSet。然后,该 FieldSet 可以传递给 FieldSetMapper。Spring Batch 包含以下 LineTokenizer 实现:

  • DelimitedLineTokenizer:用于用分隔符分隔记录字段的文件。最常用的分隔符是逗号,但也经常使用管道或分号。

  • FixedLengthTokenizer:用于记录中每个字段都是 "固定宽度" 的文件。必须为每种记录类型定义每个字段的宽度。

  • PatternMatchingCompositeLineTokenizer:通过检查模式,确定应在特定行上使用 tokenizer 列表中的哪个 LineTokenizer

FieldSetMapper

FieldSetMapper 接口定义了一个方法 mapFieldSet,该方法接收一个 FieldSet 对象并将其内容映射到一个对象。该对象可以是自定义 DTO、domain 对象或数组,具体取决于 job 的需要。FieldSetMapperLineTokenizer 结合使用,可将资源中的一行数据转换为所需类型的对象,如下接口定义所示:

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

使用的 pattern 与 JdbcTemplate 使用的 RowMapper 相同。

DefaultLineMapper

既然已经定义了读取平面文件的基本接口,那么显然需要三个基本步骤:

  1. 从文件中读取一行。

  2. 将字符串行传入 LineTokenizer#tokenize() 方法,以获取 FieldSet

  3. 将 tokenizer 返回的 FieldSet 传递给 FieldSetMapper,返回 ItemReader#read() 方法的结果。

上述两个接口分别代表两项不同的任务:将行转换为 FieldSet 和将 FieldSet 映射到 domain 对象。由于 LineTokenizer 的输入与 LineMapper 的输入(一行)相匹配,而 FieldSetMapper 的输出与 LineMapper 的输出相匹配,因此我们提供了一个同时使用 LineTokenizerFieldSetMapper 的默认实现。DefaultLineMapper 如下面的类定义所示,代表了大多数用户需要的行为:

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

上述功能是在默认实现中提供的,而不是内置于 reader 本身(如框架以前的版本),以便用户在控制解析过程时有更大的灵活性,尤其是在需要访问原始行的情况下。

简单的分隔文件读取示例

下面的示例说明了如何通过实际 domain 场景读取平面文件。该批处理任务从以下文件读入足球运动员:

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"

该文件的内容映射到以下 Player domain 对象:

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

要将 FieldSet 映射到 Player 对象,需要定义一个返回 player 的 FieldSetMapper,如下例所示:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

然后,可以通过正确构造一个 FlatFileItemReader 并调用 read 来读取文件,如下例所示:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
//DelimitedLineTokenizer defaults to comma as its delimiter
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

每次调用 read 方法都会从文件中的每一行返回一个新的 Player 对象。当读取到文件末尾时,将返回 null 值。

按名称映射字段

DelimitedLineTokenizerFixedLengthTokenizer 都允许一个额外的功能,其功能类似于 JDBC ResultSet。字段名称可以注入到任何一种 LineTokenizer 实现中,以增加映射函数的可读性。首先,将平面文件中所有字段的列名注入 tokenizer,如下例所示:

tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});

FieldSetMapper 可以如下使用这些信息:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if (fs == null) {
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}
将字段集自动映射到 Domain 对象

对许多人来说,编写特定的 FieldSetMapper 就像为 JdbcTemplate 编写特定的 RowMapper 一样麻烦。Spring Batch 提供了一个 FieldSetMapper,通过使用 JavaBean 规范将字段名与对象上的 setter 进行匹配,从而自动映射字段,从而简化了这一工作。

还是以足球为例,BeanWrapperFieldSetMapper 配置的 XML 代码段如下:

XML Configuration
<bean id="fieldSetMapper"
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
    <property name="prototypeBeanName" value="player" />
</bean>

<bean id="player"
      class="org.springframework.batch.sample.domain.Player"
      scope="prototype" />

还是以足球为例,BeanWrapperFieldSetMapper 配置的 Java 代码段如下:

Java Configuration
@Bean
public FieldSetMapper fieldSetMapper() {
	BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

	fieldSetMapper.setPrototypeBeanName("player");

	return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
	return new Player();
}

对于 FieldSet 中的每个条目,mapper 都会在 Player 对象的新实例上查找相应的 setter(因此需要 prototype scope),就像 Spring 容器查找与属性名称匹配的 setter 一样。对 FieldSet 中的每个可用字段进行映射,然后返回结果 Player 对象,无需代码。

固定长度文件格式

到目前为止,我们只详细讨论了分隔文件。然而,它们只代表了文件读取的一半。许多使用平面文件的组织都使用固定长度格式。下面是一个固定长度文件的示例:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

虽然这看起来像一个大字段,但实际上代表了 4 个不同的字段:

  1. ISIN: 订单商品的唯一标识符 - 12 个字符。

  2. Quantity: 订单商品的编号 - 3 个字符。

  3. Price: 商品价格 - 5 个字符。

  4. Customer: 订单商品的客户 ID - 9 个字符。

在配置 FixedLengthLineTokenizer 时,必须以范围的形式提供这些长度。

下面的示例展示了如何用 XML 为 FixedLengthLineTokenizer 标记符定义范围:

XML Configuration
<bean id="fixedLengthLineTokenizer"
      class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
    <property name="names" value="ISIN,Quantity,Price,Customer" />
    <property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>

由于 FixedLengthLineTokenizer 使用了与前面讨论的 LineTokenizer 相同的接口,因此它返回的 FieldSet 与使用了分隔符的 FieldSet 相同。这就允许使用相同的方法来处理其输出,如使用 BeanWrapperFieldSetMapper

支持上述范围语法需要在 ApplicationContext 中配置专门的 property editor RangeArrayPropertyEditor。不过,在使用 batch 命名空间的 ApplicationContext 中会自动声明该 bean。

下面的示例展示了如何用 Java 为 FixedLengthLineTokenizer 定义范围:

Java Configuration
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
	FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

	tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
	tokenizer.setColumns(new Range(1, 12),
						new Range(13, 15),
						new Range(16, 20),
						new Range(21, 29));

	return tokenizer;
}

由于 FixedLengthLineTokenizer 使用了与上文讨论的 LineTokenizer 相同的接口,因此它返回的 FieldSet 与使用了分隔符的 FieldSet 相同。这样,在处理其输出时就可以使用相同的方法,例如使用 BeanWrapperFieldSetMapper

单个文件中的多种记录类型

到此为止,所有的文件读取示例都为了简单起见做了一个关键假设:文件中的所有记录都具有相同的格式。然而,情况并非总是如此。很常见的情况是,文件中的记录可能具有不同的格式,需要以不同的方式标记并映射到不同的对象。下面的文件摘录就说明了这一点:

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

在这个文件中,我们有三种类型的记录:"USER"、"LINEA" 和 "LINEB"。 "USER" 行对应一个 User 对象。"LINEA" 和 "LINEB" 都对应 Line 对象,但 "LINEA" 比 "LINEB" 包含更多信息。

ItemReader 会单独读取每一行,但我们必须指定不同的 LineTokenizerFieldSetMapper 对象,以便 ItemWriter 接收到正确的项目。PatternMatchingCompositeLineMapper 允许将 pattern 映射到 LineTokenizer 和 pattern 映射到 FieldSetMapper,从而简化了这一过程。

下面的示例展示了如何用 XML 为 FixedLengthLineTokenizer 定义范围:

XML Configuration
<bean id="orderFileLineMapper"
      class="org.spr...PatternMatchingCompositeLineMapper">
    <property name="tokenizers">
        <map>
            <entry key="USER*" value-ref="userTokenizer" />
            <entry key="LINEA*" value-ref="lineATokenizer" />
            <entry key="LINEB*" value-ref="lineBTokenizer" />
        </map>
    </property>
    <property name="fieldSetMappers">
        <map>
            <entry key="USER*" value-ref="userFieldSetMapper" />
            <entry key="LINE*" value-ref="lineFieldSetMapper" />
        </map>
    </property>
</bean>
Java Configuration
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
	PatternMatchingCompositeLineMapper lineMapper =
		new PatternMatchingCompositeLineMapper();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
	tokenizers.put("USER*", userTokenizer());
	tokenizers.put("LINEA*", lineATokenizer());
	tokenizers.put("LINEB*", lineBTokenizer());

	lineMapper.setTokenizers(tokenizers);

	Map<String, FieldSetMapper> mappers = new HashMap<>(2);
	mappers.put("USER*", userFieldSetMapper());
	mappers.put("LINE*", lineFieldSetMapper());

	lineMapper.setFieldSetMappers(mappers);

	return lineMapper;
}

在本例中,"LINEA" 和 "LINEB" 分别拥有不同的 LineTokenizer 实例,但它们都使用相同的 FieldSetMapper

PatternMatchingCompositeLineMapper 使用 PatternMatcher#match 方法为每一行选择正确的委托。PatternMatcher 允许使用两个具有特殊含义的通配符:问号("?")匹配的是一个字符,而星号("*")匹配的是零个或多个字符。请注意,在前面的配置中,所有模式都以星号结尾,因此它们实际上是行的前缀。无论配置顺序如何,PatternMatcher 总是尽可能匹配最特殊的模式。因此,如果 "LINE*" 和 "LINEA*" 都被列为模式,"LINEA" 将匹配模式 "LINEA*",而 "LINEB" 将匹配模式 "LINE*"。此外,单个星号("*")可以作为默认值,匹配任何其他模式不匹配的行。

下面的示例展示了如何匹配 XML 中任何其他模式都不匹配的行:

XML Configuration
<entry key="*" value-ref="defaultLineTokenizer" />

下面的示例展示了如何在 Java 中匹配其他模式无法匹配的行:

Java Configuration
...
tokenizers.put("*", defaultLineTokenizer());
...

还有一个 PatternMatchingCompositeLineTokenizer 可单独用于 tokenizer。

平面文件中包含跨多行记录的情况也很常见。要处理这种情况,需要采用更复杂的策略。在 multiLineRecords 示例中可以找到这种常见模式的演示。

平面文件中的异常处理

在很多情况下,tokenizer 一行可能会导致异常抛出。许多平面文件并不完美,包含格式不正确的记录。许多用户选择跳过这些错误行,同时记录问题、原始行和行号。这些日志随后可以手动或通过其他批处理 job 进行检查。因此,Spring Batch 提供了一个处理解析异常的异常层次结构: FlatFileParseExceptionFlatFileFormatExceptionFlatFileItemReader 在尝试读取文件时遇到任何错误,就会抛出 FlatFileParseExceptionFlatFileFormatExceptionLineTokenizer 接口的实现抛出,表示在 tokenizer 过程中遇到了更具体的错误。

IncorrectTokenCountException

DelimitedLineTokenizerFixedLengthLineTokenizer 都能指定用于创建 FieldSet 的列名。但是,如果列名数量与标记行时发现的列名数量不匹配,则无法创建 FieldSet,并会抛出 IncorrectTokenCountException(不正确标记数异常),其中包含遇到的标记数和预期的标记数,如下例所示:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch (IncorrectTokenCountException e) {
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

由于 tokenizer 配置了 4 个列名,但在文件中只找到 3 个 token,因此出现了不正确 token 计数异常(IncorrectTokenCountException)。

IncorrectLineLengthException

固定长度格式的文件在解析时有额外的要求,因为与分隔格式不同,每一列都必须严格遵守预定义的宽度。如果行的总长度不等于该列的最宽值,就会出现异常,如下例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上述 tokenizer 的配置范围是 1-5、6-10 和 11-15。因此,行的总长度为 15。然而,在前面的示例中,传入了长度为 5 的行,导致抛出了一个不正确行长异常(IncorrectLineLengthException)。在这里抛出异常,而不是只映射第一列,可以使对该行的处理更早失败,并且比在 FieldSetMapper 中尝试读入第 2 列时失败所包含的信息更多。不过,在有些情况下,行的长度并不总是恒定的。因此,可以通过 'strict' 属性关闭行长度验证,如下例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

前面的示例与前面的示例几乎完全相同,只是调用了 tokenizer.setStrict(false)。该设置告诉标 tokenizer 在标记行时不强制执行行长。现在,FieldSet 已正确创建并返回。不过,它只包含剩余值的空标记(token)。

FlatFileItemWriter

向平面文件写入与从文件读入有同样的问题和必须克服的困难。一个 step 必须能够以事务方式写入分隔格式或固定长度格式。

LineAggregator

正如 LineTokenizer 接口需要将一个 item 转换成 String 一样,文件写入也必须有一种方法将多个字段聚合成一个字符串,以便写入文件。在 Spring Batch 中,这就是 LineAggregator,如下接口定义所示:

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregatorLineTokenizer 在逻辑上正好相反。LineTokenizer 接收一个 String 并返回一个 FieldSet,而 LineAggregator 接收一个 item 并返回一个 String

PassThroughLineAggregator

LineAggregator 接口最基本的实现是 PassThroughLineAggregator,它假定对象已经是 string,或者其 string 表示可以接受写入,如下代码所示:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

如果需要直接控制字符串的创建,但又需要 FlatFileItemWriter 的优势(如事务和重启支持),前述实现就非常有用。

简单文件的写入示例

既然已经定义了 LineAggregator 接口及其最基本的实现,即 PassThroughLineAggregator,那么就可以解释写入的基本流程了:

  1. 要写入的 object 被传递给 LineAggregator,以获得一个 String

  2. 返回的 String 将写入配置文件。

下面摘录的 FlatFileItemWriter 代码表达了这一点:

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

在 XML 中,一个简单的配置示例可能如下所示:

XML Configuration
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" value="file:target/test-outputs/output.txt" />
    <property name="lineAggregator">
        <bean class="org.spr...PassThroughLineAggregator"/>
    </property>
</bean>

在 Java 中,一个简单的配置示例可能如下所示:

Java Configuration
@Bean
public FlatFileItemWriter itemWriter() {
	return  new FlatFileItemWriterBuilder<Foo>()
           			.name("itemWriter")
           			.resource(new FileSystemResource("target/test-outputs/output.txt"))
           			.lineAggregator(new PassThroughLineAggregator<>())
           			.build();
}
FieldExtractor

前面的示例可能对写入文件的最基本用途有用。但是,FlatFileItemWriter 的大多数用户都有一个需要写出的 domain 对象,因此,必须将其转换成一行。在文件读取中,需要如下操作:

  1. 从文件中读取一行。

  2. 将该行传递给 LineTokenizer#tokenize() 方法,以获取一个 FieldSet

  3. P将 tokenizer 返回的 FieldSet 传递给 FieldSetMapper,返回 ItemReader#read() 方法的结果。

文件写入的步骤与此类似,但正好相反:

  1. 将要写入的 item 传递给 writer。

  2. 将 item 上的字段转换为数组。

  3. 将得到的数组汇总成一行。

由于框架无法知道需要写出对象中的哪些字段,因此必须编写一个 FieldExtractor 来完成将 item 转化为数组的任务,如下面的接口定义所示:

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

FieldExtractor 接口的实现应从所提供对象的字段中创建一个数组,然后可以在元素之间使用分隔符或作为固定宽度行的一部分写出。

PassThroughFieldExtractor

需要写出数组、CollectionFieldSet 的情况很多。从这些集合类型中 "提取" 数组非常简单。为此,需要将集合转换为数组。因此,在这种情况下应使用 PassThroughFieldExtractor。需要注意的是,如果传入的对象不是集合类型,那么 PassThroughFieldExtractor 返回的数组只包含要提取的项目。

BeanWrapperFieldExtractor

与文件读取部分描述的 BeanWrapperFieldSetMapper 一样,通常最好配置如何将 domain 对象转换为对象数组,而不是自己编写转换过程。BeanWrapperFieldExtractor 提供了这种功能,如下例所示:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

该 extractor 的实现只有一个必需属性:要映射的字段名。正如 BeanWrapperFieldSetMapper 需要字段名称来将 FieldSet 上的字段映射到所提供对象上的 setter 一样,BeanWrapperFieldExtractor 也需要字段名称来映射到 getter 以创建对象数组。值得注意的是,名称的顺序决定了数组中字段的顺序。

带分隔符文件写入示例

最基本的平面文件格式是所有字段都用分隔符分隔。这可以使用 DelimitedLineAggregator 来实现。下面的示例写出了一个简单的 domain 对象,表示客户账户的贷记:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

由于使用的是 domain 对象,因此必须提供 FieldExtractor 接口的实现以及要使用的分隔符。

下面的示例展示了如何在 XML 中使用带有分隔符的 FieldExtractor

XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...DelimitedLineAggregator">
            <property name="delimiter" value=","/>
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

下面的示例展示了如何在 Java 中使用带有分隔符的 FieldExtractor

Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
	lineAggregator.setDelimiter(",");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

在上一个示例中,本章前面介绍的 BeanWrapperFieldExtractor 用于将 CustomerCredit 中的 name 和 credit 字段转化为对象数组,然后在每个字段之间用逗号隔开。

也可以使用 FlatFileItemWriterBuilder.DelimitedBuilder 自动创建 BeanWrapperFieldExtractorDelimitedLineAggregator,如下例所示:

Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.delimited()
				.delimiter("|")
				.names(new String[] {"name", "credit"})
				.build();
}
固定宽度文件写入示例

分隔不是唯一的平面文件格式。许多人喜欢为每一列设定宽度,以便在字段之间划线,这通常被称为 "固定宽度"。Spring Batch 通过 FormatterLineAggregator 支持这种文件写入方式。

使用上述相同的 CustomerCredit domain 对象,可以用 XML 进行如下配置:

XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...FormatterLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit" />
                </bean>
            </property>
            <property name="format" value="%-9s%-2.0f" />
        </bean>
    </property>
</bean>

使用上述相同的 CustomerCredit domain 对象,可以在 Java 中进行如下配置:

Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
	lineAggregator.setFormat("%-9s%-2.0f");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

前面示例中的大部分内容都很熟悉。不过,format 属性的值是新的。

下面的示例显示了 XML 中的 format 属性:

<property name="format" value="%-9s%-2.0f" />

下面的示例展示了 Java 中的 format 属性:

...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

底层实现是使用作为 Java 5 一部分添加的 Formatter 构建的。Java Formatter 基于 C 语言的 printf 功能。有关如何配置 formatter 的详细信息,请参阅 Formatter 的 Javadoc。

也可以使用 FlatFileItemWriterBuilder.FormattedBuilder 自动创建 BeanWrapperFieldExtractorFormatterLineAggregator,如下例所示:

Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.formatted()
				.format("%-9s%-2.0f")
				.names(new String[] {"name", "credit"})
				.build();
}
处理文件的创建

FlatFileItemReader 与文件资源的关系非常简单。当 reader 初始化时,它会打开文件(如果存在的话),如果打不开则会抛出异常。文件写入就没那么简单了。乍一看,FlatFileItemWriter 似乎也应该有类似的直接约束: 如果文件已经存在,则抛出异常;如果不存在,则创建文件并开始写入。但是,重新启动 Job 可能会导致问题。在正常重启情况下,约束是相反的:如果文件存在,则从最后已知的良好位置开始写入,如果不存在,则抛出异常。但是,如果该任务的文件名总是相同,会发生什么情况呢?在这种情况下,如果文件存在,除非是重启,否则就应该删除。考虑到这种可能性,FlatFileItemWriter 包含了 shouldDeleteIfExists 属性。将此属性设置为 true 会导致在打开 writer 时删除同名的现有文件。

XML Item Reader/Writer

Spring Batch 为读取 XML 记录并将其映射到 Java 对象以及将 Java 对象写入 XML 记录提供了事务处理基础架构。

Constraints on streaming XML

StAX API 用于 I/O,因为其他标准 XML 解析 API 不符合批量处理要求(DOM 一次将整个输入加载到内存中,而 SAX 只允许用户提供回调来控制解析过程)。

我们需要考虑在 Spring Batch 中 XML 输入和输出是如何工作的。首先,有几个概念与文件读写不同,但在 Spring Batch 的 XML 处理中是通用的。在 XML 处理中,我们认为 XML 资源是与单个记录相对应的 "片段" 集合,而不是需要 tokenize 的记录行(FieldSet 实例),如下图所示:

XML Input
Figure 1. XML Input

在上述情况中,'trade' 标签被定义为 "根元素"。介于 '<trade>' 和 '</trade>' 之间的所有内容都被视为一个 "片段"。Spring Batch 使用对象/XML 映射 (OXM) 将片段绑定到对象。不过,Spring Batch 并不依赖于任何特定的 XML 绑定技术。典型的用法是委托 Spring OXM,它为最流行的 OXM 技术提供了统一的抽象。对 Spring OXM 的依赖是可选的,如果需要,你可以选择实现 Spring Batch 的特定接口。与 OXM 所支持技术的关系如下图所示:

OXM Binding
Figure 2. OXM Binding

在介绍了 OXM 以及如何使用 XML 片段来表示记录之后,我们现在可以更仔细地研究 reader 和 writer 了。

StaxEventItemReader

StaxEventItemReader 配置为处理来自 XML 输入流的记录提供了一个典型设置。首先,考虑一下 StaxEventItemReader 可以处理的以下 XML 记录集:

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

要处理 XML 记录,需要具备以下条件:

  • 根元素名称: 构成要映射对象的片段根元素名称。示例配置中的值为 trade。

  • Resource:表示要读取文件的 Spring 资源。

  • Unmarshaller: Spring OXM 提供的一种解码工具,用于将 XML 片段映射到对象。

下面的示例展示了如何定义一个 StaxEventItemReader,该 reader 可与名为 trade 的根元素、data/iosample/input/input.xml 的资源以及 XML 中名为 tradeMarshaller 的 unmarshaller 协同工作:

XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
    <property name="fragmentRootElementName" value="trade" />
    <property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
    <property name="unmarshaller" ref="tradeMarshaller" />
</bean>

下面的示例展示了如何定义一个 StaxEventItemReader,该 reader 可与名为 trade 的根元素、data/iosample/input/input.xml 的资源和名为 tradeMarshallerunmarshaller 一起使用:

Java Configuration
@Bean
public StaxEventItemReader itemReader() {
	return new StaxEventItemReaderBuilder<Trade>()
			.name("itemReader")
			.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
			.addFragmentRootElements("trade")
			.unmarshaller(tradeMarshaller())
			.build();

}

请注意,在本例中,我们选择使用 XStreamMarshaller,它接受以 map 形式传入的别名,第一个 key 和 value 是片段名称(即根元素)和要绑定的对象类型。然后,与 FieldSet 类似,映射到对象类型中字段的其他元素的名称在映射表中描述为 key/value 对。在配置文件中,我们可以使用 Spring 配置工具来描述所需的别名。

下面的示例展示了如何用 XML 描述别名:

XML Configuration
<bean id="tradeMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="trade"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

下面的示例展示了如何用 Java 来描述别名:

Java Configuration
@Bean
public XStreamMarshaller tradeMarshaller() {
	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	XStreamMarshaller marshaller = new XStreamMarshaller();

	marshaller.setAliases(aliases);

	return marshaller;
}

在输入时,reader 会读取 XML 资源,直到识别到一个新片段即将开始。默认情况下,reader 通过匹配元素名称来识别即将开始的新片段。reader 从片段中创建独立的 XML 文档,并将文档传递给反序列化器(通常是 Spring OXM Unmarshaller 的 wrapper),以便将 XML 映射到 Java 对象。

总之,该过程类似于以下使用 Spring 配置提供的注入的 Java 代码:

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;

Trade trade = null;

while (hasNext) {
    trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    }
    else {
        System.out.println(trade);
    }
}

StaxEventItemWriter

输出与输入类似。StaxEventItemWriter 需要一个 Resource、一个 marshaller 和一个 rootTagName。Java 对象会传递给一个 marshaller(通常是标准的 Spring OXM Marshaller),Marshaller 会使用自定义事件 writer 将其写入 Resource,自定义事件 writer 会过滤 OXM 工具为每个片段生成的 StartDocumentEndDocument 事件。

下面的 XML 示例使用了 MarshallingEventWriterSerializer

XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="marshaller" ref="tradeMarshaller" />
    <property name="rootTagName" value="trade" />
    <property name="overwriteOutput" value="true" />
</bean>

下面的 Java 示例使用了 MarshallingEventWriterSerializer

Java Configuration
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
	return new StaxEventItemWriterBuilder<Trade>()
			.name("tradesWriter")
			.marshaller(tradeMarshaller())
			.resource(outputResource)
			.rootTagName("trade")
			.overwriteOutput(true)
			.build();

}

前面的配置设置了三个所需的属性,并设置了本章前面提到的可选属性 overwriteOutput=true,用于指定是否可以覆盖现有文件。

下面的 XML 示例与本章前面的读取示例中使用的 marshaller 相同:

XML Configuration
<bean id="customerCreditMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="customer"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

下面的 Java 示例与本章前面的读取示例中使用的 marshaller 相同:

Java Configuration
@Bean
public XStreamMarshaller customerCreditMarshaller() {
	XStreamMarshaller marshaller = new XStreamMarshaller();

	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	marshaller.setAliases(aliases);

	return marshaller;
}

下面用一个 Java 示例来概括所有讨论的要点,演示如何通过编程设置所需的属性:

FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

StaxEventItemWriter staxItemWriter =
	new StaxEventItemWriterBuilder<Trade>()
				.name("tradesWriter")
				.marshaller(marshaller)
				.resource(resource)
				.rootTagName("trade")
				.overwriteOutput(true)
				.build();

staxItemWriter.afterPropertiesSet();

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);

读写 JSON Item

Spring Batch 支持读写以下格式的 JSON 资源:

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

假定 JSON 资源是与单个项目相对应的 JSON 对象数组。Spring Batch 与任何特定的 JSON 库无关。

JsonItemReader

JsonItemReader 将 JSON 解析和绑定委托给 org.springframework.batch.item.json.JsonObjectReader 接口的实现。该接口旨在通过使用流式 API 以分块方式读取 JSON 对象来实现。目前提供了两种实现:

  • Jacksonorg.springframework.batch.item.json.JacksonJsonObjectReader

  • Gsonorg.springframework.batch.item.json.GsonJsonObjectReader

要处理 JSON 记录,需要具备以下条件:

  • Resource: 表示要读取的 JSON 文件的 Spring Resource。

  • JsonObjectReader: JSON object reader,用于解析 JSON 对象并将其与 item 绑定

下面的示例展示了如何定义一个可与之前的 JSON 资源 org/springframework/batch/item/json/trades.json 协同工作的 JsonItemReader 和一个基于 Jackson 的 JsonObjectReader

@Bean
public JsonItemReader<Trade> jsonItemReader() {
   return new JsonItemReaderBuilder<Trade>()
                 .jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonItemReader")
                 .build();
}

JsonFileItemWriter

JsonFileItemWriter 委托 org.springframework.batch.item.json.JsonObjectMarshaller 接口对项目进行编译。该接口接收一个对象并将其序列化为一个 JSON String。目前提供两种实现:

  • Jacksonorg.springframework.batch.item.json.JacksonJsonObjectMarshaller

  • Gsonorg.springframework.batch.item.json.GsonJsonObjectMarshaller

要写入 JSON 记录,需要具备以下条件:

  • Resource:表示要写入的 JSON 文件的 Spring Resource

  • JsonObjectMarshaller:一个 JSON 对象marshaller ,用于将对象序列化为 JSON 格式。

下面的示例展示了如何定义 JsonFileItemWriter

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

多文件输入

在一个 Step 中处理多个文件是常见的要求。假设所有文件格式相同,MultiResourceItemReader 支持这种类型的 XML 和平面文件处理输入。请考虑目录中的以下文件:

file-1.txt  file-2.txt  ignored.txt

file-1.txt 和 file-2.txt 格式相同,出于业务原因,应一起处理。MultiResourceItemReader 可通过使用通配符读取这两个文件。

下面的示例展示了如何读取 XML 中带有通配符的文件:

XML Configuration
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
    <property name="resources" value="classpath:data/input/file-*.txt" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>

下面的示例展示了如何在 Java 中使用通配符读取文件:

Java Configuration
@Bean
public MultiResourceItemReader multiResourceReader() {
	return new MultiResourceItemReaderBuilder<Foo>()
					.delegate(flatFileItemReader())
					.resources(resources())
					.build();
}

引用的委托(delegate)是一个简单的 FlatFileItemReader。上述配置从两个文件中读取输入,处理回滚和重启情况。需要注意的是,与任何 ItemReader 一样,添加额外输入(本例中为文件)可能会在重启时造成潜在问题。建议批处理任务在成功完成前,使用各自的单独目录。

通过使用 MultiResourceItemReader#setComparator(Comparator) 对输入资源进行排序,以确保在重启场景中 job 运行之间保留资源顺序。

数据库

与大多数企业应用程序一样,数据库是批处理的核心存储机制。然而,批处理与其他应用方式不同,因为系统必须处理的数据集非常大。如果一条 SQL 语句返回 100 万行,那么结果集可能会将所有返回结果保存在内存中,直到读取完所有行。Spring Batch 针对这一问题提供了两种解决方案:

基于游标的 ItemReader 实现

使用数据库游标通常是大多数批处理开发人员的默认方法,因为这是数据库对 "流式" 关系数据问题的解决方案。Java ResultSet 类本质上是一种操作游标的面向对象机制。ResultSet 维护着一个指向当前数据行的游标。在 ResultSet 上调用 next 会将光标移动到下一行。基于 Spring Batch 游标的 ItemReader 实现会在初始化时打开一个游标,并在每次调用 read 时将游标向前移动一行,返回一个可用于处理的映射对象。然后调用 close 方法确保释放所有资源。Spring core JdbcTemplate 通过使用回调模式来完全映射 ResultSet 中的所有行,并在将控制权返回给方法调用者之前关闭,从而解决了这个问题。但是,在批处理中,这必须等到 step 完成。下图显示了基于游标的 ItemReader 的工作原理图。请注意,虽然示例使用的是 SQL(因为 SQL 广为人知),但任何技术都可以实现基本方法

游标示例
Figure 3. 游标示例

本例说明了基本模式。给定一个 "FOO" 表,其中有三列:IDNAMEBAR: 选择 ID 大于 1 但小于 7 的所有行,这样游标(第 1 行)的起点就位于 ID 2 上。这一行的结果应该是一个完全映射的 Foo 对象。再次调用 read() 会将光标移到下一行,即 ID 为 3 的 Foo。 每次读取后,都会写出这些读取结果,以便对这些对象进行垃圾回收(假设没有实例变量保持对它们的引用)。

JdbcCursorItemReader

JdbcCursorItemReader 是基于游标技术的 JDBC 实现。它可直接与 ResultSet 一起使用,并要求针对从 DataSource 获取的连接运行 SQL 语句。以下面数据库 schema 为例:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人喜欢为每一行使用一个 domain 对象,因此下面的示例使用 RowMapper 接口的实现来映射 CustomerCredit 对象:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

由于 JdbcCursorItemReaderJdbcTemplate 共享关键接口,因此我们有必要举例说明如何使用 JdbcTemplate 读取这些数据,以便与 ItemReader 进行对比。在本示例中,假设 CUSTOMER 数据库中有 1000 条记录。第一个示例使用 JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

运行上述代码段后,customerCredits 列表中包含 1,000 个 CustomerCredit 对象。在 query 方法中,从 DataSource 获取连接,针对连接运行所提供的 SQL,并针对结果集中的每一行调用 mapRow 方法。这与下面示例中的 JdbcCursorItemReader 方法形成了鲜明对比:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

运行上述代码段后,counter 等于 1,000。如果上面的代码将返回的 customerCredit 放入一个 list,结果将与 JdbcTemplate 示例完全相同。然而,ItemReader 的最大优势在于它允许对项目进行 "流式" 处理。只需调用一次 read 方法,ItemWriter 就能写出项目,然后通过 read 获取下一个项目。这样,项目读写就可以分 "块" 完成,并定期提交,这正是高性能批处理的精髓所在。此外,它还可以轻松配置,以便注入 Spring Batch Step

下面的示例展示了如何将 ItemReader 注入 XML 中的一个 Step

XML Configuration
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

下面的示例展示了如何在 Java Step 中注入 ItemReader

Java Configuration
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();

}
其他属性

由于在 Java 中打开游标有许多不同的选项,因此可以在 JdbcCursorItemReader 上设置许多属性,如下表所示:

Table 2. JdbcCursorItemReader 属性

ignoreWarnings

决定是否记录 SQLWarnings 或导致异常。默认值为 true(表示记录警告)。

fetchSize

为 JDBC 驱动程序提供提示,说明当 ItemReader 使用的 ResultSet 对象需要更多记录时,应从数据库中获取的记录数。默认情况下不提供提示。

maxRows

设置底层`ResultSet` 在同一时间可容纳的最大行数限制。

queryTimeout

设置驱动程序等待 Statement 对象运行的秒数。如果超过限制,将抛出 DataAccessException。(详情请查阅驱动程序供应商文档)。

verifyCursorPosition

由于 ItemReader 持有的同一个 ResultSet 会传递给 RowMapper,因此用户有可能自己调用 ResultSet.next(),这可能会导致 reader 的内部计数出现问题。如果将该值设置为 true,在调用 RowMapper 之后,光标位置与调用之前的位置不一致,就会产生异常。

saveState

表示 reader 的状态是否应保存在 ItemStream#update(ExecutionContext) 提供的 ExecutionContext 中。默认为 true

driverSupportsAbsolute

表示 JDBC 驱动程序是否支持在 ResultSet 上设置绝对行。对于支持 ResultSet.absolute() 的 JDBC 驱动程序,建议将此设置为 true,因为它可以提高性能,尤其是在处理大型数据集时 step 失败的情况下。默认值为 false

setUseSharedExtendedConnection

指示游标使用的连接是否应被所有其他处理使用,从而共享同一个事务。如果设置 false,那么游标将使用自己的连接打开,并且不参与为其他 step 处理启动的任何事务。如果将此标记设为 true,则必须用扩展连接数据源代理(ExtendedConnectionDataSourceProxy)封装数据源,以防止每次提交后连接被关闭和释放。如果将该选项设置为 true,用于打开游标的语句将同时包含 READ_ONLYHOLD_CURSORS_OVER_COMMIT 选项。这样就可以在事务启动和 step 处理中的提交过程中保持游标处于打开状态。要使用此功能,需要有支持此功能的数据库和支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认值为 false

HibernateCursorItemReader

就像普通 Spring 用户在决定是否使用 ORM 解决方案时会影响他们是否使用 JdbcTemplateHibernateTemplate 一样,Spring Batch 用户也有同样的选择。HibernateCursorItemReader 是游标技术的 Hibernate 实现。在批处理中使用 Hibernate 一直颇具争议。这主要是因为 Hibernate 最初是为支持在线应用程序风格而开发的。但这并不意味着它不能用于批处理。解决这个问题的最简单方法是使用无状态会话(StatelessSession)而不是标准会话。这就去除了 Hibernate 采用的所有缓存和脏检查功能,而这些功能在批处理场景中可能会造成问题。有关无状态会话与普通 Hibernate 会话之间区别的更多信息,请参阅特定 Hibernate 版本的文档。HibernateCursorItemReader 可以让你声明一条 HQL 语句并传入一个 SessionFactory,它会以与 JdbcCursorItemReader 相同的基本方式,在每次调用读取时传回一个项目。以下示例配置使用了与 JDBC reader 相同的 'customer credit' 示例:

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

该配置的 ItemReader 返回 CustomerCredit 对象的方式与 JdbcCursorItemReader 所描述的方式完全相同,前提是已为 Customer 表正确创建了 hibernate 映射文件。使用无状态会话(useStatelessSession)属性的默认值为 true,但在此添加该属性是为了引起人们对开启或关闭该属性功能的注意。值得注意的是,底层游标的取值大小可以通过 setFetchSize 属性来设置。与 JdbcCursorItemReader 一样,配置也很简单。

下面的示例展示了如何在 XML 中注入 Hibernate ItemReader

XML Configuration
<bean id="itemReader"
      class="org.springframework.batch.item.database.HibernateCursorItemReader">
    <property name="sessionFactory" ref="sessionFactory" />
    <property name="queryString" value="from CustomerCredit" />
</bean>

下面的示例展示了如何在 Java 中注入 Hibernate ItemReader

Java Configuration
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
	return new HibernateCursorItemReaderBuilder<CustomerCredit>()
			.name("creditReader")
			.sessionFactory(sessionFactory)
			.queryString("from CustomerCredit")
			.build();
}
StoredProcedureItemReader

有时需要使用存储过程来获取游标数据。StoredProcedureItemReader 的工作原理与 JdbcCursorItemReader 类似,只不过它不是运行查询来获取游标,而是运行一个存储过程来返回游标。存储过程可以通过三种不同方式返回游标:

  • 作为返回的 ResultSet(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。

  • 作为输出参数返回的 ref-cursor(Oracle 和 PostgreSQL 使用)。

  • 作为存储函数调用的返回值。

下面的 XML 示例配置使用了与前面示例相同的 'customer credit' 示例:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

下面的 Java 配置示例使用了与前面示例相同的 'customer credit' 示例:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

前面的示例依赖于存储过程提供一个 ResultSet 作为返回结果(前面的选项 1)。

如果存储过程返回一个 ref-cursor(选项 2),那么我们就需要提供作为返回 ref-cursor 的 out 参数的位置。

下面的示例展示了如何使用 XML 中作为 ref-cursor 的第一个参数:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

下面的示例展示了如何使用 Java 中的 ref-cursor 作为第一个参数:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

如果游标是从存储函数中返回的(选项 3),我们需要将属性 "function" 设为 true。默认值为 false

下面的示例显示了 XML 中的 true 属性:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

下面的示例显示了 Java 中的 true 属性:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

在所有这些情况下,我们都需要定义 RowMapper 以及 DataSource 和实际存储过程名称。

如果存储过程或函数需要参数,则必须使用 parameters 属性声明和设置参数。下面这个 Oracle 示例声明了三个参数。第一个是 out 参数,用于返回 ref-cursor,第二个和第三个是 in 参数,用于获取 INTEGER 类型的值。

下面的示例展示了如何使用 XML 中的参数:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

下面的示例展示了如何在 Java 中使用参数:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER);
	parameters.add(new SqlParameter("custId", Types.INTEGER);

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

除了参数声明外,我们还需要指定一个 PreparedStatementSetter 实现来设置调用的参数值。这与上述 JdbcCursorItemReader 的工作原理相同。其他属性 中列出的所有附加属性也适用于 StoredProcedureItemReader

分页 ItemReader 的实现

使用数据库游标的另一种方法是运行多个查询,每个查询获取结果的一部分。我们将这一部分称为一个页。每个查询必须指定起始行号和我们希望在页中返回的行数。

JdbcPagingItemReader

JdbcPagingItemReader 是分页 ItemReader 的一种实现。JdbcPagingItemReader 需要一个 PagingQueryProvider,负责提供用于分页的 SQL 查询。由于每个数据库都有自己的分页支持策略,因此我们需要为每种支持的数据库类型使用不同的 PagingQueryProviderSqlPagingQueryProviderFactoryBean 也能自动检测正在使用的数据库,并确定合适的 PagingQueryProvider 实现。这简化了配置,是推荐的最佳做法。

SqlPagingQueryProviderFactoryBean 要求你指定一个 select 子句和一个 from 子句。你还可以提供一个可选的 where 子句。这些子句和所需的 sortKey 用于创建 SQL 语句。

必须在 sortKey 上设置唯一键约束,以确保数据不会在两次执行之间丢失。

reader 打开后,每次调用 read 都会传回一个 item,其基本方式与其他 ItemReader 相同。当需要额外的行时,就会在背后进行分页。

下面的 XML 示例配置使用了与前面所示基于游标的 ItemReaders 类似的 'customer credit' 示例:

XML Configuration
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

下面的 Java 配置示例使用的 'customer credit' 示例与前面显示的基于游标的 ItemReaders 类似:

Java Configuration
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.dataSource(dataSource)
           				.queryProvider(queryProvider)
           				.parameterValues(parameterValues)
           				.rowMapper(customerCreditMapper())
           				.pageSize(1000)
           				.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

此配置的 ItemReader 使用必须指定的 RowMapper 返回 CustomerCredit 对象。'pageSize' 属性决定了每次查询运行时从数据库读取的实体数量。

parameterValues 属性可用于指定查询的参数值 Map 。如果在 where 子句中使用命名参数,则每个 entry 的 key 值应与命名参数的名称相匹配。如果使用传统的 '?' 占位符,则每个 entry 的 key 应为占位符的编号,从 1 开始。

JpaPagingItemReader

分页 ItemReader 的另一种实现是 JpaPagingItemReader。JPA 没有类似于 Hibernate StatelessSession 的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时,这是一个自然的选择。在读取每个分页后,实体都会被分离,持久化上下文(persistence context)也会被清除,以便在分页处理完毕后对实体进行垃圾回收。

JpaPagingItemReader 可让你声明一个 JPQL 语句并传入一个 EntityManagerFactory。然后,它将每次调用传回一个 item,以与其他 ItemReader 相同的基本方式进行读取。当需要额外的实体时,分页会在幕后进行。

下面的 XML 示例配置使用的 'customer credit' 示例与前面显示的 JDBC reader 相同:

XML Configuration
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

下面的 Java 示例配置使用了与前面所示 JDBC 阅读器相同的 'customer credit' 示例:

Java Configuration
@Bean
public JpaPagingItemReader itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.entityManagerFactory(entityManagerFactory())
           				.queryString("select c from CustomerCredit c")
           				.pageSize(1000)
           				.build();
}

假设 CustomerCredit 对象具有正确的 JPA 注解或 ORM 映射文件,则此配置的 ItemReader 将以与上述 JdbcPagingItemReader 完全相同的方式返回 CustomerCredit 对象。pageSize 属性决定了每次执行查询时从数据库读取的实体数量。

数据库 ItemWriter

虽然平面文件和 XML 文件都有一个特定的 ItemWriter 实例,但在数据库领域却没有完全对应的实例。这是因为事务提供了所有需要的功能。对于文件来说,ItemWriter 实现是必要的,因为它们必须像事务一样,跟踪写入的 item,并在适当的时候刷新或清除。数据库则不需要这种功能,因为写入已经包含在事务中了。用户可以创建自己的 DAO 来实现 ItemWriter 接口,也可以使用为通用处理问题而编写的自定义 ItemWriter 中的 DAO。无论哪种方式,它们都能顺利运行。需要注意的一点是批量输出所提供的性能和错误处理功能。这在使用 hibernate 作为 ItemWriter 时最为常见,但在使用 JDBC 批处理模式时也会遇到同样的问题。批处理数据库输出没有任何固有缺陷,前提是我们要小心 Flush,而且数据中没有错误。但是,在写入过程中出现的任何错误都会造成混乱,因为我们无法知道是哪个单个 item 导致了异常,甚至无法知道是否有任何单个 item 造成了异常,如下图所示:

Flush 异常
Figure 4. Flush 异常

如果在写入前对 item 进行了缓冲,那么在提交前 flush 缓冲区时才会抛出错误。例如,假设每个分块写入 20 个item,第 15 个 item 抛出 DataIntegrityViolationException。就 Step 而言,所有 20 个 item 都已成功写入,因为在实际写入之前无法知道是否发生了错误。一旦调用 Session#flush(),缓冲区就会清空,异常也会发生。此时,Step 就无能为力了。必须回滚事务。通常情况下,该异常可能会导致 item 被跳过(取决于 skip/retry 策略),然后就不会再写入。但是,在分批处理的情况下,无法知道是哪个 item 导致了问题。异常发生时,整个缓冲区都在被写入。解决这个问题的唯一方法是在每个 item 后 flush,如下图所示:

写入异常
Figure 5. 写入异常

这是一种常见的用例,尤其是在使用 Hibernate 时,ItemWriter 实现的简单指南就是在每次调用 write() 时 flush。这样做可以可靠地跳过 item,Spring Batch 内部会处理出错后调用 ItemWriter 的粒度问题。

重用现有服务

批处理系统通常与其他应用方式结合使用。最常见的是在线系统,但它也可以通过移动每种应用方式使用的必要批量数据来支持集成甚至是“厚”客户端应用。因此,许多用户都希望在批处理作业中重用现有的 DAO 或其他服务。Spring 容器本身允许注入任何必要的类,从而使重用变得相当容易。不过,在某些情况下,现有服务可能需要充当 ItemReaderItemWriter,以满足对另一个 Spring Batch 类的依赖,或者因为它确实是某个 step 的主要 ItemReader。为每个需要包装的服务编写一个适配器类是相当琐碎的,但由于这是一个常见问题,Spring Batch 提供了相应的实现: ItemReaderAdapterItemWriterAdapter。这两个类都通过调用委托模式实现了标准的 Spring 方法,而且设置起来非常简单。

下面的 XML 示例使用了 ItemReaderAdapter

XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="generateFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

下面的 Java 示例使用了 ItemReaderAdapter

Java Configuration
@Bean
public ItemReaderAdapter itemReader() {
	ItemReaderAdapter reader = new ItemReaderAdapter();

	reader.setTargetObject(fooService());
	reader.setTargetMethod("generateFoo");

	return reader;
}

@Bean
public FooService fooService() {
	return new FooService();
}

需要注意的一点是,targetMethod 必须与 read 相同: 如果用尽,则返回 null。否则,它将返回一个 Object。否则,框架就无法知道处理何时应该结束,从而导致死循环或错误失败,这取决于 ItemWriter 的实现。

下面的 XML 示例使用了 ItemWriterAdapter

XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="processFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

下面的 Java 示例使用了 ItemWriterAdapter

Java Configuration
@Bean
public ItemWriterAdapter itemWriter() {
	ItemWriterAdapter writer = new ItemWriterAdapter();

	writer.setTargetObject(fooService());
	writer.setTargetMethod("processFoo");

	return writer;
}

@Bean
public FooService fooService() {
	return new FooService();
}

防止状态持久化

默认情况下,所有 ItemReaderItemWriter 实现都会在提交之前将其当前状态存储在 ExecutionContext 中。然而,这并不总是我们想要的行为。例如,许多开发人员选择使用进程指示器来使他们的 database reader "可重新运行"。输入数据中会添加一列额外的数据,以指示数据是否已被处理。当读取(或写入)某条记录时,已处理标志将由 false 变为 true。这样,SQL 语句就可以在 where 子句中包含一条额外的语句,如 where PROCESSED_IND = false,从而确保在重启时只返回未处理的记录。在这种情况下,最好不要存储任何状态(如当前行号),因为这些状态在重启时无关紧要。因此,所有 reader 和 writer 都包含 "saveState" 属性。

下面的 Bean 定义展示了如何在 XML 中防止状态持久化:

XML Configuration
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
    </property>
    <property name="saveState" value="false" />
    <property name="sql">
        <value>
            SELECT games.player_id, games.year_no, SUM(COMPLETES),
            SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
            SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
            SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
            from games, players where players.player_id =
            games.player_id group by games.player_id, games.year_no
        </value>
    </property>
</bean>

下面的 bean 定义展示了如何在 Java 中防止状态持久化:

Java Configuration
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
	return new JdbcCursorItemReaderBuilder<PlayerSummary>()
				.dataSource(dataSource)
				.rowMapper(new PlayerSummaryMapper())
				.saveState(false)
				.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
				  + "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
				  + "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
				  + "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
				  + "from games, players where players.player_id ="
				  + "games.player_id group by games.player_id, games.year_no")
				.build();

}

上述配置的 ItemReader 不会在 ExecutionContext 中为其参与的任何执行创建任何条目。

创建自定义的 ItemReader 和 ItemWriter

到目前为止,本章已经讨论了 Spring Batch 中读写的基本契约以及一些常见的实现方法。然而,这些都是相当通用的,有许多潜在的场景可能不在开箱即用的实现范围内。本节将通过一个简单的示例,说明如何创建自定义的 ItemReaderItemWriter 实现,并正确实现它们。ItemReader 还实现了 ItemStream,以说明如何使 reader 或 writer 可重新启动。

自定义 ItemReader 示例

在本示例中,我们将创建一个简单的 ItemReader 实现,用于从提供的 list 中读取数据。我们首先实现最基本的 ItemReader,即 read 方法,如下代码所示:

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

上一个类接收一个 item list,每次返回一个 item,并从 list 中删除每个 item。当 list 为空时,它将返回 null,从而满足 ItemReader 的最基本要求,如下测试代码所示:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
ItemReader 可以重启

最后一个挑战是让 ItemReader 可以重新启动。目前,如果处理中断并重新开始,ItemReader 必须从头开始。这其实在很多情况下都是有效的,但有时批处理 job 最好从中断的地方重新开始。关键的判别因素通常是 reader 是有状态的还是无状态的。无状态 reader 不需要担心可重启性,但有状态 reader 必须在重启时尝试重建其最后的已知状态。因此,我们建议尽可能保持自定义 reader 的无状态状态,这样就不必担心重启问题。

如果确实需要存储状态,则应使用 ItemStream 接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次调用 ItemStream update 方法时,ItemReader 的当前索引都会以 'current.index' 为 key 存储在提供的 ExecutionContext 中。调用 ItemStream open 方法时,会检查 ExecutionContext 是否包含具有该 key 的条目。如果 key 被找到,当前索引就会被移动到该位置。这是一个相当琐碎的示例,但仍然符合一般的要求:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数 ItemReader 的重启逻辑要复杂得多。例如,JdbcCursorItemReader 会存储游标中最后处理行的行 ID。

还值得注意的是,ExecutionContext 中使用的 key 不应是琐碎的。这是因为一个 Step 中的所有 ItemStreams 都使用相同的 ExecutionContext。在大多数情况下,只需在 key 前加上类名就足以保证唯一性。但是,在极少数情况下,同一 step 中会使用两个相同类型的 ItemStream(如果需要输出两个文件,就会出现这种情况),这时就需要一个更唯一的名称。因此,许多 Spring Batch ItemReaderItemWriter 实现都有一个 setName() 属性,允许重写 key 名。

自定义 ItemWriter 示例

自定义 ItemWriter 的实现在很多方面与上述 ItemReader 示例相似,但又有很多不同之处,因此需要有自己的示例。不过,添加可重启性在本质上是相同的,因此本示例不再涉及。与 ItemReader 示例一样,为了使示例尽可能简单,使用了 List

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}
使 ItemWriter 可重启

要使 ItemWriter 可以重新启动,我们需要遵循与 ItemReader 相同的流程,添加并实现 ItemStream 接口,以同步执行上下文。在示例中,我们可能需要计算已处理 item 的数量,并将其添加为页脚记录。如果需要这样做,我们可以在 ItemWriter 中实现 ItemStream,这样在重新打开流时,计数器就会从执行上下文中重新生成。

在许多实际情况中,自定义 ItemWriter 还会委托给另一个 writer,而该 writer 本身是可重启的(例如,当写入文件时),或者写入事务性资源,因此不需要重启,因为它是无状态的。当你有一个有状态的 writer 时,你可能应该确保实现 ItemStream 以及 ItemWriter。还要记住,writer 的客户端需要知道 ItemStream,因此可能需要在配置中将其注册为 stream。

Item Reader 和 Writer 的实现

在本节中,我们将向你介绍前几节中尚未讨论过的 reader 和 writer。

装饰器

在某些情况下,用户需要将专门的行为附加到已有的 ItemReader 中。Spring Batch 提供了一些开箱即用的装饰器,可以为你的 ItemReaderItemWriter 实现添加额外的行为。

Spring Batch 包含以下装饰器:

SynchronizedItemStreamReader

当使用非线程安全的 ItemReader 时,Spring Batch 提供了 SynchronizedItemStreamReader 装饰器,可用于使 ItemReader 成为线程安全的。Spring Batch 提供了一个 SynchronizedItemStreamReaderBuilder,用于构建 SynchronizedItemStreamReader 的实例。

例如,FlatFileItemReader 不是线程安全的,不能在多线程 step 中使用。这种 reader 可以用 SynchronizedItemStreamReader 来装饰,以便在多线程 step 中安全使用。下面是一个如何装饰这种 reader 的示例:

@Bean
public SynchronizedItemStreamReader<Person> itemReader() {
	FlatFileItemReader<Person> flatFileItemReader = new FlatFileItemReaderBuilder<Person>()
			// set reader properties
			.build();

	return new SynchronizedItemStreamReaderBuilder<Person>()
			.delegate(flatFileItemReader)
			.build();
}
SingleItemPeekableItemReader

Spring Batch 包含一个装饰器,可为 ItemReader 添加一个 peek 方法。这方法可让用户提前“偷看”一个 item。重复调用该 peek 方法会返回相同的 item,这就是 read 方法返回的下一个 item。Spring Batch 提供了一个 SingleItemPeekableItemReaderBuilder,用于构建 SingleItemPeekableItemRea`der 的实例。

SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为不可能在多个线程中执行 peek。在下一次调用 read 时,只有一个 peek 过的线程会得到该 item。
SynchronizedItemStreamWriter

当使用非线程安全的 ItemWriter 时,Spring Batch 提供了 SynchronizedItemStreamWriter 装饰器,可用于使 ItemWriter 成为线程安全的。Spring Batch 提供了一个 SynchronizedItemStreamWriterBuilder,用于构建 SynchronizedItemStreamWriter 的实例。

例如,FlatFileItemWriter 不是线程安全的,不能在多线程 step 中使用。这种 writer 可以用 SynchronizedItemStreamWriter 来装饰,以便在多线程 step 中安全使用。下面是一个如何装饰这种 writer 的示例:

@Bean
public SynchronizedItemStreamWriter<Person> itemWriter() {
	FlatFileItemWriter<Person> flatFileItemWriter = new FlatFileItemWriterBuilder<Person>()
			// set writer properties
			.build();

	return new SynchronizedItemStreamWriterBuilder<Person>()
			.delegate(flatFileItemWriter)
			.build();
}
MultiResourceItemWriter

MultiResourceItemWriter 封装了一个 ResourceAwareItemWriterItemStream,并在当前资源中写入的 item 数超过 itemCountLimitPerResource 时创建一个新的输出资源。Spring Batch 提供了一个 MultiResourceItemWriterBuilder,用于构建 MultiResourceItemWriter 的实例。

ClassifierCompositeItemWriter

ClassifierCompositeItemWriter 会根据通过提供的 Classifier 实现的路由器模式,为每个 item 调用一系列 ItemWriter 实现中的一个。如果所有委托(delegate)都是线程安全的,那么该实现就是线程安全的。Spring Batch 提供了一个 ClassifierCompositeItemWriterBuilder,用于构建 ClassifierCompositeItemWriter 的实例。

ClassifierCompositeItemProcessor

ClassifierCompositeItemProcessor 是一个 ItemProcessor,它根据通过提供的 Classifier 实现的路由器模式,调用一系列 ItemProcessor 实现中的一个。Spring Batch 提供了一个 ClassifierCompositeItemProcessorBuilder,用于构建 ClassifierCompositeItemProcessor 的实例。

从MQ读取或写入

Spring Batch 为常用的MQ提供以下 reader 和 writer:

AmqpItemReader

AmqpItemReader 是使用 AmqpTemplate 从 exchange 接收或转换消息的 ItemReader。Spring Batch 提供了一个 AmqpItemReaderBuilder,用于构建 AmqpItemReader 的实例。

AmqpItemWriter

AmqpItemWriter 是一种使用 AmqpTemplate 向 AMQP exchange 发送消息的 ItemWriter。如果所提供的 AmqpTemplate 未指定名称,则消息会被发送到无名 exchange。Spring Batch 提供了一个 AmqpItemWriterBuilder,用于构建 AmqpItemWriter 的实例。

JmsItemReader

JmsItemReader 是使用 JmsTemplate 的 JMS ItemReader。该 template 应有一个默认 destination,用于为 read() 方法提供 item。Spring Batch 提供了一个 JmsItemReaderBuilder,用于构建 JmsItemReader 的实例。

JmsItemWriter

JmsItemWriter 是使用 JmsTemplate 的 JMS ItemWriter。该 template 应有一个默认 destination,用于在 write(List) 中发送 item。Spring Batch 提供了一个 JmsItemWriterBuilder,用于构建 JmsItemWriter 的实例。

KafkaItemReader

KafkaItemReader 是 Apache Kafka topic 的 ItemReader。它可以配置为从同一 topic 的多个分区读取消息。它在执行上下文中存储消息偏移量,以支持重启功能。Spring Batch 提供了一个 KafkaItemReaderBuilder,用于构建 KafkaItemReader 的实例。

KafkaItemWriter

KafkaItemWriter 是 Apache Kafka 的 ItemWriter,它使用 KafkaTemplate 向默认主题发送事件。Spring Batch 提供了一个 KafkaItemWriterBuilder 来构建 KafkaItemWriter 的实例。

数据库 Reader

Spring Batch 提供以下数据库 reader:

Neo4jItemReader

Neo4jItemReader 是一个 ItemReader,它使用分页技术从图数据库 Neo4j 中读取对象。Spring Batch 提供了一个 Neo4jItemReaderBuilder,用于构建 Neo4jItemReader 的实例。

MongoItemReader

MongoItemReader 是一种通过分页技术从 MongoDB 读取文档的 ItemReader。Spring Batch 提供了一个 MongoItemReaderBuilder,用于构建 MongoItemReader 的实例。

HibernateCursorItemReader

HibernateCursorItemReader 是在 Hibernate 基础上构建的用于读取数据库记录的 ItemStreamReader。它执行 HQL 查询,然后在初始化后,在调用 read() 方法时遍历结果集,连续返回与当前行对应的对象。Spring Batch 提供了一个 HibernateCursorItemReaderBuilder,用于构建 HibernateCursorItemReader 的实例。

HibernatePagingItemReader

HibernatePagingItemReader 是一种 ItemReader,用于读取建立在 Hibernate 基础上的数据库记录,一次最多只能读取固定数量的 item。Spring Batch 提供了一个 HibernatePagingItemReaderBuilder,用于构建 HibernatePagingItemReader 的实例。

RepositoryItemReader

RepositoryItemReader 是通过使用 PagingAndSortingRepository 来读取记录的 ItemReader。Spring Batch 提供了一个 RepositoryItemReaderBuilder,用于构建 RepositoryItemReader 的实例。

数据库 Writer

Spring Batch 提供以下数据库 Write:

Neo4jItemWriter

Neo4jItemWriter 是向 Neo4j 数据库写入数据的 ItemWriter 实现。Spring Batch 提供了一个 Neo4jItemWriterBuilder,用于构建 Neo4jItemWriter 实例。

MongoItemWriter

MongoItemWriter 是一个 ItemWriter 实现,它使用 Spring Data 的 MongoOperations 实现写入 MongoDB 存储。Spring Batch 提供了一个 MongoItemWriterBuilder,用于构建 MongoItemWriter 的实例。

RepositoryItemWriter

RepositoryItemWriter 是 Spring Data CrudRepositoryItemWriter wrapper。Spring Batch 提供了一个 RepositoryItemWriterBuilder,用于构建 RepositoryItemWriter 的实例。

HibernateItemWriter

HibernateItemWriter 是一种 ItemWriter,它使用 Hibernate session 来保存或更新不属于当前 Hibernate session 的实体。Spring Batch 提供了一个 HibernateItemWriterBuilder,用于构建 HibernateItemWriter 的实例。

JdbcBatchItemWriter

JdbcBatchItemWriter 是一个 ItemWriter,它使用 NamedParameterJdbcTemplate 的批处理功能,为所有提供的项目执行一批语句。Spring Batch 提供了一个 JdbcBatchItemWriterBuilder,用于构建 JdbcBatchItemWriter 的实例。

JpaItemWriter

JpaItemWriter 是一个 ItemWriter,它使用 JPA EntityManagerFactory 来合并不属于 persistence context 的任何实体。Spring Batch 提供了一个 JpaItemWriterBuilder,用于构建 JpaItemWriter 的实例。

专用的 Reader

Spring Batch 提供以下专用 reader:

LdifReader

LdifReader 可从 Resource 中读取 LDIF(LDAP 数据交换格式)记录,对其进行解析,并为每次 read 返回一个 LdapAttribute 对象。Spring Batch 提供了一个 LdifReaderBuilder,用于构建 LdifReader 的实例。

MappingLdifReader

MappingLdifReaderResource 中读取 LDIF(LDAP 数据交换格式)记录,对其进行解析,然后将每条 LDIF 记录映射到 POJO(Plain Old Java 对象)。每次读取都会返回一个 POJO。Spring Batch 提供了一个 MappingLdifReaderBuilder,用于构建 MappingLdifReader 的实例。

AvroItemReader

AvroItemReader 可从 Resource 中读取序列化的 Avro 数据。每次读取都会返回一个由 Java 类或 Avro Schema 指定类型的实例。reader 可选择是否为嵌入 Avro schema 的输入进行配置。Spring Batch 提供了一个 AvroItemReaderBuilder,用于构建 AvroItemReader 的实例。

专用的 Writer

Spring Batch 提供以下专用的 Writer

SimpleMailMessageItemWriter

SimpleMailMessageItemWriter 是一个可以发送邮件信息的 ItemWriter。它将实际发送邮件的任务委托给 MailSender 实例。Spring Batch 提供了一个 SimpleMailMessageItemWriterBuilder,用于构建 SimpleMailMessageItemWriter 的实例。

AvroItemWriter

AvroItemWrite 根据给定的类型或 Schema 将 Java 对象序列化为可写资源。可选择是否将 writer 配置为在输出中嵌入 Avro schema。Spring Batch 提供了一个 AvroItemWriterBuilder,用于构建 AvroItemWriter 的实例。

专门的 Processor

Spring Batch 提供以下专门的 Processor:

ScriptItemProcessor

ScriptItemProcessor 是一个项目处理器(ItemProcessor),它将当前要处理的 item 传递给所提供的脚本,脚本的结果由处理器返回。Spring Batch 提供了一个 ScriptItemProcessorBuilder,用于构建 ScriptItemProcessor 的实例。