DelayQueue 实现临时上传文件的过期定时清理

学院的智慧党建项目要实现图片上传功能,为了解决图片临时上传后的清理问题,考虑到数据量不大,我想到了用 Java 的 DelayQueue 延迟队列来处理。

需求分析

前端在编辑器里编辑文章时,有时需要上传本地图片,如下:

1

上传图片:

2

要求:

  1. 上传到后端后,后端需要返回一个可访问的 URL 链接给前端用来在文章中引用
  2. 临时上传到后端的图片若一定时间内未被任何文章引用,需要定时清理

技术选型

说到定时任务,首先想到两种方案:

  1. SpringBoot 提供的定时任务实现
  2. Java 提供的延迟队列来实现

考虑到该项目原本就是在学院内使用,数据量不大。而且定时任务时效性差,不能针对单独文件进行计时,故选择第二种方案。

代码实现

延迟队列实现的思路还是很简单的:

将临时上传的图片都放入队列中并设置好过期时间,一旦图片过期就从队列中取出并删除。如果在过期前有文章引用了该图片,那么从队列中将其删除即可。

TTLFile

我们先创建一个过期文件类 TTLFile 实现 Delayed 接口,这里我们定义的是抽象类,实现一些基础功能,留下一些关键功能,如 clean() 方法,便于后续拓展功能。比如,用于清理本地文件的 LocalFile,或是存储在 Minio 上的 MinioFile

package com.dangjian.clean;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Getter;
// 延迟队列中的元素都需要实现 Delayed 接口来判断过期时间
public abstract class TTLFile implements Delayed {

    protected final String uuid;

    protected long ttl; // 过期时间

    @Getter
    protected int failedCount = 0; // 任务失败次数

    public TTLFile(String uuid, long delay) {
        this.uuid = uuid;
        this.ttl = System.currentTimeMillis() + delay;
    }

    /**
     * 查看当前任务还有多久到期
     * 
     * @param unit
     * @return 剩余时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(ttl - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 延迟队列需要到期时间升序入队,所以我们需要实现compareTo进行到期时间比较
     * 
     * @param delayed
     * @return 比较结果
     */
    @Override
    public int compareTo(Delayed delayed) {
        return Long.compare(this.ttl, ((TTLFile) delayed).ttl);
    }

    /**
     * 判断任务是否失败
     * 
     * @return 是否失败
     */
    public boolean isFailed() {
        return ++failedCount >= 3;
    }

    /**
     * 延时
     */
    public void delay(long delay) {
        ttl = System.currentTimeMillis() + delay;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((uuid == null) ? 0 : uuid.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TTLFile other = (TTLFile) obj;
        if (uuid == null) {
            if (other.uuid != null)
                return false;
        } else if (!uuid.equals(other.uuid))
            return false;
        return true;
    }

    /**
     * 清理文件
     * 
     * @return 是否清理成功
     */
    public abstract boolean clean();

    /**
     * 获取文件路径
     * 
     * @return 文件路径
     */
    public abstract String getPath();

}

注意:这里重写的 hashCode()equal() 只将 uuid 作为唯一标识,因为要想从延迟队列中移除元素,我们就必须传入一个等价的元素才行。将 uuid 作为唯一标识,到时候移除元素只需要传入一个具有相同 uuid 的对象实例即可。

p.s. 一般情况,文件的存储路径都是唯一的,可以用来作为 uuid

LocalFile

因为一开始该项目并未使用 Minio 这种专门的文件存储服务,直接将文件存在本地,所以我先实现的是清理本地文件的功能。

得益于抽象类已经实现了一些基本功能,我们只需要实现 clean()getPath() 即可。

package com.dangjian.clean;

import java.io.File;

public class LocalFile extends TTLFile {

    // 任务
    private File file;

    public LocalFile(String uuid, String path, long delay) {
        super(uuid, delay);
        this.file = new File(path);
    }

    /**
     * 删除文件
     * 
     * @return 是否清理成功
     */
    @Override
    public boolean clean() {
        return file.exists() ? file.delete() : true;
    }

    /**
     * 获取文件路径
     * 
     * @return 文件路径
     */
    @Override
    public String getPath() {
        return file.getPath();
    }

}

内置一个 File 类用来操作文件,clean() 也是判断文件存在后直接删除,实现起来并不难。

这样一个定时清理本地文件的类就写好了~

Cleaner

根据之前的思路,要想从延迟队列里不断取出过期元素删除,我们就需要一个后台线程异步获取队列中的元素,而且需要在 SpringBoot 一启动就要开始执行。这里我们可以用 SpringBoot 提供的 ApplicationRunner 接口来实现。

在 SpringBoot 应用程序启动时,有时我们需要执行一些特定的任务,如加载配置、建立连接等。SpringBoot 提供了 ApplicationRunner 接口,允许我们在应用程序完全启动后执行自定义的逻辑。

下面我们通过实现该接口来定义一个“清洁工”

package com.dangjian.clean;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.dangjian.utils.FileUtil;

import lombok.extern.slf4j.Slf4j;

import java.time.LocalTime;
import java.util.List;
import java.util.concurrent.DelayQueue;

@Slf4j
@Component
public class Cleaner implements ApplicationRunner {

    private DelayQueue<TTLFile> cleanQueue = new DelayQueue<>();

    public void addTask(TTLFile ttlFile) {
        cleanQueue.add(ttlFile);
    }

    public void addTask(List<TTLFile> ttlFile) {
        cleanQueue.addAll(ttlFile);
    }

    public boolean removeTask(TTLFile ttlFile) {
        return cleanQueue.remove(ttlFile);
    }

    public boolean removeTask(List<TTLFile> ttlFile) {
        return cleanQueue.removeAll(ttlFile);
    }

    @Override
    public void run(ApplicationArguments args) {
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    TTLFile ttlFile = cleanQueue.take();
                    if (ttlFile.clean()) {
                        log.info("成功清理:{}", ttlFile.getPath());
                    } else {
                        if (ttlFile.isFailed()) {
                            log.error("清理失败,文件路径:{}", ttlFile.getPath());
                            String info = String.format("[%s] - 清理失败,文件路径:%s", LocalTime.now(), ttlFile.getPath());
                            FileUtil.writeLog(FileUtil.getCurrentPath(), info);
                        } else {
                            log.warn("清理失败,重试次数:{},文件路径:{}", ttlFile.getFailedCount(), ttlFile.getPath());
                            ttlFile.delay(60000); // 1分钟后重试
                            cleanQueue.put(ttlFile);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        thread.setName("Cleaner");
        thread.start();
    }

}

在上面的代码中,我们内置了一个延迟队列,还定义了一些往队列里添加定时任务的方法。

其中,在重写的方法中,我们直接创建一个死循环的线程不断从队列中提取任务,这里调用的是延迟队列的 take() 方法,该方法在队列中没有可以取出的过期元素时,会阻塞消费者,直到有元素过期。所以并不会过多损耗 CPU 资源。

对于清理失败的情况,还会通过 TTLFile 内含的计数字段来重试 3 次清理操作。若依旧不成功,则该报错报错,该写日志写日志。

实战

在我们的项目中,有一个专门的临时上传文件接口,我们可以在接收文件后将其加入到队列里。

@Operation(summary = "临时上传文件")
@PostMapping("/upload")
public Result upload(@Parameter(description = "上传的文件") @RequestPart MultipartFile file) {
    Optional.ofNullable(file).orElseThrow(() -> new CustomException("上传文件为空!"));

    String tempPath = FileUtil.tempUpload(file);
    String uuid = UUID.randomUUID().toString();
    redisTemplate.opsForValue().set(uuid, tempPath, 10, TimeUnit.MINUTES);
    
    // 将临时路径作为唯一标识
    cleaner.addTask(new LocalFile(tempPath, 10 * 60 * 1000)); // 10分钟后删除

    log.info("文件上传成功,临时路径:{}", tempPath);
    return Result.ok(uuid);
}

p.s. Cleaner 带有 @Component 注解,可直接注入

而当我们引用临时文件,需要将他从队列中移除时,只需要用相同的路径作为表示传入一个新对象即可。

下面的代码是用来设置支部活动附件,附件已经临时上传到后端了

@Override
public void setActivityFile(String actId, Activity activity) {
    // 通过活动 ID 从 Redis 里拿到预先存在里面的临时路径
    String path = redisTemplate.opsForValue().getAndDelete(actId);
    if (StringUtils.hasLength(path) && cleaner.removeTask(new LocalFile(path, 1000))) { // 传入一个具有相同临时路径的对象来删除队列中的临时文件
        Optional.ofNullable(activity.getFile()).ifPresent(FileUtil::delete);
        activity.setFile(path);
    } else {
        throw new CustomException("临时上传的活动文件已过期,请重新上传!");
    }
}

经测试,在不出现 IOException 的情况下,只要提前将任务加入队列,一旦到期,临时上传的文件就能被成功删除。

总结

清理文件时记得做好异常处理以及回滚机制,防止清理失败后未将任务重新添加回队列导致垃圾文件堆积。

延迟队列底层使用的无界队列,基于 JVM 内存,数据量少的情况下,这种方法简单且使用。若在企业级场景下,可能更多的会使用像 Kafka,RocketMQ 这种高性能消息队列提供的延迟队列来实现。