Spring Boot 整合 Spring Batch
1、概览
Spring Batch 是一个强大的批处理框架,可用于开发健壮的批处理应用。
上一篇教程 介绍了 Spring Batch,本文将在此基础上带你了解如何使用 Spring Boot 设置和创建一个基本的批处理驱动应用。
2、Maven 依赖
首先,在 pom.xml
中添加 spring-boot-starter-batch
和 h2
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
<scope>runtime</scope>
</dependency>
3、定义简单的 Spring Batch Job
创建一个 Job,从 CSV 文件导入 coffee 清单,使用自定义 Processor 对其进行转换,并将最终结果存储到内存数据库中。
3.1、开始
首先,程序入口:
@SpringBootApplication
public class SpringBootBatchProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
}
}
可以看到,这是一个标准的 Spring Boot 应用。为了简单,一切配置都是使用默认值。
在 src/main/resources/application.properties
文件中定义如下属性:
file.input=coffee-list.csv
该属性包含输入 coffee 列表的位置。每一行都包含 coffee 的品牌、产地和一些特征:
Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey
如你所见,这是一个平面 CSV 文件,这意味着 Spring 无需任何特殊定制就可以处理它。
接下来,添加一个 SQL 脚本 schema-all.sql
,以创建 coffee
表来存储数据:
DROP TABLE coffee IF EXISTS;
CREATE TABLE coffee (
coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
brand VARCHAR(20),
origin VARCHAR(20),
characteristics VARCHAR(30)
);
Spring Boot 会在启动时自动运行该脚本。
3.2、Coffee Domain 类
需要一个简单的 Domain 类来保存 coffee 项目:
public class Coffee {
private String brand;
private String origin;
private String characteristics;
public Coffee(String brand, String origin, String characteristics) {
this.brand = brand;
this.origin = origin;
this.characteristics = characteristics;
}
// get、set 省略
}
如上,Coffee
对象包含三个属性:
- brand:品牌
- origin:产地
- characteristics:其他一些特点
4、Job 配置
现在进入关键的部分,即 Job 配置。
@Configuration
public class BatchConfiguration {
@Value("${file.input}")
private String fileInput;
// ...
}
首先,从标准的 Spring @Configuration
类开始。注意,在 Spring boot 3.0 中,不鼓励使用 @EnableBatchProcessing
。此外,JobBuilderFactory
和 StepBuilderFactory
已被弃用,建议使用带有 Job 或 Step Builder 名称的 JobBuilder
和 StepBuilder
类。
在初始配置的最后一部分,引用之前声明的 file.input
属性。
4.1、Job 的 Reader 和 Writer
在配置中定义一个 Reader
Bean:
@Bean
public FlatFileItemReader reader() {
return new FlatFileItemReaderBuilder().name("coffeeItemReader")
.resource(new ClassPathResource(fileInput))
.delimited()
.names(new String[] { "brand", "origin", "characteristics" })
.fieldSetMapper(new BeanWrapperFieldSetMapper() {{
setTargetType(Coffee.class);
}})
.build();
}
简而言之,上面定义的 Reader Bean 会查找名为 coffee-list.csv
的文件,并将每一行的条目解析为一个 Coffee
对象。
同样,定义一个 Writer Bean:
@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
.dataSource(dataSource)
.build();
}
它根据 Coffee
对象的 Java Bean 属性,定义在数据库中插入单个 coffee
项目所需的 SQL 语句。
4.2、把 Job 结合起来
最后,需要添加实际的 Job、Step 配置:
@Bean
public Job importUserJob(JobRepository jobRepository, JobCompletionNotificationListener listener, Step step1) {
return new JobBuilder("importUserJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcBatchItemWriter writer) {
return new StepBuilder("step1", jobBuilder)
.<Coffee, Coffee> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
@Bean
public CoffeeItemProcessor processor() {
return new CoffeeItemProcessor();
}
可以看到,Job 相对简单,只有 step1
方法中定义的一个步骤(Step)。
该步骤做了啥?
- 首先,使用
chunk(10)
声明配置步骤,使其一次最多写入 10 条记录。 - 然后,使用
reader
方法设置的 Reader Bean 读取 coffee 数据。 - 接下来,把每个 coffee 项目传递给一个自定义处理器(processor),并在其中应用一些自定义业务逻辑。
- 最后,使用之前看到的 Writer 将每个 coffee 项目写入数据库。
importUserJob
包含 Job
定义,使用内置的 RunIdIncrementer
类包含一个 id
。还设置了一个 JobCompletionNotificationListener
,以便在 Job 完成时收到通知。
5、自定义 Coffee Processor
现在,来详细看看上述在 Job 配置中定义的自定义处理器:
public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {
private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);
@Override
public Coffee process(final Coffee coffee) throws Exception {
String brand = coffee.getBrand().toUpperCase();
String origin = coffee.getOrigin().toUpperCase();
String chracteristics = coffee.getCharacteristics().toUpperCase();
Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);
return transformedCoffee;
}
}
ItemProcessor
接口提供了一种在 Job 执行过程中应用某些特定业务逻辑的机制。
定义 CoffeeItemProcessor
,它接收输入的 Coffee
对象,并将每个属性转换为大写字母。
6、Job 完成
还要编写一个 JobCompletionNotificationListener
,以便在作业完成时提供一些反馈:
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOGGER.info("!!! 任务完成!验证结果");
String query = "SELECT brand, origin, characteristics FROM coffee";
jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
.forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
}
}
在上述示例中,覆写了 afterJob
方法,以检查 Job 是否成功完成。运行了一个简单的查询来检查每个 coffee 项目是否已成功存储到数据库中。
7、运行 Job
一切就绪后,运行 Job:
...
17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener -
!!! 任务完成!验证结果
17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener -
Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...
输出日志如上,可以看到 Job 运行成功,每个coffee 项目都如期存储到了数据库中。
8、总结
本文介绍了如何使用 Spring Boot 创建一个简单的 Spring Batch Job,包括如何添加文件 Reader 和 数据库 Writer 以及自定义处理器。
Ref:https://www.baeldung.com/spring-boot-spring-batch