Spring Boot 整合 Spring Batch

1、概览

Spring Batch 是一个强大的批处理框架,可用于开发健壮的批处理应用。

上一篇教程 介绍了 Spring Batch,本文将在此基础上带你了解如何使用 Spring Boot 设置和创建一个基本的批处理驱动应用。

2、Maven 依赖

首先,在 pom.xml 中添加 spring-boot-starter-batchh2 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.1.214</version>
    <scope>runtime</scope>
</dependency>

3、定义简单的 Spring Batch Job

创建一个 Job,从 CSV 文件导入 coffee 清单,使用自定义 Processor 对其进行转换,并将最终结果存储到内存数据库中。

3.1、开始

首先,程序入口:

@SpringBootApplication
public class SpringBootBatchProcessingApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
    }
}

可以看到,这是一个标准的 Spring Boot 应用。为了简单,一切配置都是使用默认值。

src/main/resources/application.properties 文件中定义如下属性:

file.input=coffee-list.csv

该属性包含输入 coffee 列表的位置。每一行都包含 coffee 的品牌、产地和一些特征:

Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey

如你所见,这是一个平面 CSV 文件,这意味着 Spring 无需任何特殊定制就可以处理它。

接下来,添加一个 SQL 脚本 schema-all.sql,以创建 coffee 表来存储数据:

DROP TABLE coffee IF EXISTS;

CREATE TABLE coffee  (
    coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    brand VARCHAR(20),
    origin VARCHAR(20),
    characteristics VARCHAR(30)
);

Spring Boot 会在启动时自动运行该脚本。

3.2、Coffee Domain 类

需要一个简单的 Domain 类来保存 coffee 项目:

public class Coffee {

    private String brand;
    private String origin;
    private String characteristics;

    public Coffee(String brand, String origin, String characteristics) {
        this.brand = brand;
        this.origin = origin;
        this.characteristics = characteristics;
    }

    // get、set 省略
}

如上,Coffee 对象包含三个属性:

  • brand:品牌
  • origin:产地
  • characteristics:其他一些特点

4、Job 配置

现在进入关键的部分,即 Job 配置。

@Configuration
public class BatchConfiguration {
    
    @Value("${file.input}")
    private String fileInput;
    
    // ...
}

首先,从标准的 Spring @Configuration 类开始。注意,在 Spring boot 3.0 中,不鼓励使用 @EnableBatchProcessing。此外,JobBuilderFactoryStepBuilderFactory 已被弃用,建议使用带有 Job 或 Step Builder 名称的 JobBuilderStepBuilder 类。

在初始配置的最后一部分,引用之前声明的 file.input 属性。

4.1、Job 的 Reader 和 Writer

在配置中定义一个 Reader Bean:

@Bean
public FlatFileItemReader reader() {
    return new FlatFileItemReaderBuilder().name("coffeeItemReader")
      .resource(new ClassPathResource(fileInput))
      .delimited()
      .names(new String[] { "brand", "origin", "characteristics" })
      .fieldSetMapper(new BeanWrapperFieldSetMapper() {{
          setTargetType(Coffee.class);
      }})
      .build();
}

简而言之,上面定义的 Reader Bean 会查找名为 coffee-list.csv 的文件,并将每一行的条目解析为一个 Coffee 对象。

同样,定义一个 Writer Bean:

@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder()
      .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
      .sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
      .dataSource(dataSource)
      .build();
}

它根据 Coffee 对象的 Java Bean 属性,定义在数据库中插入单个 coffee 项目所需的 SQL 语句。

4.2、把 Job 结合起来

最后,需要添加实际的 Job、Step 配置:

@Bean
public Job importUserJob(JobRepository jobRepository, JobCompletionNotificationListener listener, Step step1) {
    return new JobBuilder("importUserJob", jobRepository)
      .incrementer(new RunIdIncrementer())
      .listener(listener)
      .flow(step1)
      .end()
      .build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, JdbcBatchItemWriter writer) {
    return new StepBuilder("step1", jobBuilder)
      .<Coffee, Coffee> chunk(10, transactionManager)
      .reader(reader())
      .processor(processor())
      .writer(writer)
      .build();
}

@Bean
public CoffeeItemProcessor processor() {
    return new CoffeeItemProcessor();
}

可以看到,Job 相对简单,只有 step1 方法中定义的一个步骤(Step)。

该步骤做了啥?

  • 首先,使用 chunk(10) 声明配置步骤,使其一次最多写入 10 条记录。
  • 然后,使用 reader 方法设置的 Reader Bean 读取 coffee 数据。
  • 接下来,把每个 coffee 项目传递给一个自定义处理器(processor),并在其中应用一些自定义业务逻辑。
  • 最后,使用之前看到的 Writer 将每个 coffee 项目写入数据库。

importUserJob 包含 Job 定义,使用内置的 RunIdIncrementer 类包含一个 id。还设置了一个 JobCompletionNotificationListener,以便在 Job 完成时收到通知。

5、自定义 Coffee Processor

现在,来详细看看上述在 Job 配置中定义的自定义处理器:

public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);

    @Override
    public Coffee process(final Coffee coffee) throws Exception {
        String brand = coffee.getBrand().toUpperCase();
        String origin = coffee.getOrigin().toUpperCase();
        String chracteristics = coffee.getCharacteristics().toUpperCase();

        Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
        LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);

        return transformedCoffee;
    }
}

ItemProcessor 接口提供了一种在 Job 执行过程中应用某些特定业务逻辑的机制。

定义 CoffeeItemProcessor,它接收输入的 Coffee 对象,并将每个属性转换为大写字母。

6、Job 完成

还要编写一个 JobCompletionNotificationListener,以便在作业完成时提供一些反馈:

@Override
public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
        LOGGER.info("!!! 任务完成!验证结果");

        String query = "SELECT brand, origin, characteristics FROM coffee";
        jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
          .forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
    }
}

在上述示例中,覆写了 afterJob 方法,以检查 Job 是否成功完成。运行了一个简单的查询来检查每个 coffee 项目是否已成功存储到数据库中。

7、运行 Job

一切就绪后,运行 Job:

...
17:41:16.336 [main] INFO  c.b.b.JobCompletionNotificationListener -
  !!! 任务完成!验证结果
17:41:16.336 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO  c.b.b.JobCompletionNotificationListener -
  Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...

输出日志如上,可以看到 Job 运行成功,每个coffee 项目都如期存储到了数据库中。

8、总结

本文介绍了如何使用 Spring Boot 创建一个简单的 Spring Batch Job,包括如何添加文件 Reader 和 数据库 Writer 以及自定义处理器。


Ref:https://www.baeldung.com/spring-boot-spring-batch