在 Spring Boot 应用中使用 Filewatch 监控目录的变化

概览

有时我们需要在应用中监控本地磁盘上的某个目录,在目录中的文件发生变化时(创建、编辑、删除)进行相应的处理。

在 Java 中有好几种方式可以实现监控目录。

  1. Java WatchService API:在 Java 7 中引入,属于低级 API。
  2. Apache commons io:提供了一个用于监控文件系统事件的组件。
  3. Spring Integration 的文件支持:这是 Spring integration 项目的一部分,该项目支持多种企业集成模式。
  4. Spring Boot Developer Tools 中的 Filewatch: 它可以观察本地文件系统的变化。

本文将带你了解如何通过 Spring Boot Developer Tools 中的 Filewatch 来监控系统目录。它本身就是基于 Spring 构建,所以可以和 Spring Boot 无缝集成。

用例

我们希望通过将 csv 文件复制到特定位置(目录)来在应用中创建新的客户。一旦文件完全传输完成,就对其进行读取。然后,对 csv 文件进行验证、处理并移动到目标目录。以下是csv文件的示例:

name, email, dob, info, vip
James Hart,james.hart@gmail.com,12/05/2002,Information for James,No
Will Avery,will.avery@gmail.com,23/10/1991,Information for Will,Yes
Anne Williams,anne.williams@gmail.com,12/05/1975,Information for Anne,No
Julia Norton,julia.norton@gmail.com,23/10/1984,Information for Julia,Yes

csv 文件始终包含标题行和按特定顺序排列的 5 列。

软件版本

  1. Java 21
  2. Spring Boot 3.1.5
  3. Maven 3.9.1

运行源码至少需要 java 17+、Spring Boot 3+ 和 Maven 3.8+。

依赖

pom.xml 中添加依赖 spring-boot-devtools

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
</dependency> 

监控目录

FileSystemWatcher 是用于监视特定目录中文件变化的类。它有三个构造函数,但常使用的是接受三个参数的构造函数

public FileSystemWatcher(boolean daemon,
                         Duration pollInterval,
                         Duration quietPeriod)

各个参数如下:

  1. deamon:是否由 deamon (守护)线程监控更改。如果希望在 jvm 停止时杀死该线程(监控),将其设置为 true
  2. pollInterval:检查更改的间隔时间。
  3. quietPeriod: 检测到更改后的等待时间。如果向目录中传输大文件,则必须考虑到这一点,以确定文件传输完毕后再触发事件,以保证读取到的是完整的文件。

把上述参数配置在 application.properties 中,方便灵活修改。

application.file.watch.daemon=true
application.file.watch.pollInterval=5
application.file.watch.quietPeriod=1
application.file.watch.directory=C:\\workspace\\files\\customer

如上,应用将每 5 分钟扫描一次目录修改。更改将在 1 分钟后触发/传播。这就足够了,因为 csv 文件很小(小于 1 MB)。

通过 @ConfigurationProperties + Java Record 加载配置(Spring Boot 3 支持):

@ConfigurationProperties(prefix = "application.file.watch")
public record FileWatcherProperties(
    @NotBlank String directory,
    boolean daemon,
    @Positive Long pollInterval,
    @Positive Long quietPeriod
) {}

接下来,通过配置类定义 FileSystemWatch Bean。

@Configuration
@EnableConfigurationProperties(FileWatcherProperties.class)
public class CustomerFileWatcherConfig {
// 类成员、构造函数、logger 省略

    @Bean
    FileSystemWatcher fileSystemWatcher() {
        var fileSystemWatcher = new FileSystemWatcher(
                properties.daemon(),
                Duration.ofMinutes(properties.pollInterval()),
                Duration.ofMinutes(properties.quietPeriod()));
        fileSystemWatcher.addSourceDirectory( 
            Path.of(properties.directory()).toFile());
        fileSystemWatcher.addListener(
            new CustomerAddFileChangeListener(fileProcessor));
        fileSystemWatcher.setTriggerFilter(
            f -> f.toPath().endsWith(".csv"));        
        fileSystemWatcher.start();
        logger.info(String.format("FileSystemWatcher initialized. 
            Monitoring directory %s",properties.directory()));

        return fileSystemWatcher;
    }

    @PreDestroy
    public void onDestroy() throws Exception {
        logger.info("Shutting Down File System Watcher.");
        fileSystemWatcher().stop();
    }
}

fileSystemWatcher() 方法中的逻辑如下:

  1. 首先创建 fileSystemWatcher 实例,并将 Bean 属性中的值作为参数传递。Bean 属性对象由 Spring 容器管理,并通过构造函数注入。
  2. 调用 addSourceDirectory 方法。该方法接收一个要监控的目录。
  3. addListener 方法接受一个文件更改事件的监听器。FileChangeListener 是一个函数式接口,其 onChange 方法将在文件发生更改时被调用。
  4. 可选择在方法 setTriggerFilter 中设置 FileFilter,以限制触发更改的文件。使用 Lambda 表达式,限制文件为 csv 文件。
  5. 最后的 start 方法是开始监控目录的变化。

注意,predestroy 钩子方法可以在 jvm 停止时优雅地关闭 fileSystemWatcher

添加监听器

第二步是实现 FileChangeListener,在该接口中处理文件。这个接口也是函数式接口,因此可以使用 Lambda 表达式或方法引用。在本例中,最好将其定义单独的类中,这样会提高可读性。

public class CustomerAddFileChangeListener implements 
    FileChangeListener {
    private static Logger logger = LoggerFactory.getLogger( 
        CustomerAddFileChangeListener.class);

    private CustomerCSVFileProcessor fileProcessor;

    public CustomerAddFileChangeListener(
        CustomerCSVFileProcessor fileProcessor) {
        this.fileProcessor = fileProcessor;
    }

    @Override
    public void onChange(Set<ChangedFiles> changeSet) {
        for(ChangedFiles files : changeSet)
            for(ChangedFile file: files.getFiles())
                if (file.getType().equals(ChangedFile.Type.ADD))
                    fileProcessor.process(file.getFile().toPath());
    }
}

如前所述,当文件发生更改时,将调用 onChange 方法。参数 Set 是一组已更改的文件(自轮询间隔开始以来)。通过遍历 changeSet,可以通过 getFiles 方法从每个 ChangedFiles 对象中获取文件。因此,可以使用嵌套循环来获取各个文件。单个文件的类型是 ChangedFile,它提供了对文件和更改/事件类型(这是一个枚举,有 ADDDELETEMODIFY 三个值)的访问。

回到代码,if 语句会检查类型,确保只处理 ADD 事件。其他事件将被忽略。CustomerCSVFileProcessor 类执行所有工作。

处理文件

处理文件的业务逻辑位于 process 方法中(在 FileProcessor 接口中声明)。它需要 CustomerService 来将客户保存到数据库中,因此它是通过构造函数注入的。

CustomerCSVFileProcessor 类被标记为 @Component,因为它需要被注入到 Watcher 中。

@Component
public class CustomerCSVFileProcessor implements FileProcessor {
    public static final int NUMBER_OF_COLUMNS = 5;

    private static Logger logger = LoggerFactory.getLogger( 
        CustomerCSVFileProcessor.class);

     private CustomerService customerService;

     public CustomerCSVFileProcessor(
         CustomerService customerService) {
         this.customerService = customerService;
     }

     public void process(Path file) {
        logger.info(String.format(
            "Init processing file %s",file.getFileName()));
        var parser = CSVParser.parse(file);
        parser.getRecords().forEach(this::processRecord);
        moveFile(file);
    }

    private void processRecord(CSVRecord csvRecord) {
       if (csvRecord.size() < NUMBER_OF_COLUMNS) {
           logger.info(String.format(
              "Line %d skipped. Not enough values.", 
              csvRecord.lineNumber()));
           return;
       }

       Customer customer = customerMapper.mapToCustomer(csvRecord);
       customer.setStatus(Customer.Status.ACTIVATED);
       customerService.save(customer);
       logger.info(String.format(
           "Saved customer %s in line %d",
           customer.getName(), 
           csvRecord.lineNumber()));
    }

    private static void moveFile(Path file) {
        try {
            var destinationFolder = Path.of( 
                file.getParent().toString() + OUTPUT_FOLDER );
            Files.move(
                file, 
                destinationFolder.resolve(file.getFileName()), 
                REPLACE_EXISTING);
            logger.info(String.format(
                 "File %s has been moved to %s",file.getFileName(), 
                 destinationFolder));
        } catch (IOException e) {
            logger.error("Unable to move file "+ file, e);
        }
    }
}

该方法通过 CSVParser 解析 csv 文件。然后将每条 cvs 行(描述为 CSVRecord)映射到 Customer 实体并保存到数据库中。最后,foreach 循环结束,文件被移动到一个不同的位置。

测试

重新运行应用将从 CustomerFileWatcherConfig 配置类启动文件系统 Watcher Bean。一旦应用启动,就会将一个包含一些客户的 csv 文件复制到监视的目录中。5分钟后,将输出以下日志信息:

CustomerFileWatcherConfig : FileSystemWatcher initialized. Monitoring directory C:\workspace\files\customer
CustomerCSVFileProcessor   : Init processing file customerFile.csv
CustomerCSVFileProcessor   : Saved customer James Hart in line 1
CustomerCSVFileProcessor   : Saved customer Will Avery in line 2
CustomerCSVFileProcessor   : Saved customer Anne Williams in line 3
CustomerCSVFileProcessor   : Saved customer Julia Norton in line 4
CustomerCSVFileProcessor   : File customerFile.csv has been moved to C:\workspace\files\customer\output

的确,文件中的数据行已作为 Customer 实体添加到数据库中,并且文件已成功移动。

总结

本文介绍了如何在 Spring Boot 中通过 Spring Boot Developer Tools 中的 Filewatch 来监控系统目录中的文件变化。该功能在执行自动化任务时比较有用,例如,监听 FTP 目录,在 FTP 服务器把文件传输到目录中后进行读取、处理。


Ref:https://dev.to/noelopez/monitoring-a-directory-in-spring-2nen