DelayQueue 实现临时上传文件的过期定时清理
学院的智慧党建项目要实现图片上传功能,为了解决图片临时上传后的清理问题,考虑到数据量不大,我想到了用 Java 的 DelayQueue 延迟队列来处理。
需求分析
前端在编辑器里编辑文章时,有时需要上传本地图片,如下:
上传图片:
要求:
- 上传到后端后,后端需要返回一个可访问的 URL 链接给前端用来在文章中引用
- 临时上传到后端的图片若一定时间内未被任何文章引用,需要定时清理
技术选型
说到定时任务,首先想到两种方案:
- SpringBoot 提供的定时任务实现
- Java 提供的延迟队列来实现
考虑到该项目原本就是在学院内使用,数据量不大。而且定时任务时效性差,不能针对单独文件进行计时,故选择第二种方案。
代码实现
延迟队列实现的思路还是很简单的:
将临时上传的图片都放入队列中并设置好过期时间,一旦图片过期就从队列中取出并删除。如果在过期前有文章引用了该图片,那么从队列中将其删除即可。
TTLFile
我们先创建一个过期文件类 TTLFile
实现 Delayed
接口,这里我们定义的是抽象类,实现一些基础功能,留下一些关键功能,如 clean()
方法,便于后续拓展功能。比如,用于清理本地文件的 LocalFile
,或是存储在 Minio 上的 MinioFile
package com.dangjian.clean;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
// 延迟队列中的元素都需要实现 Delayed 接口来判断过期时间
public abstract class TTLFile implements Delayed {
protected final String uuid;
protected long ttl; // 过期时间
@Getter
protected int failedCount = 0; // 任务失败次数
public TTLFile(String uuid, long delay) {
this.uuid = uuid;
this.ttl = System.currentTimeMillis() + delay;
}
/**
* 查看当前任务还有多久到期
*
* @param unit
* @return 剩余时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(ttl - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 延迟队列需要到期时间升序入队,所以我们需要实现compareTo进行到期时间比较
*
* @param delayed
* @return 比较结果
*/
@Override
public int compareTo(Delayed delayed) {
return Long.compare(this.ttl, ((TTLFile) delayed).ttl);
}
/**
* 判断任务是否失败
*
* @return 是否失败
*/
public boolean isFailed() {
return ++failedCount >= 3;
}
/**
* 延时
*/
public void delay(long delay) {
ttl = System.currentTimeMillis() + delay;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((uuid == null) ? 0 : uuid.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TTLFile other = (TTLFile) obj;
if (uuid == null) {
if (other.uuid != null)
return false;
} else if (!uuid.equals(other.uuid))
return false;
return true;
}
/**
* 清理文件
*
* @return 是否清理成功
*/
public abstract boolean clean();
/**
* 获取文件路径
*
* @return 文件路径
*/
public abstract String getPath();
}
注意:这里重写的 hashCode()
和 equal()
只将 uuid
作为唯一标识,因为要想从延迟队列中移除元素,我们就必须传入一个等价的元素才行。将 uuid
作为唯一标识,到时候移除元素只需要传入一个具有相同 uuid
的对象实例即可。
p.s. 一般情况,文件的存储路径都是唯一的,可以用来作为 uuid
LocalFile
因为一开始该项目并未使用 Minio 这种专门的文件存储服务,直接将文件存在本地,所以我先实现的是清理本地文件的功能。
得益于抽象类已经实现了一些基本功能,我们只需要实现 clean()
和 getPath()
即可。
package com.dangjian.clean;
import java.io.File;
public class LocalFile extends TTLFile {
// 任务
private File file;
public LocalFile(String uuid, String path, long delay) {
super(uuid, delay);
this.file = new File(path);
}
/**
* 删除文件
*
* @return 是否清理成功
*/
@Override
public boolean clean() {
return file.exists() ? file.delete() : true;
}
/**
* 获取文件路径
*
* @return 文件路径
*/
@Override
public String getPath() {
return file.getPath();
}
}
内置一个 File
类用来操作文件,clean()
也是判断文件存在后直接删除,实现起来并不难。
这样一个定时清理本地文件的类就写好了~
Cleaner
根据之前的思路,要想从延迟队列里不断取出过期元素删除,我们就需要一个后台线程异步获取队列中的元素,而且需要在 SpringBoot 一启动就要开始执行。这里我们可以用 SpringBoot 提供的 ApplicationRunner
接口来实现。
在 SpringBoot 应用程序启动时,有时我们需要执行一些特定的任务,如加载配置、建立连接等。SpringBoot 提供了 ApplicationRunner
接口,允许我们在应用程序完全启动后执行自定义的逻辑。
下面我们通过实现该接口来定义一个“清洁工”
package com.dangjian.clean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.dangjian.utils.FileUtil;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalTime;
import java.util.List;
import java.util.concurrent.DelayQueue;
@Slf4j
@Component
public class Cleaner implements ApplicationRunner {
private DelayQueue<TTLFile> cleanQueue = new DelayQueue<>();
public void addTask(TTLFile ttlFile) {
cleanQueue.add(ttlFile);
}
public void addTask(List<TTLFile> ttlFile) {
cleanQueue.addAll(ttlFile);
}
public boolean removeTask(TTLFile ttlFile) {
return cleanQueue.remove(ttlFile);
}
public boolean removeTask(List<TTLFile> ttlFile) {
return cleanQueue.removeAll(ttlFile);
}
@Override
public void run(ApplicationArguments args) {
Thread thread = new Thread(() -> {
while (true) {
try {
TTLFile ttlFile = cleanQueue.take();
if (ttlFile.clean()) {
log.info("成功清理:{}", ttlFile.getPath());
} else {
if (ttlFile.isFailed()) {
log.error("清理失败,文件路径:{}", ttlFile.getPath());
String info = String.format("[%s] - 清理失败,文件路径:%s", LocalTime.now(), ttlFile.getPath());
FileUtil.writeLog(FileUtil.getCurrentPath(), info);
} else {
log.warn("清理失败,重试次数:{},文件路径:{}", ttlFile.getFailedCount(), ttlFile.getPath());
ttlFile.delay(60000); // 1分钟后重试
cleanQueue.put(ttlFile);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.setName("Cleaner");
thread.start();
}
}
在上面的代码中,我们内置了一个延迟队列,还定义了一些往队列里添加定时任务的方法。
其中,在重写的方法中,我们直接创建一个死循环的线程不断从队列中提取任务,这里调用的是延迟队列的 take()
方法,该方法在队列中没有可以取出的过期元素时,会阻塞消费者,直到有元素过期。所以并不会过多损耗 CPU 资源。
对于清理失败的情况,还会通过 TTLFile
内含的计数字段来重试 3 次清理操作。若依旧不成功,则该报错报错,该写日志写日志。
实战
在我们的项目中,有一个专门的临时上传文件接口,我们可以在接收文件后将其加入到队列里。
@Operation(summary = "临时上传文件")
@PostMapping("/upload")
public Result upload(@Parameter(description = "上传的文件") @RequestPart MultipartFile file) {
Optional.ofNullable(file).orElseThrow(() -> new CustomException("上传文件为空!"));
String tempPath = FileUtil.tempUpload(file);
String uuid = UUID.randomUUID().toString();
redisTemplate.opsForValue().set(uuid, tempPath, 10, TimeUnit.MINUTES);
// 将临时路径作为唯一标识
cleaner.addTask(new LocalFile(tempPath, 10 * 60 * 1000)); // 10分钟后删除
log.info("文件上传成功,临时路径:{}", tempPath);
return Result.ok(uuid);
}
p.s. Cleaner 带有 @Component 注解,可直接注入
而当我们引用临时文件,需要将他从队列中移除时,只需要用相同的路径作为表示传入一个新对象即可。
下面的代码是用来设置支部活动附件,附件已经临时上传到后端了
@Override
public void setActivityFile(String actId, Activity activity) {
// 通过活动 ID 从 Redis 里拿到预先存在里面的临时路径
String path = redisTemplate.opsForValue().getAndDelete(actId);
if (StringUtils.hasLength(path) && cleaner.removeTask(new LocalFile(path, 1000))) { // 传入一个具有相同临时路径的对象来删除队列中的临时文件
Optional.ofNullable(activity.getFile()).ifPresent(FileUtil::delete);
activity.setFile(path);
} else {
throw new CustomException("临时上传的活动文件已过期,请重新上传!");
}
}
经测试,在不出现 IOException
的情况下,只要提前将任务加入队列,一旦到期,临时上传的文件就能被成功删除。
总结
清理文件时记得做好异常处理以及回滚机制,防止清理失败后未将任务重新添加回队列导致垃圾文件堆积。
延迟队列底层使用的无界队列,基于 JVM 内存,数据量少的情况下,这种方法简单且使用。若在企业级场景下,可能更多的会使用像 Kafka,RocketMQ 这种高性能消息队列提供的延迟队列来实现。