Spring Batch Tutorial

1. Overview

This article takes a close look at Spring Batch, a framework built for running batch processing jobs.

At the time of writing, the current version is 5.0.0, which is built on Spring Framework 6 and requires Java 17.

2. Workflow Basics

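Spring Batch follows the traditional batch architecture, in which a job repository coordinates with the job as it runs, recording each job execution and its status.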

A job can have multiple steps, and each step typically follows the sequence of reading data, processing data, and writing data.
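To make the job/step structure concrete, here is a minimal plain-Java sketch. `ToyStep`, `ReadProcessWriteStep` and `ToyJob` are hypothetical names invented for this illustration, not Spring Batch types: a job simply runs its steps in order, and each step pulls items from a reader, passes each one through a processor, and hands the result to a writer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical toy types for illustration; these are not Spring Batch classes.
interface ToyStep {
    String name();

    void execute();
}

// One step: read an item, process it, write the result, until the reader is empty.
final class ReadProcessWriteStep<I, O> implements ToyStep {
    private final String name;
    private final Iterator<I> reader;       // read: supplies input items
    private final Function<I, O> processor; // process: converts one item
    private final List<O> writer;           // write: receives processed items

    ReadProcessWriteStep(String name, Iterator<I> reader, Function<I, O> processor, List<O> writer) {
        this.name = name;
        this.reader = reader;
        this.processor = processor;
        this.writer = writer;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void execute() {
        while (reader.hasNext()) {
            writer.add(processor.apply(reader.next()));
        }
    }
}

// A job runs its steps in the order they were added.
final class ToyJob {
    private final List<ToyStep> steps = new ArrayList<>();

    ToyJob next(ToyStep step) {
        steps.add(step);
        return this;
    }

    // Executes every step and returns the step names in execution order.
    List<String> run() {
        List<String> executed = new ArrayList<>();
        for (ToyStep step : steps) {
            step.execute();
            executed.add(step.name());
        }
        return executed;
    }
}
```

In Spring Batch itself, this wiring is done with `JobBuilder` and `StepBuilder`, as in the configuration in section 4.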

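Of course, the framework does most of the heavy lifting here, especially the low-level persistence work of dealing with jobs; this article uses H2 as the job repository.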
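The following is a deliberately simplified, in-memory sketch of the kind of bookkeeping a job repository does; `ToyJobRepository` is a hypothetical class, not the Spring Batch `JobRepository` API. It assumes that a job instance is identified by the job name plus its parameters and that each run of an instance is recorded as a separate execution. A failed instance may run again, while an instance that already completed is rejected (Spring Batch reports that case with `JobInstanceAlreadyCompleteException`). Checks such as "an execution is already running" are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, in-memory illustration of job-repository bookkeeping (not the Spring Batch API).
final class ToyJobRepository {
    enum Status { STARTED, COMPLETED, FAILED }

    // job instance key -> statuses of its executions, oldest first
    private final Map<String, List<Status>> executions = new HashMap<>();

    // A job instance is identified by the job name plus its (sorted) parameters.
    private static String instanceKey(String jobName, Map<String, String> params) {
        return jobName + new TreeMap<>(params);
    }

    // Records a new execution and returns its index; a completed instance cannot run again.
    int startExecution(String jobName, Map<String, String> params) {
        List<Status> runs = executions.computeIfAbsent(instanceKey(jobName, params), key -> new ArrayList<>());
        if (runs.contains(Status.COMPLETED)) {
            throw new IllegalStateException("Job instance already complete: " + instanceKey(jobName, params));
        }
        runs.add(Status.STARTED);
        return runs.size() - 1;
    }

    // Marks an execution as finished with the given status.
    void finishExecution(String jobName, Map<String, String> params, int index, Status status) {
        executions.get(instanceKey(jobName, params)).set(index, status);
    }

    // Number of executions recorded for one job instance.
    int executionCount(String jobName, Map<String, String> params) {
        return executions.getOrDefault(instanceKey(jobName, params), List.of()).size();
    }
}
```

This is also why the Java configuration in section 4 runs the schema-drop script on startup: with a fresh repository on every run, launching `firstBatchJob` with empty parameters never hits the "already complete" case.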

2.1. Example Application

In this example, we migrate some financial transaction data from CSV to XML.

The input file has a very simple structure.

Each line holds one transaction, made up of a username, a user ID (userid), a transaction date (transaction_date), and an amount (transaction_amount):

username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411
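The transaction_date values use day/month/year with no zero padding (3/12/2015), so the date pattern needs single-letter fields such as `d/M/yyyy`. A small standalone check of that choice (the class `DatePatternCheck` exists only for this illustration):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Standalone check of date patterns against the sample transaction_date values.
final class DatePatternCheck {
    // Parses value with the given pattern, e.g. parse("d/M/yyyy", "3/12/2015").
    static LocalDate parse(String pattern, String value) {
        return LocalDate.parse(value, DateTimeFormatter.ofPattern(pattern));
    }

    // True if the whole value can be parsed with the pattern.
    static boolean parses(String pattern, String value) {
        try {
            parse(pattern, value);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```

With `dd/MM/yyyy`, the value 3/12/2015 would be rejected, because `dd` expects exactly two digits for the day.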

3. Maven Dependencies

This project depends on Spring OXM (for the XML marshalling), Spring Batch Core, and the H2 database:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
    <version>6.0.6</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.1.214</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>5.0.0</version>
</dependency>

4. Spring Batch and Job Configuration

Below is the basic Spring Batch configuration, together with the job definition for the CSV-to-XML conversion.

Java-based job configuration:

@Configuration
@Profile("spring")
public class SpringBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    @Bean
    public ItemReader<Transaction> itemReader()
      throws UnexpectedInputException, ParseException {
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);
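        reader.setResource(inputCsv);
        // skip the CSV header line, matching linesToSkip="1" in the XML configuration
        reader.setLinesToSkip(1);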
        DefaultLineMapper<Transaction> lineMapper = 
          new DefaultLineMapper<Transaction>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> itemProcessor() {
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller)
      throws MalformedURLException {
        StaxEventItemWriter<Transaction> itemWriter = 
          new StaxEventItemWriter<Transaction>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(new Class[] { Transaction.class });
        return marshaller;
    }

    @Bean
    protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                         ItemReader<Transaction> reader, ItemProcessor<Transaction, Transaction> processor,
                         ItemWriter<Transaction> writer) {
        // chunk(10, ...) plays the role of commit-interval="10": items are written ten at a time
        return new StepBuilder("step1", jobRepository).<Transaction, Transaction> chunk(10, transactionManager)
          .reader(reader).processor(processor).writer(writer).build();
    }

    @Bean(name = "firstBatchJob")
    public Job job(JobRepository jobRepository, @Qualifier("step1") Step step1) {
        return new JobBuilder("firstBatchJob", jobRepository).preventRestart().start(step1).build();
    }
    
    public DataSource dataSource() {
     EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
     return builder.setType(EmbeddedDatabaseType.H2)
           .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
           .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
           .build();
    }
    
    @Bean(name = "transactionManager")
    public PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }
    
    @Bean(name = "jobRepository")
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTransactionManager(getTransactionManager());
        factory.afterPropertiesSet();
        return factory.getObject();
    }
    
    @Bean(name = "jobLauncher")
    public JobLauncher getJobLauncher() throws Exception {
       TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
       jobLauncher.setJobRepository(getJobRepository());
       jobLauncher.afterPropertiesSet();
       return jobLauncher;
    }
}

And the XML-based configuration:

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="input/record.csv" />
    <property name="lineMapper">
        <bean
            class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean
                    class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names"
                        value="username,userid,transactiondate,amount" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean
                    class="com.baeldung.batch.service.RecordFieldSetMapper" />
            </property>
        </bean>
    </property>
    <property name="linesToSkip" value="1" />
</bean>

<bean id="itemProcessor" class="com.baeldung.batch.service.CustomItemProcessor" />

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>

<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.baeldung.batch.model.Transaction</value>
        </list>
    </property>
</bean>

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
                processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>


<!-- connect to the H2 database -->
<bean id="dataSource"
      class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.h2.Driver" />
    <property name="url" value="jdbc:h2:file:~/repository" />
    <property name="username" value="" />
    <property name="password" value="" />
</bean>

<!-- create the job metadata tables automatically -->
<jdbc:initialize-database data-source="dataSource">
    <jdbc:script
            location="org/springframework/batch/core/schema-drop-h2.sql" />
    <jdbc:script location="org/springframework/batch/core/schema-h2.sql" />
</jdbc:initialize-database>

<!-- store job metadata in the database -->
<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="transactionManager" ref="transactionManager" />
    <property name="databaseType" value="h2" />
</bean>

<bean id="transactionManager"
      class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.TaskExecutorJobLauncher">
     <property name="jobRepository" ref="jobRepository" />
</bean>

That is the whole configuration; let's now go through it piece by piece.

4.1. Reading Data and Creating Objects with ItemReader

First, we configure itemReader, a FlatFileItemReader that reads the data from record.csv and converts each line into a Transaction object:

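import java.time.LocalDateTime;

import jakarta.xml.bind.annotation.XmlRootElement;

// Jaxb2Marshaller writes each Transaction as a <transactionRecord> element
@XmlRootElement(name = "transactionRecord")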
public class Transaction {
    private String username;
    private int userId;
    private LocalDateTime transactionDate;
    private double amount;

    /* getters and setters omitted */

    @Override
    public String toString() {
        return "Transaction [username=" + username + ", userId=" + userId
          + ", transactionDate=" + transactionDate + ", amount=" + amount
          + "]";
    }
}

To do this, the reader uses a custom mapper:

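import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

// Maps one tokenized CSV line (a FieldSet) to a Transaction
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    @Override
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        // Dates look like 31/10/2015 or 3/12/2015: day/month/year without zero padding
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/M/yyyy");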
        Transaction transaction = new Transaction();
 
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt(1));
        transaction.setAmount(fieldSet.readDouble(3));
        String dateString = fieldSet.readString(2);
        transaction.setTransactionDate(LocalDate.parse(dateString, formatter).atStartOfDay());
        return transaction;
    }
}
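For readers who want to see what the reader, tokenizer, and mapper do together without Spring, here is a plain-Java approximation. `TransactionRow` and `CsvTransactionReader` are hypothetical names; the sketch assumes comma-separated input with spaces around the values, skips the header line, and follows the ItemReader convention of returning null once the input is exhausted. Unlike DelimitedLineTokenizer, it does not handle quoted fields.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for the Transaction class, kept minimal for the sketch.
record TransactionRow(String username, int userId, LocalDateTime transactionDate, double amount) {}

// Plain-Java approximation of FlatFileItemReader + DelimitedLineTokenizer + RecordFieldSetMapper.
final class CsvTransactionReader {
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("d/M/yyyy");
    private final BufferedReader lines;

    CsvTransactionReader(BufferedReader lines, int linesToSkip) {
        this.lines = lines;
        for (int i = 0; i < linesToSkip; i++) {
            nextLine(); // e.g. skip the header row
        }
    }

    // Returns the next transaction, or null when the input is exhausted.
    TransactionRow read() {
        String line = nextLine();
        if (line == null) {
            return null;
        }
        // Split on commas and trim, so " 1234" becomes 1234; quoted fields are not supported here.
        String[] tokens = line.split(",");
        return new TransactionRow(
                tokens[0].trim(),
                Integer.parseInt(tokens[1].trim()),
                LocalDate.parse(tokens[2].trim(), DATE_FORMAT).atStartOfDay(),
                Double.parseDouble(tokens[3].trim()));
    }

    private String nextLine() {
        try {
            return lines.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```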

4.2. Processing Data with ItemProcessor

Next, we create a custom item processor, CustomItemProcessor. It does not change anything about the Transaction objects.

All it does is pass the original object from the reader on to the writer:

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    public Transaction process(Transaction item) {
        return item;
    }
}
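A pass-through processor is enough here, but the processor is also the place where items can be transformed or dropped: in Spring Batch, returning null from `process` filters the item out so it never reaches the writer. The sketch below models that rule in plain Java; `ToyItemProcessor`, `Txn`, and the "drop non-positive amounts" rule are hypothetical and only mirror the shape of ItemProcessor.

```java
import java.util.ArrayList;
import java.util.List;

// Local interface with the same shape as ItemProcessor.process (illustration only).
interface ToyItemProcessor<I, O> {
    O process(I item);
}

// Hypothetical item type for the sketch.
record Txn(String username, double amount) {}

final class ProcessorDemo {
    // Pass-through, like CustomItemProcessor: every item reaches the writer unchanged.
    static final ToyItemProcessor<Txn, Txn> PASS_THROUGH = item -> item;

    // Hypothetical rule: returning null drops the item, mirroring Spring Batch's filtering convention.
    static final ToyItemProcessor<Txn, Txn> DROP_NON_POSITIVE = item -> item.amount() > 0 ? item : null;

    // Runs each item through the processor and keeps only the non-null results for the writer.
    static List<Txn> applyAll(ToyItemProcessor<Txn, Txn> processor, List<Txn> items) {
        List<Txn> toWrite = new ArrayList<>();
        for (Txn item : items) {
            Txn result = processor.process(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }
}
```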

4.3. Writing Objects to the File System with ItemWriter

Finally, we store the Transaction objects in an XML file located at xml/output.xml:

<bean id="itemWriter"
  class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml" />
    <property name="marshaller" ref="marshaller" />
    <property name="rootTagName" value="transactionRecord" />
</bean>
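To picture the shape of the output file, here is a hand-written sketch using only the JDK's `javax.xml.stream` (StAX) API. It assumes the nesting that results from rootTagName being transactionRecord while Transaction is also annotated `@XmlRootElement(name = "transactionRecord")`: one root element wrapping one element per item. The child element names and their order are chosen for illustration; the real JAXB output depends on the Transaction getters and on how LocalDateTime is mapped. `XmlTxn` and `XmlTransactionWriter` are hypothetical names.

```java
import java.io.StringWriter;
import java.util.List;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical row type for the sketch; the date is pre-formatted as text.
record XmlTxn(String username, int userId, String transactionDate, double amount) {}

// Hand-rolled StAX approximation of the output shape: a root element wrapping one element per item.
final class XmlTransactionWriter {
    static String write(String rootTagName, List<XmlTxn> items) {
        StringWriter out = new StringWriter();
        try {
            XMLStreamWriter xml = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
            xml.writeStartDocument();
            xml.writeStartElement(rootTagName);             // corresponds to rootTagName
            for (XmlTxn t : items) {
                xml.writeStartElement("transactionRecord"); // corresponds to @XmlRootElement(name = ...)
                element(xml, "username", t.username());
                element(xml, "userId", Integer.toString(t.userId()));
                element(xml, "transactionDate", t.transactionDate());
                element(xml, "amount", Double.toString(t.amount()));
                xml.writeEndElement();
            }
            xml.writeEndElement();
            xml.writeEndDocument();
            xml.flush();
            xml.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException(e);
        }
        return out.toString();
    }

    // Writes <name>text</name>, escaping the text as needed.
    private static void element(XMLStreamWriter xml, String name, String text) throws XMLStreamException {
        xml.writeStartElement(name);
        xml.writeCharacters(text);
        xml.writeEndElement();
    }
}
```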

4.4. Configuring the Batch Job

All that is left is to wire these pieces together into a job using the batch:job syntax.

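Note the commit-interval. It is the number of transactions kept in memory before the batch is committed to the itemWriter.

The transactions are held in memory until that point is reached (or until the input data runs out).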
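The commit-interval behaviour can be sketched as a loop that buffers items and flushes them to the writer each time the buffer reaches the interval, plus once more when the input ends. This is a simplified model (`ChunkLoop` is a hypothetical name); it leaves out the processor, transactions, retries, and restart state that Spring Batch manages around each chunk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of chunk-oriented processing with a commit interval (not Spring Batch code).
final class ChunkLoop {
    static <T> void run(Iterator<T> reader, int commitInterval, Consumer<List<T>> writer) {
        List<T> chunk = new ArrayList<>(commitInterval);
        while (reader.hasNext()) {
            chunk.add(reader.next());                 // items stay in memory...
            if (chunk.size() == commitInterval) {
                writer.accept(List.copyOf(chunk));    // ...until the interval is reached
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(List.copyOf(chunk));        // ...or the input runs out
        }
    }
}
```

With the three-line sample file from section 2.1 and an interval of 10, the writer would be called once, with all three transactions.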

Here is the Java bean and the corresponding XML configuration:

@Bean
protected Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
        ItemReader<Transaction> reader,
        @Qualifier("itemProcessor") ItemProcessor<Transaction, Transaction> processor,
        ItemWriter<Transaction> writer) {
    // chunk(10, ...) is the Java equivalent of commit-interval="10" in the XML below
    return new StepBuilder("step1", jobRepository)
            .<Transaction, Transaction> chunk(10, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

4.5. Running the Batch Job

With everything set up, run the application:

@Profile("spring")
public class App {
    public static void main(String[] args) {
        // Spring Java configuration
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(SpringBatchConfig.class);
        context.refresh();
        
        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("firstBatchJob");
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        }
    }
}

Run the application with the spring profile active: -Dspring.profiles.active=spring.

5. Spring Boot Configuration

Next, we create a Spring Boot application and convert the previous Spring Batch configuration so that it runs in a Spring Boot environment.

Essentially, this is the same as the previous Spring Batch example.

5.1. Maven Dependencies

Add the spring-boot-starter-batch dependency to pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

We also need a database to store the Spring Batch job information; in this example we again use the H2 in-memory database.

<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.1.214</version>
    <scope>runtime</scope>
</dependency>

5.2. Spring Boot Configuration

We use the @Profile annotation to separate the Spring and Spring Boot configurations.

Set the spring-boot profile in the application:

@SpringBootApplication
public class SpringBatchApplication {

    public static void main(String[] args) {
        SpringApplication springApp = new SpringApplication(SpringBatchApplication.class);
        springApp.setAdditionalProfiles("spring-boot");
        springApp.run(args);
    }

}

5.3. Spring Batch Job Configuration

The batch job configuration is the same as in the earlier SpringBatchConfig class:

@Configuration
public class SpringBootBatchConfig {

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("input/recordWithInvalidData.csv")
    private Resource invalidInputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    // ...
}

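Since Spring Boot 3.0, the @EnableBatchProcessing annotation is discouraged: adding it makes Spring Boot's auto-configuration of Spring Batch back off, so the JobRepository, JobLauncher, and transaction manager would then have to be set up by hand. In addition, JobBuilderFactory and StepBuilderFactory are deprecated in favor of the JobBuilder and StepBuilder classes, which take the JobRepository explicitly (and, for chunk-oriented steps, the PlatformTransactionManager), so these beans are passed into the job and step bean methods, as in the step1 and job methods in section 4.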

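6. Conclusion

In this article, we walked through the concepts and usage of Spring Batch with a practical example, and showed how to configure it both with plain Spring and with Spring Boot.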


Ref: https://www.baeldung.com/introduction-to-spring-batch