1. 需求说明
1.1 定时任务的动态管理;
1.2 单个执行器可配置多个cron表达式,及多次不同规则执行;
1.3 创建定时任务时,可将业务参数传入到执行器中进行业务关联。
以上需求,在网上搜索,基本都是只实现了1.1,一个执行器对应一个cron表达式,千篇一律的代码复制、粘贴。又不想花太多的时间去了解xxl-job,因为一来项目上用不到定时任务分布式管理,二来也大概搜索过,没发现能完全满足以上需求的文章。所以,只能自己想办法解决了。
实现思路:利用反射找到类及有参构造函数。
2. 实现过程
PS:代码内有相关业务类和代码,实际使用时删除,只需要关心定时任务的动态管理代码就行了
2.1 创建定时任务配置表和java对象
create table `cj_scheduled_task` (
`id` varchar(50) primary key comment '主键id',
`task_class` varchar(100) not null comment '定时任务完整类名',
`cron_expression` varchar(20) not null comment 'cron表达式',
`task_explain` varchar(200) default null comment '任务描述',
`status` tinyint(1) not null comment '状态:1.启用;2.停用',
`create_by` varchar(50) comment '创建人',
`create_time` datetime comment '创建时间',
`update_by` varchar(50) comment '修改人',
`update_time` datetime comment '修改时间'
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COMMENT = '定时任务';
package com.xxx.modules.chunjun.desreport.entity.scheduled;// 包路径,需要自己修改,下同
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* 定时任务配置
*
* @author xxx
* @date: 2021-11-29 10:49:35
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
@TableName("cj_scheduled_task")
@Data
public class ScheduledTask implements Serializable {
private static final long serialVersionUID = 1L;
/** 主键id */
@TableId(type = IdType.ASSIGN_UUID)
@ApiModelProperty(value = "主键id")
private String id;
/** 定时任务名称 */
@TableField("task_name")
@ApiModelProperty(value = "定时任务名称")
private String taskName;
/** 定时任务完整类名 */
@TableField("task_class")
@ApiModelProperty(value = "定时任务完整类名")
private String taskClass;
/** cron表达式 */
@TableField("cron_expression")
@ApiModelProperty(value = "cron表达式")
private String cronExpression;
/** 任务描述 */
@TableField("task_explain")
@ApiModelProperty(value = "任务描述")
private String taskExplain;
/** 状态:1.启用;2.停用 */
@TableField("status")
@ApiModelProperty(value = "状态:1.启用;2.停用")
private int status;
/** 创建人. */
@TableField("create_by")
@ApiModelProperty(value = "创建人")
private String createBy;
/** 创建时间. */
@TableField("create_time")
@ApiModelProperty(value = "创建时间")
private Date createTime;
/** 修改人. */
@TableField("update_by")
@ApiModelProperty(value = "修改人")
private String updateBy;
/** 修改时间. */
@TableField("update_time")
@ApiModelProperty(value = "修改时间")
private Date updateTime;
}
package com.xxx.modules.chunjun.desreport.model;
import java.io.Serializable;
import java.util.List;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
/**
* 定时任务配置
*
* @author xxx
* @date: 2021-11-29 10:49:35
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
@Getter
@Setter
public class ScheduledTaskVO extends ScheduledTask implements Serializable {
private static final long serialVersionUID = 1L;
/** 任务报表关联集 */
@ApiModelProperty(value = "任务报表关联集")
private List<ScheduledTaskReport> strList;
/** 批量删除id集 */
@ApiModelProperty(value = "批量删除id集")
private List<String> ids;
}
2.3 ThreadPoolTaskScheduler配置类
package com.xxx.modules.chunjun.config.scheduled;// 包路径,需要自己修改
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import lombok.extern.slf4j.Slf4j;
/**
* 定时任务线程池配置类
*
* @author xxx
* @date: 2021-12-01 15:46:06
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
@Configuration
@Slf4j
public class ScheduledConfig {
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
log.info("创建定时任务调度线程池 start");
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(20);
threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
log.info("创建定时任务调度线程池 end");
return threadPoolTaskScheduler;
}
}
2.4 定时任务管理服务接口
package com.xxx.modules.chunjun.desreport.service;
import java.util.List;
import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;
/**
* 定时任务服务接口
*
* @author xxx
* @date: 2021-11-29 14:15:31
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
public interface IScheduledTaskService {
/**
* 查询所有:生成定时任务所需
*
* @author: caip
* @date: 2021-11-30 10:45:49
* @return
*/
List<ScheduledTask> listAll();
/**
* 根据完整类名分页模糊查询
*
* @author: caip
* @date: 2021-11-30 10:45:19
* @param taskClass 完整类名
* @return
*/
Result<?> pageList(String taskClass);
/**
* 根据id查询
*
* @author: caip
* @date: 2021-11-30 10:56:53
* @param id
* @return
*/
ScheduledTask getById(String id);
/**
* 执行
*
* @author: caip
* @date: 2021-11-30 14:15:11
* @param id 任务id
*/
Result<?> start(String id);
/**
* 停止
*
* @author: caip
* @date: 2021-12-01 16:09:47
* @param id
* @return
*/
Result<?> stop(String id);
/**
* 重启
*
* @author: caip
* @date: 2021-11-30 14:15:11
* @param id 任务id
*/
Result<?> restart(String id);
/**
* 更新
*
* @author: caip
* @date: 2021-11-30 14:25:59
* @param id
* @return
*/
Result<?> update(ScheduledTaskVO st);
/**
* 新增
*
* @author: caip
* @date: 2021-12-01 14:44:32
* @param st
* @return
*/
Result<?> insert(ScheduledTaskVO st);
/**
* 初始化定时任务
*
* @author: caip
* @date: 2021-12-01 15:51:43
*/
void initTask();
/**
* 分页查询执行器
*
* @author: caip
* @date: 2021-12-02 17:43:07
* @param keywords 关键字
* @return
*/
Result<?> pageListActuator(String keywords);
}
2.5 定时任务管理服务实现
属性说明:
taskBaseTaskList:执行器列表,写到配置文件中,如:
task:
baseTaskList: 生成excel:com.trasen.modules.chunjun.desreport.task.GenerateExcelTask
package com.xxx.modules.chunjun.desreport.service.impl;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSONObject;
import com.xxx.modules.chunjun.common.base.query.CriteriaQuery;
import com.xxx.modules.chunjun.common.base.service.BaseServiceImpl;
import com.xxx.modules.chunjun.common.util.LoginUserUtil;
import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;
import com.xxx.modules.chunjun.desreport.mapper.ScheduledTaskMapper;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskActuator;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskReportService;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskService;
import com.xxx.modules.chunjun.desreport.task.ScheduledOfTask;
import lombok.extern.slf4j.Slf4j;
/**
* xxx
*
* @author caip
* @date: 2021-11-29 14:58:08
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
@Slf4j
@Service
public class ScheduledTaskServiceImpl extends BaseServiceImpl<ScheduledTaskMapper, ScheduledTask>
implements IScheduledTaskService {
@Value("${task.baseTaskList}")
private String taskBaseTaskList;
/** 可重入锁 */
private ReentrantLock lock = new ReentrantLock();
/** 定时任务线程池 */
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
@Autowired
private IScheduledTaskReportService strService;
/** 启动状态的定时任务集合 */
@SuppressWarnings("rawtypes")
public Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<String, ScheduledFuture>();
@Override
public List<ScheduledTask> listAll() {
return this.list();
}
@Override
public Result<?> pageList(String taskClass) {
CriteriaQuery<ScheduledTask> cq = new CriteriaQuery<ScheduledTask>(this.request);
cq.like(null != taskClass && !taskClass.isEmpty(), ScheduledTask::getTaskClass, taskClass);
return this.pageList(cq);
}
@Override
public ScheduledTask getById(String id) {
return super.getById(id);
}
@Override
public Result<?> start(String id) {
// 根据id查询任务
ScheduledTask task = super.getById(id);
// 判断任务是否启用
if (1 != task.getStatus()) {
return Result.error("定时任务未启用,无法执行!");
}
String taskClass = task.getTaskClass();
log.info("启动定时任务:" + taskClass);
// 添加锁放一个线程启动,防止多人启动多次
lock.lock();
log.info("加锁完成");
try {
if (this.isStart(id)) {
String msg = "当前任务在启动状态中";
log.info(msg);
return Result.error(msg);
}
// 任务启动
this.doStartTask(task);
} finally {
lock.unlock();
log.info("解锁完毕");
}
return Result.OK();
}
@SuppressWarnings("rawtypes")
@Override
public Result<?> stop(String id) {
// 根据id查询任务
log.info("停止任务: " + id);
boolean flag = scheduledFutureMap.containsKey(id);
log.info("当前实例是否存在 " + flag);
if (flag) {
ScheduledFuture scheduledFuture = scheduledFutureMap.get(id);
scheduledFuture.cancel(true);
scheduledFutureMap.remove(id);
} else {
String msg = "当前任务不存在!";
log.info(msg + id);
Result.error(msg);
}
return Result.OK();
}
@Override
public Result<?> restart(String id) {
log.info("重启定时任务:" + id);
// 停止
this.stop(id);
// 启动
return this.start(id);
}
@Override
public Result<?> update(ScheduledTaskVO st) {
// 赋值更新人、更新时间
st.setUpdateBy(LoginUserUtil.getCurrentUserId());
st.setUpdateTime(new Date());
if (!super.updateById(st)) {
String msg = "新增失败!";
log.error(msg);
return Result.error(msg);
}
// 批量保存关联数据
JSONObject params = new JSONObject();
List<ScheduledTaskReport> strList = st.getStrList();
if (null != strList && !strList.isEmpty()) {
// 赋值任务id
for (ScheduledTaskReport str : strList) {
str.setTaskId(st.getId());
}
}
params.put("strList", strList);
params.put("ids", st.getIds());
if (!strService.saveBatch(params).isSuccess()) {
String msg = "批量保存失败!";
log.error(msg);
return Result.error(msg);
}
return Result.OK();
}
@Override
@Transactional
public Result<?> insert(ScheduledTaskVO st) {
// 赋值创建人、创建时间
st.setCreateBy(LoginUserUtil.getCurrentUserId());
st.setCreateTime(new Date());
// 新增定时任务
if (!super.save(st)) {
String msg = "新增失败!";
log.error(msg);
return Result.error(msg);
}
// 新增关联数据
JSONObject params = new JSONObject();
List<ScheduledTaskReport> strList = st.getStrList();
if (null != strList && !strList.isEmpty()) {
// 赋值任务id
for (ScheduledTaskReport str : strList) {
str.setTaskId(st.getId());
}
params.put("strList", strList);
if (!strService.saveBatch(params).isSuccess()) {
String msg = "批量新增失败!";
log.error(msg);
return Result.error(msg);
}
}
return Result.OK();
}
@Override
public void initTask() {
List<ScheduledTask> stList = super.list();
if (null != stList && !stList.isEmpty()) {
// 循环所有定时任务
for (ScheduledTask st : stList) {
// 判断是否启用
if (1 != st.getStatus()) {
continue;
}
// 执行任务
doStartTask(st);
}
}
}
/**
* 执行启动任务
*
* @author: caip
* @date: 2021-12-01 16:16:31
* @param scheduledTask 定时任务
*/
@SuppressWarnings(value = {"unchecked", "rawtypes"})
private void doStartTask(ScheduledTask scheduledTask) {
String taskClass = scheduledTask.getTaskClass();
log.info(taskClass);
if (1 != scheduledTask.getStatus()) {
return;
}
Class clazz;
ScheduledOfTask task;
try {
// 通过类全路径找到类
clazz = Class.forName(taskClass);
// 获取构造函数
Constructor<?> cons = clazz.getConstructor(ScheduledTask.class);
// 实例化对象,传入参数,用于执行器关联业务
task = (ScheduledOfTask)cons.newInstance(scheduledTask);
// 检查执行器是否继承父类,如果未继承,则直接中断
Assert.isAssignable(ScheduledOfTask.class, task.getClass(), "定时任务业务类必须继承ScheduledOfTask父类");
// 实例化任务触发器
CronTrigger cronTrigger = new CronTrigger(scheduledTask.getCronExpression());
// 实例化定时任务
ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(task,
(triggerContext -> cronTrigger.nextExecutionTime(triggerContext)));
// 存入定时任务,用于查找和停用
scheduledFutureMap.put(scheduledTask.getId(), scheduledFuture);
} catch (Exception e) {
log.error("定时任务【" + taskClass + "】启动异常。", e);
}
}
/**
* 检查任务是否已经启动
*
* @author: caip
* @date: 2021-12-01 16:00:17
* @param id 定时任务id
* @return
*/
private Boolean isStart(String id) {
// 校验是否已经启动
if (scheduledFutureMap.containsKey(id)) {
if (!scheduledFutureMap.get(id).isCancelled()) {
return true;
}
}
return false;
}
@Override
public Result<?> pageListActuator(String keywords) {
// 从配置文件读取并初始化执行器数据
String[] taskBaseTaskArray = taskBaseTaskList.split(",");
List<ScheduledTaskActuator> bList = new ArrayList<ScheduledTaskActuator>();
if (null != taskBaseTaskArray && taskBaseTaskArray.length > 0) {
for (int i = 0; i < taskBaseTaskArray.length; i++) {
String[] taskBaseTaskAttr = taskBaseTaskArray[i].split(":");
String tempName = taskBaseTaskAttr[0];
String tempClass = taskBaseTaskAttr[1];
boolean isAdd = false;
// 判断并添加条件
if (!StringUtils.isEmpty(keywords)) {
// 添加名称模糊查询条件
// 模糊查询 使用(Pattern、Matcher)
Pattern pattern1 = Pattern.compile(keywords);
Pattern pattern2 = Pattern.compile(keywords);
Matcher matcher1 = pattern1.matcher(tempName);
Matcher matcher2 = pattern2.matcher(tempClass);
if (matcher1.find() || matcher2.find()) {
isAdd = true;
}
} else {
isAdd = true;
}
if (isAdd) {
ScheduledTaskActuator temp = new ScheduledTaskActuator();
temp.setActuatorName(taskBaseTaskAttr[0]);
temp.setTaskClass(taskBaseTaskAttr[1]);
bList.add(temp);
}
}
}
return Result.OK(bList);
}
}
2.6 执行器父类
package com.xxx.modules.chunjun.desreport.task;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
/**
* 所有定时任务父类:定时任务业务类必须继承该类
*
* @author xxx
* @date: 2021-11-30 17:06:13
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
public class ScheduledOfTask implements Runnable {
/** 定时任务配置 */
private ScheduledTask scheduledTask;
/**
* 有参构造:用于赋值定时任务配置
*
* @param scheduledTask
*/
public ScheduledOfTask(ScheduledTask scheduledTask) {
this.scheduledTask = scheduledTask;
}
/**
* 获取定时任务配置
*
* @author: xxx
* @date: 2021-12-29 15:47:35
* @return
*/
public ScheduledTask getScheduledTask() {
return this.scheduledTask;
}
/**
* 实现Runnable类run,无具体意义。用于子类实现具体业务
*/
@Override
public void run() {
System.out.println(scheduledTask.getId() + ":开始执行!");
}
}
2.7 执行器业务类
package com.trasen.modules.chunjun.desreport.task;
import com.alibaba.fastjson.JSONObject;
import com.trasen.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.trasen.modules.chunjun.desreport.service.IReportExportExcel;
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
/**
* 定时任务业务类:生成excel任务
*
* @author xxx
* @date: 2021-11-29 17:04:35
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
@Slf4j
public class GenerateExcelTask extends ScheduledOfTask {
public GenerateExcelTask(ScheduledTask scheduledTask) {
super(scheduledTask);
}
@Override
public void run() {
ScheduledTask scheduledTask = super.getScheduledTask();
// 具体业务,无需关心
// String id = scheduledTask.getId();
// JSONObject jsonObject = new JSONObject();
// jsonObject.put("taskId", id);
// jsonObject.put("pageNo", 1);
// jsonObject.put("pageSize", 10000);
// IReportExportExcel reportExportExcel = SpringUtil.getBean(IReportExportExcel.class);
// reportExportExcel.generateExcelUploadMinIO(jsonObject);
log.info("任务id:{},任务说明:{},cron表达式:{}", id, scheduledTask.getTaskExplain(), scheduledTask.getCronExpression());
}
}
2.8 项目启动时初始化定时任务
package com.xxx.modules.chunjun.desreport.task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.trasen.modules.chunjun.desreport.service.IScheduledTaskService;
import lombok.extern.slf4j.Slf4j;
/**
* 项目启动时初始化定时任务
*
* @author xxx
* @date: 2021-12-01 16:22:15
* @Copyright: Copyright (c) 2006 - 2021
* @Company: xxx
* @Version: V1.0
*/
@Slf4j
@Component
public class ScheduledTaskRunner implements ApplicationRunner {
@Autowired
private IScheduledTaskService scheduledTaskService;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("----初始化定时任务开始----");
scheduledTaskService.initTask();
log.info("----初始化定时任务完成----");
}
}