在 Spring Boot 应用中使用 Filewatch 监控目录的变化
概览
有时我们需要在应用中监控本地磁盘上的某个目录,在目录中的文件发生变化时(创建、编辑、删除)进行相应的处理。
在 Java 中有好几种方式可以实现监控目录。
- Java WatchService API:在 Java 7 中引入,属于低级 API。
 - Apache commons io:提供了一个用于监控文件系统事件的组件。
 - Spring Integration 的文件支持:这是 Spring integration 项目的一部分,该项目支持多种企业集成模式。
 - Spring Boot Developer Tools 中的 
Filewatch: 它可以观察本地文件系统的变化。 
本文将带你了解如何通过 Spring Boot Developer Tools 中的 Filewatch 来监控系统目录。它本身就是基于 Spring 构建,所以可以和 Spring Boot 无缝集成。
用例
我们希望通过将 csv 文件复制到特定位置(目录)来在应用中创建新的客户。一旦文件完全传输完成,就对其进行读取。然后,对 csv 文件进行验证、处理并移动到目标目录。以下是csv文件的示例:
name, email, dob, info, vip
James Hart,james.hart@gmail.com,12/05/2002,Information for James,No
Will Avery,will.avery@gmail.com,23/10/1991,Information for Will,Yes
Anne Williams,anne.williams@gmail.com,12/05/1975,Information for Anne,No
Julia Norton,julia.norton@gmail.com,23/10/1984,Information for Julia,Yes
csv 文件始终包含标题行和按特定顺序排列的 5 列。
软件版本
- Java 21
 - Spring Boot 3.1.5
 - Maven 3.9.1
 
运行源码至少需要 java 17+、Spring Boot 3+ 和 Maven 3.8+。
依赖
在 pom.xml 中添加依赖 spring-boot-devtools。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
</dependency> 
监控目录
FileSystemWatcher 是用于监视特定目录中文件变化的类。它有三个构造函数,但常使用的是接受三个参数的构造函数
public FileSystemWatcher(boolean daemon,
                         Duration pollInterval,
                         Duration quietPeriod)
各个参数如下:
- deamon:是否由 deamon (守护)线程监控更改。如果希望在 jvm 停止时杀死该线程(监控),将其设置为 
true。 - pollInterval:检查更改的间隔时间。
 - quietPeriod: 检测到更改后的等待时间。如果向目录中传输大文件,则必须考虑到这一点,以确定文件传输完毕后再触发事件,以保证读取到的是完整的文件。
 
把上述参数配置在 application.properties 中,方便灵活修改。
application.file.watch.daemon=true
application.file.watch.pollInterval=5
application.file.watch.quietPeriod=1
application.file.watch.directory=C:\\workspace\\files\\customer
如上,应用将每 5 分钟扫描一次目录修改。更改将在 1 分钟后触发/传播。这就足够了,因为 csv 文件很小(小于 1 MB)。
通过 @ConfigurationProperties + Java Record 加载配置(Spring Boot 3 支持):
@ConfigurationProperties(prefix = "application.file.watch")
public record FileWatcherProperties(
    @NotBlank String directory,
    boolean daemon,
    @Positive Long pollInterval,
    @Positive Long quietPeriod
) {}
接下来,通过配置类定义 FileSystemWatch Bean。
@Configuration
@EnableConfigurationProperties(FileWatcherProperties.class)
public class CustomerFileWatcherConfig {
// 类成员、构造函数、logger 省略
    @Bean
    FileSystemWatcher fileSystemWatcher() {
        var fileSystemWatcher = new FileSystemWatcher(
                properties.daemon(),
                Duration.ofMinutes(properties.pollInterval()),
                Duration.ofMinutes(properties.quietPeriod()));
        fileSystemWatcher.addSourceDirectory( 
            Path.of(properties.directory()).toFile());
        fileSystemWatcher.addListener(
            new CustomerAddFileChangeListener(fileProcessor));
        fileSystemWatcher.setTriggerFilter(
            f -> f.toPath().endsWith(".csv"));        
        fileSystemWatcher.start();
        logger.info(String.format("FileSystemWatcher initialized. 
            Monitoring directory %s",properties.directory()));
        return fileSystemWatcher;
    }
    @PreDestroy
    public void onDestroy() throws Exception {
        logger.info("Shutting Down File System Watcher.");
        fileSystemWatcher().stop();
    }
}
fileSystemWatcher() 方法中的逻辑如下:
- 首先创建 
fileSystemWatcher实例,并将 Bean 属性中的值作为参数传递。Bean 属性对象由 Spring 容器管理,并通过构造函数注入。 - 调用 
addSourceDirectory方法。该方法接收一个要监控的目录。 addListener方法接受一个文件更改事件的监听器。FileChangeListener是一个函数式接口,其onChange方法将在文件发生更改时被调用。- 可选择在方法 
setTriggerFilter中设置FileFilter,以限制触发更改的文件。使用 Lambda 表达式,限制文件为 csv 文件。 - 最后的 
start方法是开始监控目录的变化。 
注意,predestroy 钩子方法可以在 jvm 停止时优雅地关闭 fileSystemWatcher。
添加监听器
第二步是实现 FileChangeListener,在该接口中处理文件。这个接口也是函数式接口,因此可以使用 Lambda 表达式或方法引用。在本例中,最好将其定义单独的类中,这样会提高可读性。
public class CustomerAddFileChangeListener implements 
    FileChangeListener {
    private static Logger logger = LoggerFactory.getLogger( 
        CustomerAddFileChangeListener.class);
    private CustomerCSVFileProcessor fileProcessor;
    public CustomerAddFileChangeListener(
        CustomerCSVFileProcessor fileProcessor) {
        this.fileProcessor = fileProcessor;
    }
    @Override
    public void onChange(Set<ChangedFiles> changeSet) {
        for(ChangedFiles files : changeSet)
            for(ChangedFile file: files.getFiles())
                if (file.getType().equals(ChangedFile.Type.ADD))
                    fileProcessor.process(file.getFile().toPath());
    }
}
如前所述,当文件发生更改时,将调用 onChange 方法。参数 Set 是一组已更改的文件(自轮询间隔开始以来)。通过遍历 changeSet,可以通过 getFiles 方法从每个 ChangedFiles 对象中获取文件。因此,可以使用嵌套循环来获取各个文件。单个文件的类型是 ChangedFile,它提供了对文件和更改/事件类型(这是一个枚举,有 ADD、DELETE 和 MODIFY 三个值)的访问。
回到代码,if 语句会检查类型,确保只处理 ADD 事件。其他事件将被忽略。CustomerCSVFileProcessor 类执行所有工作。
处理文件
处理文件的业务逻辑位于 process 方法中(在 FileProcessor 接口中声明)。它需要 CustomerService 来将客户保存到数据库中,因此它是通过构造函数注入的。
CustomerCSVFileProcessor 类被标记为 @Component,因为它需要被注入到 Watcher 中。
@Component
public class CustomerCSVFileProcessor implements FileProcessor {
    public static final int NUMBER_OF_COLUMNS = 5;
    private static Logger logger = LoggerFactory.getLogger( 
        CustomerCSVFileProcessor.class);
     private CustomerService customerService;
     public CustomerCSVFileProcessor(
         CustomerService customerService) {
         this.customerService = customerService;
     }
     public void process(Path file) {
        logger.info(String.format(
            "Init processing file %s",file.getFileName()));
        var parser = CSVParser.parse(file);
        parser.getRecords().forEach(this::processRecord);
        moveFile(file);
    }
    private void processRecord(CSVRecord csvRecord) {
       if (csvRecord.size() < NUMBER_OF_COLUMNS) {
           logger.info(String.format(
              "Line %d skipped. Not enough values.", 
              csvRecord.lineNumber()));
           return;
       }
       Customer customer = customerMapper.mapToCustomer(csvRecord);
       customer.setStatus(Customer.Status.ACTIVATED);
       customerService.save(customer);
       logger.info(String.format(
           "Saved customer %s in line %d",
           customer.getName(), 
           csvRecord.lineNumber()));
    }
    private static void moveFile(Path file) {
        try {
            var destinationFolder = Path.of( 
                file.getParent().toString() + OUTPUT_FOLDER );
            Files.move(
                file, 
                destinationFolder.resolve(file.getFileName()), 
                REPLACE_EXISTING);
            logger.info(String.format(
                 "File %s has been moved to %s",file.getFileName(), 
                 destinationFolder));
        } catch (IOException e) {
            logger.error("Unable to move file "+ file, e);
        }
    }
}
该方法通过 CSVParser 解析 csv 文件。然后将每条 cvs 行(描述为 CSVRecord)映射到 Customer 实体并保存到数据库中。最后,foreach 循环结束,文件被移动到一个不同的位置。
测试
重新运行应用将从 CustomerFileWatcherConfig 配置类启动文件系统 Watcher Bean。一旦应用启动,就会将一个包含一些客户的 csv 文件复制到监视的目录中。5分钟后,将输出以下日志信息:
CustomerFileWatcherConfig : FileSystemWatcher initialized. Monitoring directory C:\workspace\files\customer
CustomerCSVFileProcessor   : Init processing file customerFile.csv
CustomerCSVFileProcessor   : Saved customer James Hart in line 1
CustomerCSVFileProcessor   : Saved customer Will Avery in line 2
CustomerCSVFileProcessor   : Saved customer Anne Williams in line 3
CustomerCSVFileProcessor   : Saved customer Julia Norton in line 4
CustomerCSVFileProcessor   : File customerFile.csv has been moved to C:\workspace\files\customer\output
的确,文件中的数据行已作为 Customer 实体添加到数据库中,并且文件已成功移动。
总结
本文介绍了如何在 Spring Boot 中通过 Spring Boot Developer Tools 中的 Filewatch 来监控系统目录中的文件变化。该功能在执行自动化任务时比较有用,例如,监听 FTP 目录,在 FTP 服务器把文件传输到目录中后进行读取、处理。
Ref:https://dev.to/noelopez/monitoring-a-directory-in-spring-2nen