ItemReader 和 ItemWriter
本站( springdoc.cn )中的内容来源于 spring.io ,原始版权归属于 spring.io。由 springdoc.cn 进行翻译,整理。可供个人学习、研究,未经许可,不得进行任何转载、商用或与之相关的行为。 商标声明:Spring 是 Pivotal Software, Inc. 在美国以及其他国家的商标。 |
所有批处理的最简单形式就是读入大量数据,执行某种类型的计算或转换,然后将结果写出。Spring Batch 提供了三个关键接口来帮助执行批量读写: ItemReader
、ItemProcessor
和 ItemWriter
。
ItemReader
尽管只是一个简单的概念,但 ItemReader
却能从多种不同类型的输入中提供数据。最常见的例子包括
-
平面文件:Flat-file item reader 从平面文件中读取数据行,该文件通常描述由文件中固定位置定义的数据字段或由某些特殊字符(如逗号)分隔的记录。
-
XML: XML
ItemReader
可独立于用于解析、映射和验证对象的技术处理 XML。输入数据可根据 XSD schema 验证 XML 文件。 -
Database: 访问数据库资源可返回结果集,这些结果集可映射到对象进行处理。默认的 SQL
ItemReader
实现会调用一个RowMapper
来返回对象、跟踪当前行(如果需要重启)、存储基本统计信息,并提供一些事务增强功能(稍后解释)。
还有更多可能性,但本章将重点讨论基本的可能性。附录 A 列出了所有可用的 ItemReader
实现的完整列表。
ItemReader
是通用输入操作的基本接口,如下接口定义所示:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
read
方法定义了 ItemReader
最基本的约定。调用该方法将返回一个 item,如果没有其他 item,则返回 null
值。item 可能代表文件中的一行、数据库中的一行或 XML 文件中的一个元素。一般来说,我们希望这些 item 能被映射到一个可用的 domain 对象(如 Trade
、Foo
或其他),但约定中并没有这样的要求。
预计 ItemReader
接口的实现只能是前向的。但是,如果底层资源是事务性的(如 JMS 队列),那么在回滚情况下,调用 read
可能会在后续调用中返回相同的逻辑项。还值得注意的是,ItemReader
缺乏可处理的 item 不会导致异常抛出。例如,数据库 ItemReader
配置的查询结果为 0,则在第一次调用 read
时返回 null
值。
ItemWriter
ItemWriter
的功能与 ItemReader
相似,但操作相反。资源仍然需要定位、打开和关闭,但它们的不同之处在于,ItemWriter
会写入,而不是读入。对于数据库或队列,这些操作可能是插入、更新或发送。输出的序列化格式取决于每个 batch job。
与 ItemReader
一样,ItemWriter
也是一个相当通用的接口,如下接口定义所示:
public interface ItemWriter<T> {
void write(Chunk<? extends T> items) throws Exception;
}
与 ItemReader
上的 read
一样,write
提供了 ItemWriter
的基本约定。只要打开,它就会尝试写出传入的 item 列表。由于通常情况下,item 会被 "批量" 写入一个大块,然后输出,因此该接口接受的是一个 item 列表,而不是一个 item 本身。在写出列表后,可以在从 write 方法返回之前执行任何必要的刷出。例如,如果向 Hibernate DAO 写入数据,可以多次调用 write
方法,每个 item 调用一次。然后,writer 可以在返回之前在 Hibernate session 上调用 flush
。
ItemStream
ItemReaders
和 ItemWriters
都能很好地实现各自的目的,但它们之间有一个共同的问题,需要另一个接口。一般来说,作 为batch job 范围的一部分,reader 和 writer 需要打开、关闭,并需要一种机制来持久化状态。如下例所示,ItemStream
接口就可以实现这一目的:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
在介绍每种方法之前,我们都要提到 ExecutionContext
。同时实现 ItemStream
的 ItemReader
客户端应在调用 read
之前调用 open
,以便打开任何资源(如文件)或获取连接。类似的限制也适用于实现 ItemStream
的 ItemWriter
。正如第 2 章所述,如果在 ExecutionContext
中发现了预期数据,则可将其用于在初始状态以外的位置启动 ItemReader
或 ItemWriter
。相反,调用 close
方法是为了确保安全释放打开过程中分配的任何资源。调用 update
方法主要是为了确保将当前持有的任何状态加载到所提供的 ExecutionContext
中。该方法在提交前被调用,以确保当前状态在提交前被持久化到数据库中。
在 ItemStream
的客户端是 Step
(来自 Spring Batch Core)的特殊情况下,会为每个 StepExecution
创建 ExecutionContext
,以便用户存储特定执行的状态,并期望在再次启动相同 JobInstance
时返回该状态。对于熟悉 Quartz 的人来说,其语义与 Quartz JobDataMap
非常相似。
委托模式和 Step 注册
请注意,CompositeItemWriter
是委托模式的一个示例,这在 Spring Batch 中很常见。委托本身可能实现回调接口,如 StepListener
。如果它们实现了回调接口,并且作为 Job
中 Step
的一部分与 Spring Batch Core 结合使用,那么它们几乎肯定需要与 Step
一起手动注册。直接连接到 Step
的 reader、writer 或处理器(processor)如果实现了 ItemStream
或 StepListener
接口,则会自动注册。但是,由于 Step
不知道委托,因此需要将它们注入为 listener 或 stream(或两者,如果合适)。
下面的示例展示了如何在 XML 中把委托注入为 Stream:
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
commit-interval="2">
<streams>
<stream ref="barWriter" />
</streams>
</chunk>
</tasklet>
</step>
</job>
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter" />
</bean>
<bean id="barWriter" class="...BarWriter" />
下面的示例展示了如何在 Java 中把委托注入为 Stream:
@Bean
public Job ioSampleJob(JobRepository jobRepository) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(compositeItemWriter())
.stream(barWriter())
.build();
}
@Bean
public CustomCompositeItemWriter compositeItemWriter() {
CustomCompositeItemWriter writer = new CustomCompositeItemWriter();
writer.setDelegate(barWriter());
return writer;
}
@Bean
public BarWriter barWriter() {
return new BarWriter();
}
平面文件
交换批量数据最常用的机制之一一直是平面文件。与 XML 不同的是,XML 有一个约定俗成的标准来定义其结构(XSD),而阅读平面文件的任何人都必须提前了解该文件的确切结构。一般来说,所有平面文件都分为两种类型:分隔型和固定长度型。分隔文件是指字段由逗号等分隔符分隔的文件。固定长度文件的字段长度是固定的。
FieldSet
在 Spring Batch 中处理平面文件时,无论是输入还是输出,最重要的类之一就是 FieldSet
。许多架构和库都包含帮助你从文件中读取数据的抽象,但它们通常返回一个 String
或 String
对象数组。这实际上只能帮你做到一半。FieldSet
是 Spring Batch 从文件资源中绑定字段的抽象。它允许开发人员像处理数据库输入一样处理文件输入。FieldSet
在概念上类似于 JDBC ResultSet
。FieldSet
只需要一个参数:一个 token String
数组。你还可以选择配置字段名称,这样就可以通过索引或名称访问字段,就像下面示例中的 ResultSet
一样:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
FieldSet
接口上还有更多选项,如 Date
、long
、BigDecimal
等。FieldSet
的最大优势在于,它能对平面文件输入进行一致的解析。无论是在处理格式异常导致的错误时,还是在进行简单的数据转换时,FieldSet
都能保持一致,而不是每个批处理任务都以潜在的意外方式进行不同的解析。
FlatFileItemReader
平面文件是指最多包含二维(表格)数据的任何类型的文件。在 Spring Batch 框架中,FlatFileItemReader
类为读取和解析平面文件提供了基本功能。FlatFileItemReader
的两个最重要的必需依赖项是 Resource
和 LineMapper
。LineMapper
接口将在接下来的章节中详细介绍。resource 属性表示 Spring Core Resource
。有关如何创建此类 Bean 的文档,请参阅 Spring Framework 第 5 章 - Resources。因此,除了展示以下简单示例外,本指南将不再详述创建 Resource
对象的细节:
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的批处理环境中,目录结构通常由企业应用集成(EAI)基础架构管理,其中为外部接口建立了下拉区,以便将文件从 FTP 位置移动到批处理位置,反之亦然。文件移动程序超出了 Spring Batch 架构的范围,但批处理作业流中包含文件移动程序作为作业流中的步骤并不罕见。批处理架构只需要知道如何定位要处理的文件。Spring Batch 从这个起点开始将数据送入管道。然而,Spring Integration 提供了许多此类服务。
FlatFileItemReader
中的其他属性可以让你进一步指定数据的解释方式,如下表所述:
属性 | 类型 | 说明 |
---|---|---|
comments |
String[] |
指定用于表示注释行的行前缀。 |
encoding |
String |
指定要使用的文本编码。默认值为 |
lineMapper |
|
将 |
linesToSkip |
int |
文件顶部忽略的行数。 |
recordSeparatorPolicy |
RecordSeparatorPolicy |
用于确定行结束符的位置,并在引号字符串内的行结束符上执行续行等操作。 |
resource |
|
读取的资源。 |
skippedLinesCallback |
LineCallbackHandler |
用于传递文件中要跳过的行的原始行内容的接口。如果 |
strict |
boolean |
在严格模式下,如果输入资源不存在,reader 会在 |
LineMapper
正如 RowMapper
采用 ResultSet
等底层结构并返回 Object
一样,平面文件处理也需要相同的结构来将 String
行转换为 Object
,如下接口定义所示:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
其基本约定是,在给定当前行及其关联行号的情况下,mapper 应返回一个结果 domain 对象。这与 RowMapper
类似,因为每一行都与其行号相关联,就像 ResultSet
中的每一行都与其行号相关联一样。这样,行号就可以与生成的 domain 对象绑定,以便进行 id 比较或记录更多信息。然而,与 RowMapper
不同的是,LineMapper
给出的是原始行,如上文所述,这只能实现一半的目标。必须将该行标记化为一个 FieldSet
,然后将其映射到一个对象(如本文档稍后所述)。
LineTokenizer
将一行输入转化为 FieldSet
的抽象是必要的,因为可能有多种格式的平面文件数据需要转换为 FieldSet
。在 Spring Batch 中,这个接口就是 LineTokenizer
:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
LineTokenizer
的约定是这样的:给定一行输入(理论上,String
可以包含多行),返回一个代表该行的 FieldSet
。然后,该 FieldSet
可以传递给 FieldSetMapper
。Spring Batch 包含以下 LineTokenizer
实现:
-
DelimitedLineTokenizer
:用于用分隔符分隔记录字段的文件。最常用的分隔符是逗号,但也经常使用管道或分号。 -
FixedLengthTokenizer
:用于记录中每个字段都是 "固定宽度" 的文件。必须为每种记录类型定义每个字段的宽度。 -
PatternMatchingCompositeLineTokenizer
:通过检查模式,确定应在特定行上使用 tokenizer 列表中的哪个LineTokenizer
。
FieldSetMapper
FieldSetMapper
接口定义了一个方法 mapFieldSet
,该方法接收一个 FieldSet
对象并将其内容映射到一个对象。该对象可以是自定义 DTO、domain 对象或数组,具体取决于 job 的需要。FieldSetMapper
与 LineTokenizer
结合使用,可将资源中的一行数据转换为所需类型的对象,如下接口定义所示:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
使用的 pattern 与 JdbcTemplate
使用的 RowMapper
相同。
DefaultLineMapper
既然已经定义了读取平面文件的基本接口,那么显然需要三个基本步骤:
-
从文件中读取一行。
-
将字符串行传入
LineTokenizer#tokenize()
方法,以获取FieldSet
。 -
将 tokenizer 返回的
FieldSet
传递给FieldSetMapper
,返回ItemReader#read()
方法的结果。
上述两个接口分别代表两项不同的任务:将行转换为 FieldSet
和将 FieldSet
映射到 domain 对象。由于 LineTokenizer
的输入与 LineMapper
的输入(一行)相匹配,而 FieldSetMapper
的输出与 LineMapper
的输出相匹配,因此我们提供了一个同时使用 LineTokenizer
和 FieldSetMapper
的默认实现。DefaultLineMapper
如下面的类定义所示,代表了大多数用户需要的行为:
public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
上述功能是在默认实现中提供的,而不是内置于 reader 本身(如框架以前的版本),以便用户在控制解析过程时有更大的灵活性,尤其是在需要访问原始行的情况下。
简单的分隔文件读取示例
下面的示例说明了如何通过实际 domain 场景读取平面文件。该批处理任务从以下文件读入足球运动员:
ID,lastName,firstName,position,birthYear,debutYear "AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996", "AbduRa00,Abdullah,Rabih,rb,1975,1999", "AberWa00,Abercrombie,Walter,rb,1959,1982", "AbraDa00,Abramowicz,Danny,wr,1945,1967", "AdamBo00,Adams,Bob,te,1946,1969", "AdamCh00,Adams,Charlie,wr,1979,2003"
该文件的内容映射到以下 Player
domain 对象:
public class Player implements Serializable {
private String ID;
private String lastName;
private String firstName;
private String position;
private int birthYear;
private int debutYear;
public String toString() {
return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
",First Name=" + firstName + ",Position=" + position +
",Birth Year=" + birthYear + ",DebutYear=" +
debutYear;
}
// setters and getters...
}
要将 FieldSet
映射到 Player
对象,需要定义一个返回 player 的 FieldSetMapper
,如下例所示:
protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) {
Player player = new Player();
player.setID(fieldSet.readString(0));
player.setLastName(fieldSet.readString(1));
player.setFirstName(fieldSet.readString(2));
player.setPosition(fieldSet.readString(3));
player.setBirthYear(fieldSet.readInt(4));
player.setDebutYear(fieldSet.readInt(5));
return player;
}
}
然后,可以通过正确构造一个 FlatFileItemReader
并调用 read
来读取文件,如下例所示:
FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
//DelimitedLineTokenizer defaults to comma as its delimiter
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();
每次调用 read
方法都会从文件中的每一行返回一个新的 Player
对象。当读取到文件末尾时,将返回 null
值。
按名称映射字段
DelimitedLineTokenizer
和 FixedLengthTokenizer
都允许一个额外的功能,其功能类似于 JDBC ResultSet
。字段名称可以注入到任何一种 LineTokenizer
实现中,以增加映射函数的可读性。首先,将平面文件中所有字段的列名注入 tokenizer,如下例所示:
tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});
FieldSetMapper
可以如下使用这些信息:
public class PlayerMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fs) {
if (fs == null) {
return null;
}
Player player = new Player();
player.setID(fs.readString("ID"));
player.setLastName(fs.readString("lastName"));
player.setFirstName(fs.readString("firstName"));
player.setPosition(fs.readString("position"));
player.setDebutYear(fs.readInt("debutYear"));
player.setBirthYear(fs.readInt("birthYear"));
return player;
}
}
将字段集自动映射到 Domain 对象
对许多人来说,编写特定的 FieldSetMapper
就像为 JdbcTemplate
编写特定的 RowMapper
一样麻烦。Spring Batch 提供了一个 FieldSetMapper
,通过使用 JavaBean 规范将字段名与对象上的 setter 进行匹配,从而自动映射字段,从而简化了这一工作。
还是以足球为例,BeanWrapperFieldSetMapper
配置的 XML 代码段如下:
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="player" />
</bean>
<bean id="player"
class="org.springframework.batch.sample.domain.Player"
scope="prototype" />
还是以足球为例,BeanWrapperFieldSetMapper
配置的 Java 代码段如下:
@Bean
public FieldSetMapper fieldSetMapper() {
BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();
fieldSetMapper.setPrototypeBeanName("player");
return fieldSetMapper;
}
@Bean
@Scope("prototype")
public Player player() {
return new Player();
}
对于 FieldSet
中的每个条目,mapper 都会在 Player
对象的新实例上查找相应的 setter(因此需要 prototype scope),就像 Spring 容器查找与属性名称匹配的 setter 一样。对 FieldSet
中的每个可用字段进行映射,然后返回结果 Player
对象,无需代码。
固定长度文件格式
到目前为止,我们只详细讨论了分隔文件。然而,它们只代表了文件读取的一半。许多使用平面文件的组织都使用固定长度格式。下面是一个固定长度文件的示例:
UK21341EAH4121131.11customer1 UK21341EAH4221232.11customer2 UK21341EAH4321333.11customer3 UK21341EAH4421434.11customer4 UK21341EAH4521535.11customer5
虽然这看起来像一个大字段,但实际上代表了 4 个不同的字段:
-
ISIN: 订单商品的唯一标识符 - 12 个字符。
-
Quantity: 订单商品的编号 - 3 个字符。
-
Price: 商品价格 - 5 个字符。
-
Customer: 订单商品的客户 ID - 9 个字符。
在配置 FixedLengthLineTokenizer
时,必须以范围的形式提供这些长度。
下面的示例展示了如何用 XML 为 FixedLengthLineTokenizer
标记符定义范围:
<bean id="fixedLengthLineTokenizer"
class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
<property name="names" value="ISIN,Quantity,Price,Customer" />
<property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>
由于 FixedLengthLineTokenizer
使用了与前面讨论的 LineTokenizer
相同的接口,因此它返回的 FieldSet
与使用了分隔符的 FieldSet
相同。这就允许使用相同的方法来处理其输出,如使用 BeanWrapperFieldSetMapper
。
支持上述范围语法需要在 |
下面的示例展示了如何用 Java 为 FixedLengthLineTokenizer
定义范围:
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
tokenizer.setColumns(new Range(1, 12),
new Range(13, 15),
new Range(16, 20),
new Range(21, 29));
return tokenizer;
}
由于 FixedLengthLineTokenizer
使用了与上文讨论的 LineTokenizer
相同的接口,因此它返回的 FieldSet
与使用了分隔符的 FieldSet
相同。这样,在处理其输出时就可以使用相同的方法,例如使用 BeanWrapperFieldSetMapper
。
单个文件中的多种记录类型
到此为止,所有的文件读取示例都为了简单起见做了一个关键假设:文件中的所有记录都具有相同的格式。然而,情况并非总是如此。很常见的情况是,文件中的记录可能具有不同的格式,需要以不同的方式标记并映射到不同的对象。下面的文件摘录就说明了这一点:
USER;Smith;Peter;;T;20014539;F LINEA;1044391041ABC037.49G201XX1383.12H LINEB;2134776319DEF422.99M005LI
在这个文件中,我们有三种类型的记录:"USER"、"LINEA" 和 "LINEB"。 "USER" 行对应一个 User
对象。"LINEA" 和 "LINEB" 都对应 Line
对象,但 "LINEA" 比 "LINEB" 包含更多信息。
ItemReader
会单独读取每一行,但我们必须指定不同的 LineTokenizer
和 FieldSetMapper
对象,以便 ItemWriter
接收到正确的项目。PatternMatchingCompositeLineMapper
允许将 pattern 映射到 LineTokenizer
和 pattern 映射到 FieldSetMapper
,从而简化了这一过程。
下面的示例展示了如何用 XML 为 FixedLengthLineTokenizer
定义范围:
<bean id="orderFileLineMapper"
class="org.spr...PatternMatchingCompositeLineMapper">
<property name="tokenizers">
<map>
<entry key="USER*" value-ref="userTokenizer" />
<entry key="LINEA*" value-ref="lineATokenizer" />
<entry key="LINEB*" value-ref="lineBTokenizer" />
</map>
</property>
<property name="fieldSetMappers">
<map>
<entry key="USER*" value-ref="userFieldSetMapper" />
<entry key="LINE*" value-ref="lineFieldSetMapper" />
</map>
</property>
</bean>
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
PatternMatchingCompositeLineMapper lineMapper =
new PatternMatchingCompositeLineMapper();
Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
tokenizers.put("USER*", userTokenizer());
tokenizers.put("LINEA*", lineATokenizer());
tokenizers.put("LINEB*", lineBTokenizer());
lineMapper.setTokenizers(tokenizers);
Map<String, FieldSetMapper> mappers = new HashMap<>(2);
mappers.put("USER*", userFieldSetMapper());
mappers.put("LINE*", lineFieldSetMapper());
lineMapper.setFieldSetMappers(mappers);
return lineMapper;
}
在本例中,"LINEA" 和 "LINEB" 分别拥有不同的 LineTokenizer
实例,但它们都使用相同的 FieldSetMapper
。
PatternMatchingCompositeLineMapper
使用 PatternMatcher#match
方法为每一行选择正确的委托。PatternMatcher
允许使用两个具有特殊含义的通配符:问号("?")匹配的是一个字符,而星号("*")匹配的是零个或多个字符。请注意,在前面的配置中,所有模式都以星号结尾,因此它们实际上是行的前缀。无论配置顺序如何,PatternMatcher
总是尽可能匹配最特殊的模式。因此,如果 "LINE*" 和 "LINEA*" 都被列为模式,"LINEA" 将匹配模式 "LINEA*",而 "LINEB" 将匹配模式 "LINE*"。此外,单个星号("*")可以作为默认值,匹配任何其他模式不匹配的行。
下面的示例展示了如何匹配 XML 中任何其他模式都不匹配的行:
<entry key="*" value-ref="defaultLineTokenizer" />
下面的示例展示了如何在 Java 中匹配其他模式无法匹配的行:
...
tokenizers.put("*", defaultLineTokenizer());
...
还有一个 PatternMatchingCompositeLineTokenizer
可单独用于 tokenizer。
平面文件中包含跨多行记录的情况也很常见。要处理这种情况,需要采用更复杂的策略。在 multiLineRecords
示例中可以找到这种常见模式的演示。
平面文件中的异常处理
在很多情况下,tokenizer 一行可能会导致异常抛出。许多平面文件并不完美,包含格式不正确的记录。许多用户选择跳过这些错误行,同时记录问题、原始行和行号。这些日志随后可以手动或通过其他批处理 job 进行检查。因此,Spring Batch 提供了一个处理解析异常的异常层次结构: FlatFileParseException
和 FlatFileFormatException
。FlatFileItemReader
在尝试读取文件时遇到任何错误,就会抛出 FlatFileParseException
。FlatFileFormatException
由 LineTokenizer
接口的实现抛出,表示在 tokenizer 过程中遇到了更具体的错误。
IncorrectTokenCountException
DelimitedLineTokenizer
和 FixedLengthLineTokenizer
都能指定用于创建 FieldSet
的列名。但是,如果列名数量与标记行时发现的列名数量不匹配,则无法创建 FieldSet
,并会抛出 IncorrectTokenCountException
(不正确标记数异常),其中包含遇到的标记数和预期的标记数,如下例所示:
tokenizer.setNames(new String[] {"A", "B", "C", "D"});
try {
tokenizer.tokenize("a,b,c");
}
catch (IncorrectTokenCountException e) {
assertEquals(4, e.getExpectedCount());
assertEquals(3, e.getActualCount());
}
由于 tokenizer 配置了 4 个列名,但在文件中只找到 3 个 token,因此出现了不正确 token 计数异常(IncorrectTokenCountException
)。
IncorrectLineLengthException
固定长度格式的文件在解析时有额外的要求,因为与分隔格式不同,每一列都必须严格遵守预定义的宽度。如果行的总长度不等于该列的最宽值,就会出现异常,如下例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5),
new Range(6, 10),
new Range(11, 15) });
try {
tokenizer.tokenize("12345");
fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
assertEquals(15, ex.getExpectedLength());
assertEquals(5, ex.getActualLength());
}
上述 tokenizer 的配置范围是 1-5、6-10 和 11-15。因此,行的总长度为 15。然而,在前面的示例中,传入了长度为 5 的行,导致抛出了一个不正确行长异常(IncorrectLineLengthException
)。在这里抛出异常,而不是只映射第一列,可以使对该行的处理更早失败,并且比在 FieldSetMapper
中尝试读入第 2 列时失败所包含的信息更多。不过,在有些情况下,行的长度并不总是恒定的。因此,可以通过 'strict' 属性关闭行长度验证,如下例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));
前面的示例与前面的示例几乎完全相同,只是调用了 tokenizer.setStrict(false)
。该设置告诉标 tokenizer 在标记行时不强制执行行长。现在,FieldSet
已正确创建并返回。不过,它只包含剩余值的空标记(token)。
FlatFileItemWriter
向平面文件写入与从文件读入有同样的问题和必须克服的困难。一个 step 必须能够以事务方式写入分隔格式或固定长度格式。
LineAggregator
正如 LineTokenizer
接口需要将一个 item 转换成 String
一样,文件写入也必须有一种方法将多个字段聚合成一个字符串,以便写入文件。在 Spring Batch 中,这就是 LineAggregator
,如下接口定义所示:
public interface LineAggregator<T> {
public String aggregate(T item);
}
LineAggregator
与 LineTokenizer
在逻辑上正好相反。LineTokenizer
接收一个 String
并返回一个 FieldSet
,而 LineAggregator
接收一个 item
并返回一个 String
。
PassThroughLineAggregator
LineAggregator
接口最基本的实现是 PassThroughLineAggregator
,它假定对象已经是 string,或者其 string 表示可以接受写入,如下代码所示:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
如果需要直接控制字符串的创建,但又需要 FlatFileItemWriter
的优势(如事务和重启支持),前述实现就非常有用。
简单文件的写入示例
既然已经定义了 LineAggregator
接口及其最基本的实现,即 PassThroughLineAggregator
,那么就可以解释写入的基本流程了:
-
要写入的 object 被传递给
LineAggregator
,以获得一个String
。 -
返回的
String
将写入配置文件。
下面摘录的 FlatFileItemWriter
代码表达了这一点:
public void write(T item) throws Exception {
write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
在 XML 中,一个简单的配置示例可能如下所示:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean class="org.spr...PassThroughLineAggregator"/>
</property>
</bean>
在 Java 中,一个简单的配置示例可能如下所示:
@Bean
public FlatFileItemWriter itemWriter() {
return new FlatFileItemWriterBuilder<Foo>()
.name("itemWriter")
.resource(new FileSystemResource("target/test-outputs/output.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
FieldExtractor
前面的示例可能对写入文件的最基本用途有用。但是,FlatFileItemWriter
的大多数用户都有一个需要写出的 domain 对象,因此,必须将其转换成一行。在文件读取中,需要如下操作:
-
从文件中读取一行。
-
将该行传递给
LineTokenizer#tokenize()
方法,以获取一个FieldSet
。 -
P将 tokenizer 返回的
FieldSet
传递给FieldSetMapper
,返回ItemReader#read()
方法的结果。
文件写入的步骤与此类似,但正好相反:
-
将要写入的 item 传递给 writer。
-
将 item 上的字段转换为数组。
-
将得到的数组汇总成一行。
由于框架无法知道需要写出对象中的哪些字段,因此必须编写一个 FieldExtractor
来完成将 item 转化为数组的任务,如下面的接口定义所示:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
FieldExtractor
接口的实现应从所提供对象的字段中创建一个数组,然后可以在元素之间使用分隔符或作为固定宽度行的一部分写出。
PassThroughFieldExtractor
需要写出数组、Collection
或 FieldSet
的情况很多。从这些集合类型中 "提取" 数组非常简单。为此,需要将集合转换为数组。因此,在这种情况下应使用 PassThroughFieldExtractor
。需要注意的是,如果传入的对象不是集合类型,那么 PassThroughFieldExtractor
返回的数组只包含要提取的项目。
BeanWrapperFieldExtractor
与文件读取部分描述的 BeanWrapperFieldSetMapper
一样,通常最好配置如何将 domain 对象转换为对象数组,而不是自己编写转换过程。BeanWrapperFieldExtractor
提供了这种功能,如下例所示:
BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);
该 extractor 的实现只有一个必需属性:要映射的字段名。正如 BeanWrapperFieldSetMapper
需要字段名称来将 FieldSet
上的字段映射到所提供对象上的 setter 一样,BeanWrapperFieldExtractor
也需要字段名称来映射到 getter 以创建对象数组。值得注意的是,名称的顺序决定了数组中字段的顺序。
带分隔符文件写入示例
最基本的平面文件格式是所有字段都用分隔符分隔。这可以使用 DelimitedLineAggregator
来实现。下面的示例写出了一个简单的 domain 对象,表示客户账户的贷记:
public class CustomerCredit {
private int id;
private String name;
private BigDecimal credit;
//getters and setters removed for clarity
}
由于使用的是 domain 对象,因此必须提供 FieldExtractor
接口的实现以及要使用的分隔符。
下面的示例展示了如何在 XML 中使用带有分隔符的 FieldExtractor
:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit"/>
</bean>
</property>
</bean>
</property>
</bean>
下面的示例展示了如何在 Java 中使用带有分隔符的 FieldExtractor
:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
在上一个示例中,本章前面介绍的 BeanWrapperFieldExtractor
用于将 CustomerCredit
中的 name 和 credit 字段转化为对象数组,然后在每个字段之间用逗号隔开。
也可以使用 FlatFileItemWriterBuilder.DelimitedBuilder
自动创建 BeanWrapperFieldExtractor
和 DelimitedLineAggregator
,如下例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.delimited()
.delimiter("|")
.names(new String[] {"name", "credit"})
.build();
}
固定宽度文件写入示例
分隔不是唯一的平面文件格式。许多人喜欢为每一列设定宽度,以便在字段之间划线,这通常被称为 "固定宽度"。Spring Batch 通过 FormatterLineAggregator
支持这种文件写入方式。
使用上述相同的 CustomerCredit
domain 对象,可以用 XML 进行如下配置:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...FormatterLineAggregator">
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit" />
</bean>
</property>
<property name="format" value="%-9s%-2.0f" />
</bean>
</property>
</bean>
使用上述相同的 CustomerCredit
domain 对象,可以在 Java 中进行如下配置:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
前面示例中的大部分内容都很熟悉。不过,format
属性的值是新的。
下面的示例显示了 XML 中的 format 属性:
<property name="format" value="%-9s%-2.0f" />
下面的示例展示了 Java 中的 format 属性:
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...
底层实现是使用作为 Java 5 一部分添加的 Formatter
构建的。Java Formatter
基于 C 语言的 printf
功能。有关如何配置 formatter 的详细信息,请参阅 Formatter 的 Javadoc。
也可以使用 FlatFileItemWriterBuilder.FormattedBuilder
自动创建 BeanWrapperFieldExtractor
和 FormatterLineAggregator
,如下例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.formatted()
.format("%-9s%-2.0f")
.names(new String[] {"name", "credit"})
.build();
}
处理文件的创建
FlatFileItemReader
与文件资源的关系非常简单。当 reader 初始化时,它会打开文件(如果存在的话),如果打不开则会抛出异常。文件写入就没那么简单了。乍一看,FlatFileItemWriter
似乎也应该有类似的直接约束: 如果文件已经存在,则抛出异常;如果不存在,则创建文件并开始写入。但是,重新启动 Job
可能会导致问题。在正常重启情况下,约束是相反的:如果文件存在,则从最后已知的良好位置开始写入,如果不存在,则抛出异常。但是,如果该任务的文件名总是相同,会发生什么情况呢?在这种情况下,如果文件存在,除非是重启,否则就应该删除。考虑到这种可能性,FlatFileItemWriter
包含了 shouldDeleteIfExists
属性。将此属性设置为 true
会导致在打开 writer 时删除同名的现有文件。
XML Item Reader/Writer
Spring Batch 为读取 XML 记录并将其映射到 Java 对象以及将 Java 对象写入 XML 记录提供了事务处理基础架构。
Constraints on streaming XML
StAX API 用于 I/O,因为其他标准 XML 解析 API 不符合批量处理要求(DOM 一次将整个输入加载到内存中,而 SAX 只允许用户提供回调来控制解析过程)。 |
我们需要考虑在 Spring Batch 中 XML 输入和输出是如何工作的。首先,有几个概念与文件读写不同,但在 Spring Batch 的 XML 处理中是通用的。在 XML 处理中,我们认为 XML 资源是与单个记录相对应的 "片段" 集合,而不是需要 tokenize 的记录行(FieldSet
实例),如下图所示:
在上述情况中,'trade' 标签被定义为 "根元素"。介于 '<trade>' 和 '</trade>' 之间的所有内容都被视为一个 "片段"。Spring Batch 使用对象/XML 映射 (OXM) 将片段绑定到对象。不过,Spring Batch 并不依赖于任何特定的 XML 绑定技术。典型的用法是委托 Spring OXM,它为最流行的 OXM 技术提供了统一的抽象。对 Spring OXM 的依赖是可选的,如果需要,你可以选择实现 Spring Batch 的特定接口。与 OXM 所支持技术的关系如下图所示:
在介绍了 OXM 以及如何使用 XML 片段来表示记录之后,我们现在可以更仔细地研究 reader 和 writer 了。
StaxEventItemReader
StaxEventItemReader
配置为处理来自 XML 输入流的记录提供了一个典型设置。首先,考虑一下 StaxEventItemReader
可以处理的以下 XML 记录集:
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
要处理 XML 记录,需要具备以下条件:
-
根元素名称: 构成要映射对象的片段根元素名称。示例配置中的值为 trade。
-
Resource:表示要读取文件的 Spring 资源。
-
Unmarshaller
: Spring OXM 提供的一种解码工具,用于将 XML 片段映射到对象。
下面的示例展示了如何定义一个 StaxEventItemReader
,该 reader 可与名为 trade
的根元素、data/iosample/input/input.xml
的资源以及 XML 中名为 tradeMarshaller
的 unmarshaller 协同工作:
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
<property name="fragmentRootElementName" value="trade" />
<property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>
下面的示例展示了如何定义一个 StaxEventItemReader
,该 reader 可与名为 trade
的根元素、data/iosample/input/input.xml
的资源和名为 tradeMarshaller
的 unmarshaller
一起使用:
@Bean
public StaxEventItemReader itemReader() {
return new StaxEventItemReaderBuilder<Trade>()
.name("itemReader")
.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
.addFragmentRootElements("trade")
.unmarshaller(tradeMarshaller())
.build();
}
请注意,在本例中,我们选择使用 XStreamMarshaller
,它接受以 map 形式传入的别名,第一个 key 和 value 是片段名称(即根元素)和要绑定的对象类型。然后,与 FieldSet
类似,映射到对象类型中字段的其他元素的名称在映射表中描述为 key/value 对。在配置文件中,我们可以使用 Spring 配置工具来描述所需的别名。
下面的示例展示了如何用 XML 描述别名:
<bean id="tradeMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
下面的示例展示了如何用 Java 来描述别名:
@Bean
public XStreamMarshaller tradeMarshaller() {
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
return marshaller;
}
在输入时,reader 会读取 XML 资源,直到识别到一个新片段即将开始。默认情况下,reader 通过匹配元素名称来识别即将开始的新片段。reader 从片段中创建独立的 XML 文档,并将文档传递给反序列化器(通常是 Spring OXM Unmarshaller
的 wrapper),以便将 XML 映射到 Java 对象。
总之,该过程类似于以下使用 Spring 配置提供的注入的 Java 代码:
StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = true;
Trade trade = null;
while (hasNext) {
trade = xmlStaxEventItemReader.read();
if (trade == null) {
hasNext = false;
}
else {
System.out.println(trade);
}
}
StaxEventItemWriter
输出与输入类似。StaxEventItemWriter
需要一个 Resource
、一个 marshaller
和一个 rootTagName
。Java 对象会传递给一个 marshaller(通常是标准的 Spring OXM Marshaller),Marshaller 会使用自定义事件 writer 将其写入 Resource
,自定义事件 writer 会过滤 OXM 工具为每个片段生成的 StartDocument
和 EndDocument
事件。
下面的 XML 示例使用了 MarshallingEventWriterSerializer
:
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" ref="outputResource" />
<property name="marshaller" ref="tradeMarshaller" />
<property name="rootTagName" value="trade" />
<property name="overwriteOutput" value="true" />
</bean>
下面的 Java 示例使用了 MarshallingEventWriterSerializer
:
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
return new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(tradeMarshaller())
.resource(outputResource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
}
前面的配置设置了三个所需的属性,并设置了本章前面提到的可选属性 overwriteOutput=true
,用于指定是否可以覆盖现有文件。
下面的 XML 示例与本章前面的读取示例中使用的 marshaller 相同:
<bean id="customerCreditMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="customer"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
下面的 Java 示例与本章前面的读取示例中使用的 marshaller 相同:
@Bean
public XStreamMarshaller customerCreditMarshaller() {
XStreamMarshaller marshaller = new XStreamMarshaller();
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
marshaller.setAliases(aliases);
return marshaller;
}
下面用一个 Java 示例来概括所有讨论的要点,演示如何通过编程设置所需的属性:
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
StaxEventItemWriter staxItemWriter =
new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(marshaller)
.resource(resource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
staxItemWriter.afterPropertiesSet();
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);
读写 JSON Item
Spring Batch 支持读写以下格式的 JSON 资源:
[
{
"isin": "123",
"quantity": 1,
"price": 1.2,
"customer": "foo"
},
{
"isin": "456",
"quantity": 2,
"price": 1.4,
"customer": "bar"
}
]
假定 JSON 资源是与单个项目相对应的 JSON 对象数组。Spring Batch 与任何特定的 JSON 库无关。
JsonItemReader
JsonItemReader
将 JSON 解析和绑定委托给 org.springframework.batch.item.json.JsonObjectReader
接口的实现。该接口旨在通过使用流式 API 以分块方式读取 JSON 对象来实现。目前提供了两种实现:
要处理 JSON 记录,需要具备以下条件:
-
Resource
: 表示要读取的 JSON 文件的 Spring Resource。 -
JsonObjectReader
: JSON object reader,用于解析 JSON 对象并将其与 item 绑定
下面的示例展示了如何定义一个可与之前的 JSON 资源 org/springframework/batch/item/json/trades.json
协同工作的 JsonItemReader
和一个基于 Jackson 的 JsonObjectReader
:
@Bean
public JsonItemReader<Trade> jsonItemReader() {
return new JsonItemReaderBuilder<Trade>()
.jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonItemReader")
.build();
}
JsonFileItemWriter
JsonFileItemWriter
委托 org.springframework.batch.item.json.JsonObjectMarshaller
接口对项目进行编译。该接口接收一个对象并将其序列化为一个 JSON String
。目前提供两种实现:
要写入 JSON 记录,需要具备以下条件:
-
Resource
:表示要写入的 JSON 文件的 SpringResource
。 -
JsonObjectMarshaller
:一个 JSON 对象marshaller ,用于将对象序列化为 JSON 格式。
下面的示例展示了如何定义 JsonFileItemWriter
:
@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
return new JsonFileItemWriterBuilder<Trade>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonFileItemWriter")
.build();
}
多文件输入
在一个 Step
中处理多个文件是常见的要求。假设所有文件格式相同,MultiResourceItemReader
支持这种类型的 XML 和平面文件处理输入。请考虑目录中的以下文件:
file-1.txt file-2.txt ignored.txt
file-1.txt 和 file-2.txt 格式相同,出于业务原因,应一起处理。MultiResourceItemReader
可通过使用通配符读取这两个文件。
下面的示例展示了如何读取 XML 中带有通配符的文件:
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
下面的示例展示了如何在 Java 中使用通配符读取文件:
@Bean
public MultiResourceItemReader multiResourceReader() {
return new MultiResourceItemReaderBuilder<Foo>()
.delegate(flatFileItemReader())
.resources(resources())
.build();
}
引用的委托(delegate)是一个简单的 FlatFileItemReader
。上述配置从两个文件中读取输入,处理回滚和重启情况。需要注意的是,与任何 ItemReader
一样,添加额外输入(本例中为文件)可能会在重启时造成潜在问题。建议批处理任务在成功完成前,使用各自的单独目录。
通过使用 MultiResourceItemReader#setComparator(Comparator) 对输入资源进行排序,以确保在重启场景中 job 运行之间保留资源顺序。
|
数据库
与大多数企业应用程序一样,数据库是批处理的核心存储机制。然而,批处理与其他应用方式不同,因为系统必须处理的数据集非常大。如果一条 SQL 语句返回 100 万行,那么结果集可能会将所有返回结果保存在内存中,直到读取完所有行。Spring Batch 针对这一问题提供了两种解决方案:
基于游标的 ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法,因为这是数据库对 "流式" 关系数据问题的解决方案。Java ResultSet
类本质上是一种操作游标的面向对象机制。ResultSet
维护着一个指向当前数据行的游标。在 ResultSet
上调用 next
会将光标移动到下一行。基于 Spring Batch 游标的 ItemReader
实现会在初始化时打开一个游标,并在每次调用 read
时将游标向前移动一行,返回一个可用于处理的映射对象。然后调用 close
方法确保释放所有资源。Spring core JdbcTemplate
通过使用回调模式来完全映射 ResultSet
中的所有行,并在将控制权返回给方法调用者之前关闭,从而解决了这个问题。但是,在批处理中,这必须等到 step 完成。下图显示了基于游标的 ItemReader
的工作原理图。请注意,虽然示例使用的是 SQL(因为 SQL 广为人知),但任何技术都可以实现基本方法
本例说明了基本模式。给定一个 "FOO" 表,其中有三列:ID
、NAME
和 BAR
: 选择 ID 大于 1 但小于 7 的所有行,这样游标(第 1 行)的起点就位于 ID 2 上。这一行的结果应该是一个完全映射的 Foo
对象。再次调用 read()
会将光标移到下一行,即 ID 为 3 的 Foo
。 每次读取后,都会写出这些读取结果,以便对这些对象进行垃圾回收(假设没有实例变量保持对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标技术的 JDBC 实现。它可直接与 ResultSet
一起使用,并要求针对从 DataSource
获取的连接运行 SQL 语句。以下面数据库 schema 为例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人喜欢为每一行使用一个 domain 对象,因此下面的示例使用 RowMapper
接口的实现来映射 CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
由于 JdbcCursorItemReader
与 JdbcTemplate
共享关键接口,因此我们有必要举例说明如何使用 JdbcTemplate
读取这些数据,以便与 ItemReader
进行对比。在本示例中,假设 CUSTOMER
数据库中有 1000 条记录。第一个示例使用 JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码段后,customerCredits
列表中包含 1,000 个 CustomerCredit
对象。在 query 方法中,从 DataSource
获取连接,针对连接运行所提供的 SQL,并针对结果集中的每一行调用 mapRow
方法。这与下面示例中的 JdbcCursorItemReader
方法形成了鲜明对比:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码段后,counter 等于 1,000。如果上面的代码将返回的 customerCredit
放入一个 list,结果将与 JdbcTemplate
示例完全相同。然而,ItemReader
的最大优势在于它允许对项目进行 "流式" 处理。只需调用一次 read
方法,ItemWriter
就能写出项目,然后通过 read
获取下一个项目。这样,项目读写就可以分 "块" 完成,并定期提交,这正是高性能批处理的精髓所在。此外,它还可以轻松配置,以便注入 Spring Batch Step
。
下面的示例展示了如何将 ItemReader
注入 XML 中的一个 Step
:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的示例展示了如何在 Java Step 中注入 ItemReader
:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
其他属性
由于在 Java 中打开游标有许多不同的选项,因此可以在 JdbcCursorItemReader
上设置许多属性,如下表所示:
ignoreWarnings |
决定是否记录 |
fetchSize |
为 JDBC 驱动程序提供提示,说明当 |
maxRows |
设置底层`ResultSet` 在同一时间可容纳的最大行数限制。 |
queryTimeout |
设置驱动程序等待 |
verifyCursorPosition |
由于 |
saveState |
表示 reader 的状态是否应保存在 |
driverSupportsAbsolute |
表示 JDBC 驱动程序是否支持在 |
setUseSharedExtendedConnection |
指示游标使用的连接是否应被所有其他处理使用,从而共享同一个事务。如果设置 |
HibernateCursorItemReader
就像普通 Spring 用户在决定是否使用 ORM 解决方案时会影响他们是否使用 JdbcTemplate
或 HibernateTemplate
一样,Spring Batch 用户也有同样的选择。HibernateCursorItemReader
是游标技术的 Hibernate 实现。在批处理中使用 Hibernate 一直颇具争议。这主要是因为 Hibernate 最初是为支持在线应用程序风格而开发的。但这并不意味着它不能用于批处理。解决这个问题的最简单方法是使用无状态会话(StatelessSession
)而不是标准会话。这就去除了 Hibernate 采用的所有缓存和脏检查功能,而这些功能在批处理场景中可能会造成问题。有关无状态会话与普通 Hibernate 会话之间区别的更多信息,请参阅特定 Hibernate 版本的文档。HibernateCursorItemReader
可以让你声明一条 HQL 语句并传入一个 SessionFactory
,它会以与 JdbcCursorItemReader
相同的基本方式,在每次调用读取时传回一个项目。以下示例配置使用了与 JDBC reader 相同的 'customer credit' 示例:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
该配置的 ItemReader
返回 CustomerCredit
对象的方式与 JdbcCursorItemReader
所描述的方式完全相同,前提是已为 Customer
表正确创建了 hibernate 映射文件。使用无状态会话(useStatelessSession
)属性的默认值为 true
,但在此添加该属性是为了引起人们对开启或关闭该属性功能的注意。值得注意的是,底层游标的取值大小可以通过 setFetchSize
属性来设置。与 JdbcCursorItemReader
一样,配置也很简单。
下面的示例展示了如何在 XML 中注入 Hibernate ItemReader
:
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
下面的示例展示了如何在 Java 中注入 Hibernate ItemReader
:
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
StoredProcedureItemReader
有时需要使用存储过程来获取游标数据。StoredProcedureItemReader
的工作原理与 JdbcCursorItemReader
类似,只不过它不是运行查询来获取游标,而是运行一个存储过程来返回游标。存储过程可以通过三种不同方式返回游标:
-
作为返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为输出参数返回的 ref-cursor(Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
下面的 XML 示例配置使用了与前面示例相同的 'customer credit' 示例:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的 Java 配置示例使用了与前面示例相同的 'customer credit' 示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
前面的示例依赖于存储过程提供一个 ResultSet
作为返回结果(前面的选项 1)。
如果存储过程返回一个 ref-cursor
(选项 2),那么我们就需要提供作为返回 ref-cursor
的 out 参数的位置。
下面的示例展示了如何使用 XML 中作为 ref-cursor 的第一个参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的示例展示了如何使用 Java 中的 ref-cursor 作为第一个参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
如果游标是从存储函数中返回的(选项 3),我们需要将属性 "function" 设为 true
。默认值为 false
。
下面的示例显示了 XML 中的 true
属性:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的示例显示了 Java 中的 true
属性:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
在所有这些情况下,我们都需要定义 RowMapper
以及 DataSource
和实际存储过程名称。
如果存储过程或函数需要参数,则必须使用 parameters
属性声明和设置参数。下面这个 Oracle 示例声明了三个参数。第一个是 out
参数,用于返回 ref-cursor,第二个和第三个是 in
参数,用于获取 INTEGER
类型的值。
下面的示例展示了如何使用 XML 中的参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
下面的示例展示了如何在 Java 中使用参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
除了参数声明外,我们还需要指定一个 PreparedStatementSetter
实现来设置调用的参数值。这与上述 JdbcCursorItemReader
的工作原理相同。其他属性 中列出的所有附加属性也适用于 StoredProcedureItemReader
。
分页 ItemReader
的实现
使用数据库游标的另一种方法是运行多个查询,每个查询获取结果的一部分。我们将这一部分称为一个页。每个查询必须指定起始行号和我们希望在页中返回的行数。
JdbcPagingItemReader
JdbcPagingItemReader
是分页 ItemReader
的一种实现。JdbcPagingItemReader
需要一个 PagingQueryProvider
,负责提供用于分页的 SQL 查询。由于每个数据库都有自己的分页支持策略,因此我们需要为每种支持的数据库类型使用不同的 PagingQueryProvider
。SqlPagingQueryProviderFactoryBean
也能自动检测正在使用的数据库,并确定合适的 PagingQueryProvider
实现。这简化了配置,是推荐的最佳做法。
SqlPagingQueryProviderFactoryBean
要求你指定一个 select
子句和一个 from
子句。你还可以提供一个可选的 where
子句。这些子句和所需的 sortKey
用于创建 SQL 语句。
必须在 sortKey 上设置唯一键约束,以确保数据不会在两次执行之间丢失。
|
reader 打开后,每次调用 read
都会传回一个 item,其基本方式与其他 ItemReader
相同。当需要额外的行时,就会在背后进行分页。
下面的 XML 示例配置使用了与前面所示基于游标的 ItemReaders
类似的 'customer credit' 示例:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
下面的 Java 配置示例使用的 'customer credit' 示例与前面显示的基于游标的 ItemReaders
类似:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
此配置的 ItemReader
使用必须指定的 RowMapper
返回 CustomerCredit
对象。'pageSize' 属性决定了每次查询运行时从数据库读取的实体数量。
parameterValues
属性可用于指定查询的参数值 Map
。如果在 where
子句中使用命名参数,则每个 entry 的 key 值应与命名参数的名称相匹配。如果使用传统的 '?' 占位符,则每个 entry 的 key 应为占位符的编号,从 1 开始。
JpaPagingItemReader
分页 ItemReader
的另一种实现是 JpaPagingItemReader
。JPA 没有类似于 Hibernate StatelessSession
的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时,这是一个自然的选择。在读取每个分页后,实体都会被分离,持久化上下文(persistence context)也会被清除,以便在分页处理完毕后对实体进行垃圾回收。
JpaPagingItemReader
可让你声明一个 JPQL 语句并传入一个 EntityManagerFactory
。然后,它将每次调用传回一个 item,以与其他 ItemReader
相同的基本方式进行读取。当需要额外的实体时,分页会在幕后进行。
下面的 XML 示例配置使用的 'customer credit' 示例与前面显示的 JDBC reader 相同:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
下面的 Java 示例配置使用了与前面所示 JDBC 阅读器相同的 'customer credit' 示例:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
假设 CustomerCredit
对象具有正确的 JPA 注解或 ORM 映射文件,则此配置的 ItemReader
将以与上述 JdbcPagingItemReader
完全相同的方式返回 CustomerCredit
对象。pageSize
属性决定了每次执行查询时从数据库读取的实体数量。
数据库 ItemWriter
虽然平面文件和 XML 文件都有一个特定的 ItemWriter
实例,但在数据库领域却没有完全对应的实例。这是因为事务提供了所有需要的功能。对于文件来说,ItemWriter
实现是必要的,因为它们必须像事务一样,跟踪写入的 item,并在适当的时候刷新或清除。数据库则不需要这种功能,因为写入已经包含在事务中了。用户可以创建自己的 DAO 来实现 ItemWriter
接口,也可以使用为通用处理问题而编写的自定义 ItemWriter
中的 DAO。无论哪种方式,它们都能顺利运行。需要注意的一点是批量输出所提供的性能和错误处理功能。这在使用 hibernate 作为 ItemWriter
时最为常见,但在使用 JDBC 批处理模式时也会遇到同样的问题。批处理数据库输出没有任何固有缺陷,前提是我们要小心 Flush,而且数据中没有错误。但是,在写入过程中出现的任何错误都会造成混乱,因为我们无法知道是哪个单个 item 导致了异常,甚至无法知道是否有任何单个 item 造成了异常,如下图所示:
如果在写入前对 item 进行了缓冲,那么在提交前 flush 缓冲区时才会抛出错误。例如,假设每个分块写入 20 个item,第 15 个 item 抛出 DataIntegrityViolationException
。就 Step
而言,所有 20 个 item 都已成功写入,因为在实际写入之前无法知道是否发生了错误。一旦调用 Session#flush()
,缓冲区就会清空,异常也会发生。此时,Step
就无能为力了。必须回滚事务。通常情况下,该异常可能会导致 item 被跳过(取决于 skip/retry 策略),然后就不会再写入。但是,在分批处理的情况下,无法知道是哪个 item 导致了问题。异常发生时,整个缓冲区都在被写入。解决这个问题的唯一方法是在每个 item 后 flush,如下图所示:
这是一种常见的用例,尤其是在使用 Hibernate 时,ItemWriter
实现的简单指南就是在每次调用 write()
时 flush。这样做可以可靠地跳过 item,Spring Batch 内部会处理出错后调用 ItemWriter
的粒度问题。
重用现有服务
批处理系统通常与其他应用方式结合使用。最常见的是在线系统,但它也可以通过移动每种应用方式使用的必要批量数据来支持集成甚至是“厚”客户端应用。因此,许多用户都希望在批处理作业中重用现有的 DAO 或其他服务。Spring 容器本身允许注入任何必要的类,从而使重用变得相当容易。不过,在某些情况下,现有服务可能需要充当 ItemReader
或 ItemWriter
,以满足对另一个 Spring Batch 类的依赖,或者因为它确实是某个 step 的主要 ItemReader
。为每个需要包装的服务编写一个适配器类是相当琐碎的,但由于这是一个常见问题,Spring Batch 提供了相应的实现: ItemReaderAdapter
和 ItemWriterAdapter
。这两个类都通过调用委托模式实现了标准的 Spring 方法,而且设置起来非常简单。
下面的 XML 示例使用了 ItemReaderAdapter
:
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
下面的 Java 示例使用了 ItemReaderAdapter
:
@Bean
public ItemReaderAdapter itemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(fooService());
reader.setTargetMethod("generateFoo");
return reader;
}
@Bean
public FooService fooService() {
return new FooService();
}
需要注意的一点是,targetMethod
必须与 read
相同: 如果用尽,则返回 null
。否则,它将返回一个 Object
。否则,框架就无法知道处理何时应该结束,从而导致死循环或错误失败,这取决于 ItemWriter
的实现。
下面的 XML 示例使用了 ItemWriterAdapter
:
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
下面的 Java 示例使用了 ItemWriterAdapter
:
@Bean
public ItemWriterAdapter itemWriter() {
ItemWriterAdapter writer = new ItemWriterAdapter();
writer.setTargetObject(fooService());
writer.setTargetMethod("processFoo");
return writer;
}
@Bean
public FooService fooService() {
return new FooService();
}
防止状态持久化
默认情况下,所有 ItemReader
和 ItemWriter
实现都会在提交之前将其当前状态存储在 ExecutionContext
中。然而,这并不总是我们想要的行为。例如,许多开发人员选择使用进程指示器来使他们的 database reader "可重新运行"。输入数据中会添加一列额外的数据,以指示数据是否已被处理。当读取(或写入)某条记录时,已处理标志将由 false
变为 true
。这样,SQL 语句就可以在 where
子句中包含一条额外的语句,如 where PROCESSED_IND = false
,从而确保在重启时只返回未处理的记录。在这种情况下,最好不要存储任何状态(如当前行号),因为这些状态在重启时无关紧要。因此,所有 reader 和 writer 都包含 "saveState" 属性。
下面的 Bean 定义展示了如何在 XML 中防止状态持久化:
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
</property>
<property name="saveState" value="false" />
<property name="sql">
<value>
SELECT games.player_id, games.year_no, SUM(COMPLETES),
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
from games, players where players.player_id =
games.player_id group by games.player_id, games.year_no
</value>
</property>
</bean>
下面的 bean 定义展示了如何在 Java 中防止状态持久化:
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<PlayerSummary>()
.dataSource(dataSource)
.rowMapper(new PlayerSummaryMapper())
.saveState(false)
.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
+ "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
+ "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
+ "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
+ "from games, players where players.player_id ="
+ "games.player_id group by games.player_id, games.year_no")
.build();
}
上述配置的 ItemReader
不会在 ExecutionContext
中为其参与的任何执行创建任何条目。
创建自定义的 ItemReader 和 ItemWriter
到目前为止,本章已经讨论了 Spring Batch 中读写的基本契约以及一些常见的实现方法。然而,这些都是相当通用的,有许多潜在的场景可能不在开箱即用的实现范围内。本节将通过一个简单的示例,说明如何创建自定义的 ItemReader
和 ItemWriter
实现,并正确实现它们。ItemReader
还实现了 ItemStream
,以说明如何使 reader 或 writer 可重新启动。
自定义 ItemReader
示例
在本示例中,我们将创建一个简单的 ItemReader
实现,用于从提供的 list 中读取数据。我们首先实现最基本的 ItemReader
,即 read
方法,如下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
上一个类接收一个 item list,每次返回一个 item,并从 list 中删除每个 item。当 list 为空时,它将返回 null
,从而满足 ItemReader
的最基本要求,如下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
让 ItemReader
可以重启
最后一个挑战是让 ItemReader
可以重新启动。目前,如果处理中断并重新开始,ItemReader
必须从头开始。这其实在很多情况下都是有效的,但有时批处理 job 最好从中断的地方重新开始。关键的判别因素通常是 reader 是有状态的还是无状态的。无状态 reader 不需要担心可重启性,但有状态 reader 必须在重启时尝试重建其最后的已知状态。因此,我们建议尽可能保持自定义 reader 的无状态状态,这样就不必担心重启问题。
如果确实需要存储状态,则应使用 ItemStream
接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用 ItemStream
update
方法时,ItemReader
的当前索引都会以 'current.index' 为 key 存储在提供的 ExecutionContext
中。调用 ItemStream
open
方法时,会检查 ExecutionContext
是否包含具有该 key 的条目。如果 key 被找到,当前索引就会被移动到该位置。这是一个相当琐碎的示例,但仍然符合一般的要求:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数 ItemReader
的重启逻辑要复杂得多。例如,JdbcCursorItemReader
会存储游标中最后处理行的行 ID。
还值得注意的是,ExecutionContext
中使用的 key 不应是琐碎的。这是因为一个 Step
中的所有 ItemStreams
都使用相同的 ExecutionContext
。在大多数情况下,只需在 key 前加上类名就足以保证唯一性。但是,在极少数情况下,同一 step 中会使用两个相同类型的 ItemStream
(如果需要输出两个文件,就会出现这种情况),这时就需要一个更唯一的名称。因此,许多 Spring Batch ItemReader
和 ItemWriter
实现都有一个 setName()
属性,允许重写 key 名。
自定义 ItemWriter
示例
自定义 ItemWriter
的实现在很多方面与上述 ItemReader
示例相似,但又有很多不同之处,因此需要有自己的示例。不过,添加可重启性在本质上是相同的,因此本示例不再涉及。与 ItemReader
示例一样,为了使示例尽可能简单,使用了 List
:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使 ItemWriter
可重启
要使 ItemWriter
可以重新启动,我们需要遵循与 ItemReader
相同的流程,添加并实现 ItemStream
接口,以同步执行上下文。在示例中,我们可能需要计算已处理 item 的数量,并将其添加为页脚记录。如果需要这样做,我们可以在 ItemWriter
中实现 ItemStream
,这样在重新打开流时,计数器就会从执行上下文中重新生成。
在许多实际情况中,自定义 ItemWriter
还会委托给另一个 writer,而该 writer 本身是可重启的(例如,当写入文件时),或者写入事务性资源,因此不需要重启,因为它是无状态的。当你有一个有状态的 writer 时,你可能应该确保实现 ItemStream
以及 ItemWriter
。还要记住,writer 的客户端需要知道 ItemStream
,因此可能需要在配置中将其注册为 stream。
Item Reader 和 Writer 的实现
在本节中,我们将向你介绍前几节中尚未讨论过的 reader 和 writer。
装饰器
在某些情况下,用户需要将专门的行为附加到已有的 ItemReader
中。Spring Batch 提供了一些开箱即用的装饰器,可以为你的 ItemReader
和 ItemWriter
实现添加额外的行为。
Spring Batch 包含以下装饰器:
SynchronizedItemStreamReader
当使用非线程安全的 ItemReader
时,Spring Batch 提供了 SynchronizedItemStreamReader
装饰器,可用于使 ItemReader
成为线程安全的。Spring Batch 提供了一个 SynchronizedItemStreamReaderBuilder
,用于构建 SynchronizedItemStreamReader
的实例。
例如,FlatFileItemReader
不是线程安全的,不能在多线程 step 中使用。这种 reader 可以用 SynchronizedItemStreamReader
来装饰,以便在多线程 step 中安全使用。下面是一个如何装饰这种 reader 的示例:
@Bean
public SynchronizedItemStreamReader<Person> itemReader() {
FlatFileItemReader<Person> flatFileItemReader = new FlatFileItemReaderBuilder<Person>()
// set reader properties
.build();
return new SynchronizedItemStreamReaderBuilder<Person>()
.delegate(flatFileItemReader)
.build();
}
SingleItemPeekableItemReader
Spring Batch 包含一个装饰器,可为 ItemReader
添加一个 peek
方法。这方法可让用户提前“偷看”一个 item。重复调用该 peek
方法会返回相同的 item,这就是 read
方法返回的下一个 item。Spring Batch 提供了一个 SingleItemPeekableItemReaderBuilder
,用于构建 SingleItemPeekableItemRea`der 的实例。
SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为不可能在多个线程中执行 peek。在下一次调用 read 时,只有一个 peek 过的线程会得到该 item。
|
SynchronizedItemStreamWriter
当使用非线程安全的 ItemWriter
时,Spring Batch 提供了 SynchronizedItemStreamWriter
装饰器,可用于使 ItemWriter
成为线程安全的。Spring Batch 提供了一个 SynchronizedItemStreamWriterBuilder
,用于构建 SynchronizedItemStreamWriter
的实例。
例如,FlatFileItemWriter
不是线程安全的,不能在多线程 step 中使用。这种 writer 可以用 SynchronizedItemStreamWriter 来装饰,以便在多线程 step 中安全使用。下面是一个如何装饰这种 writer 的示例:
@Bean
public SynchronizedItemStreamWriter<Person> itemWriter() {
FlatFileItemWriter<Person> flatFileItemWriter = new FlatFileItemWriterBuilder<Person>()
// set writer properties
.build();
return new SynchronizedItemStreamWriterBuilder<Person>()
.delegate(flatFileItemWriter)
.build();
}
MultiResourceItemWriter
MultiResourceItemWriter
封装了一个 ResourceAwareItemWriterItemStream
,并在当前资源中写入的 item 数超过 itemCountLimitPerResource
时创建一个新的输出资源。Spring Batch 提供了一个 MultiResourceItemWriterBuilder
,用于构建 MultiResourceItemWriter
的实例。
从MQ读取或写入
Spring Batch 为常用的MQ提供以下 reader 和 writer:
AmqpItemReader
AmqpItemReader
是使用 AmqpTemplate
从 exchange 接收或转换消息的 ItemReader
。Spring Batch 提供了一个 AmqpItemReaderBuilder
,用于构建 AmqpItemReader
的实例。
AmqpItemWriter
AmqpItemWriter
是一种使用 AmqpTemplate
向 AMQP exchange 发送消息的 ItemWriter
。如果所提供的 AmqpTemplate
未指定名称,则消息会被发送到无名 exchange。Spring Batch 提供了一个 AmqpItemWriterBuilder
,用于构建 AmqpItemWriter
的实例。
JmsItemReader
JmsItemReader
是使用 JmsTemplate
的 JMS ItemReader
。该 template 应有一个默认 destination,用于为 read()
方法提供 item。Spring Batch 提供了一个 JmsItemReaderBuilder
,用于构建 JmsItemReader
的实例。
JmsItemWriter
JmsItemWriter
是使用 JmsTemplate
的 JMS ItemWriter
。该 template 应有一个默认 destination,用于在 write(List)
中发送 item。Spring Batch 提供了一个 JmsItemWriterBuilder
,用于构建 JmsItemWriter
的实例。
数据库 Reader
Spring Batch 提供以下数据库 reader:
Neo4jItemReader
Neo4jItemReader
是一个 ItemReader
,它使用分页技术从图数据库 Neo4j 中读取对象。Spring Batch 提供了一个 Neo4jItemReaderBuilder
,用于构建 Neo4jItemReader
的实例。
MongoItemReader
MongoItemReader
是一种通过分页技术从 MongoDB 读取文档的 ItemReader
。Spring Batch 提供了一个 MongoItemReaderBuilder
,用于构建 MongoItemReader
的实例。
HibernateCursorItemReader
HibernateCursorItemReader
是在 Hibernate 基础上构建的用于读取数据库记录的 ItemStreamReader
。它执行 HQL 查询,然后在初始化后,在调用 read()
方法时遍历结果集,连续返回与当前行对应的对象。Spring Batch 提供了一个 HibernateCursorItemReaderBuilder
,用于构建 HibernateCursorItemReader
的实例。
数据库 Writer
Spring Batch 提供以下数据库 Write:
Neo4jItemWriter
Neo4jItemWriter
是向 Neo4j 数据库写入数据的 ItemWriter
实现。Spring Batch 提供了一个 Neo4jItemWriterBuilder
,用于构建 Neo4jItemWriter
实例。
MongoItemWriter
MongoItemWriter
是一个 ItemWriter
实现,它使用 Spring Data 的 MongoOperations
实现写入 MongoDB 存储。Spring Batch 提供了一个 MongoItemWriterBuilder
,用于构建 MongoItemWriter
的实例。
RepositoryItemWriter
RepositoryItemWriter
是 Spring Data CrudRepository
的 ItemWriter
wrapper。Spring Batch 提供了一个 RepositoryItemWriterBuilder
,用于构建 RepositoryItemWriter
的实例。
HibernateItemWriter
HibernateItemWriter
是一种 ItemWriter
,它使用 Hibernate session 来保存或更新不属于当前 Hibernate session 的实体。Spring Batch 提供了一个 HibernateItemWriterBuilder
,用于构建 HibernateItemWriter
的实例。
专用的 Reader
Spring Batch 提供以下专用 reader:
LdifReader
LdifReader
可从 Resource
中读取 LDIF(LDAP 数据交换格式)记录,对其进行解析,并为每次 read
返回一个 LdapAttribute
对象。Spring Batch 提供了一个 LdifReaderBuilder
,用于构建 LdifReader
的实例。
专用的 Writer
Spring Batch 提供以下专用的 Writer