springboot使用ThreadPoolTaskScheduler实现动态定时任务管理

2023-01-08 18:28:46

1. 需求说明

1.1 定时任务的动态管理;

1.2 单个执行器可配置多个cron表达式,及多次不同规则执行;

1.3 创建定时任务时,可将业务参数传入到执行器中进行业务关联。

        以上需求,在网上搜索,基本都是只实现了1.1,一个执行器对应一个cron表达式,千篇一律的代码复制、粘贴。又不想花太多的时间去了解xxl-job,因为一来项目上用不到定时任务分布式管理,二来也大概搜索过,没发现能完全满足以上需求的文章。所以,只能自己想办法解决了。

        实现思路:利用反射找到类及有参构造函数。

2. 实现过程

PS:代码内有相关业务类和代码,实际使用时删除,只需要关心定时任务的动态管理代码就行了

2.1 创建定时任务配置表和java对象

create table `cj_scheduled_task` (
  `id` varchar(50) primary key comment '主键id',
  `task_class` varchar(100) not null comment '定时任务完整类名',
  `cron_expression` varchar(20) not null comment 'cron表达式',
  `task_explain` varchar(200) default null comment '任务描述',
  `status` tinyint(1) not null comment '状态:1.启用;2.停用',
  `create_by` varchar(50) comment '创建人',
  `create_time` datetime comment '创建时间',
  `update_by` varchar(50) comment '修改人',
  `update_time` datetime comment '修改时间'
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COMMENT = '定时任务';
package com.xxx.modules.chunjun.desreport.entity.scheduled;// 包路径,需要自己修改,下同

import java.io.Serializable;
import java.util.Date;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

/**
 * 定时任务配置
 * 
 * @author xxx
 * @date: 2021-11-29 10:49:35
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
@TableName("cj_scheduled_task")
@Data
public class ScheduledTask implements Serializable {
    private static final long serialVersionUID = 1L;

    /** 主键id */
    @TableId(type = IdType.ASSIGN_UUID)
    @ApiModelProperty(value = "主键id")
    private String id;

    /** 定时任务名称 */
    @TableField("task_name")
    @ApiModelProperty(value = "定时任务名称")
    private String taskName;

    /** 定时任务完整类名 */
    @TableField("task_class")
    @ApiModelProperty(value = "定时任务完整类名")
    private String taskClass;

    /** cron表达式 */
    @TableField("cron_expression")
    @ApiModelProperty(value = "cron表达式")
    private String cronExpression;

    /** 任务描述 */
    @TableField("task_explain")
    @ApiModelProperty(value = "任务描述")
    private String taskExplain;

    /** 状态:1.启用;2.停用 */
    @TableField("status")
    @ApiModelProperty(value = "状态:1.启用;2.停用")
    private int status;

    /** 创建人. */
    @TableField("create_by")
    @ApiModelProperty(value = "创建人")
    private String createBy;

    /** 创建时间. */
    @TableField("create_time")
    @ApiModelProperty(value = "创建时间")
    private Date createTime;

    /** 修改人. */
    @TableField("update_by")
    @ApiModelProperty(value = "修改人")
    private String updateBy;

    /** 修改时间. */
    @TableField("update_time")
    @ApiModelProperty(value = "修改时间")
    private Date updateTime;
}
package com.xxx.modules.chunjun.desreport.model;

import java.io.Serializable;
import java.util.List;

import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;

import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;

/**
 * 定时任务配置
 * 
 * @author xxx
 * @date: 2021-11-29 10:49:35
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
@Getter
@Setter
public class ScheduledTaskVO extends ScheduledTask implements Serializable {
    private static final long serialVersionUID = 1L;

    /** 任务报表关联集 */
    @ApiModelProperty(value = "任务报表关联集")
    private List<ScheduledTaskReport> strList;

    /** 批量删除id集 */
    @ApiModelProperty(value = "批量删除id集")
    private List<String> ids;
}

2.3 ThreadPoolTaskScheduler配置类

package com.xxx.modules.chunjun.config.scheduled;// 包路径,需要自己修改

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import lombok.extern.slf4j.Slf4j;

/**
 * 定时任务线程池配置类
 * 
 * @author xxx
 * @date: 2021-12-01 15:46:06
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
@Configuration
@Slf4j
public class ScheduledConfig {

    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        log.info("创建定时任务调度线程池 start");
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(20);
        threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
        log.info("创建定时任务调度线程池 end");
        return threadPoolTaskScheduler;
    }
}

2.4 定时任务管理服务接口

package com.xxx.modules.chunjun.desreport.service;

import java.util.List;

import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;

/**
 * 定时任务服务接口
 * 
 * @author xxx
 * @date: 2021-11-29 14:15:31
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
public interface IScheduledTaskService {

    /**
     * 查询所有:生成定时任务所需
     * 
     * @author: caip
     * @date: 2021-11-30 10:45:49
     * @return
     */
    List<ScheduledTask> listAll();

    /**
     * 根据完整类名分页模糊查询
     * 
     * @author: caip
     * @date: 2021-11-30 10:45:19
     * @param taskClass 完整类名
     * @return
     */
    Result<?> pageList(String taskClass);

    /**
     * 根据id查询
     * 
     * @author: caip
     * @date: 2021-11-30 10:56:53
     * @param id
     * @return
     */
    ScheduledTask getById(String id);

    /**
     * 执行
     * 
     * @author: caip
     * @date: 2021-11-30 14:15:11
     * @param id 任务id
     */
    Result<?> start(String id);

    /**
     * 停止
     * 
     * @author: caip
     * @date: 2021-12-01 16:09:47
     * @param id
     * @return
     */
    Result<?> stop(String id);

    /**
     * 重启
     * 
     * @author: caip
     * @date: 2021-11-30 14:15:11
     * @param id 任务id
     */
    Result<?> restart(String id);

    /**
     * 更新
     * 
     * @author: caip
     * @date: 2021-11-30 14:25:59
     * @param id
     * @return
     */
    Result<?> update(ScheduledTaskVO st);

    /**
     * 新增
     * 
     * @author: caip
     * @date: 2021-12-01 14:44:32
     * @param st
     * @return
     */
    Result<?> insert(ScheduledTaskVO st);

    /**
     * 初始化定时任务
     * 
     * @author: caip
     * @date: 2021-12-01 15:51:43
     */
    void initTask();

    /**
     * 分页查询执行器
     * 
     * @author: caip
     * @date: 2021-12-02 17:43:07
     * @param keywords 关键字
     * @return
     */
    Result<?> pageListActuator(String keywords);
}

2.5 定时任务管理服务实现

属性说明:

taskBaseTaskList:执行器列表,写到配置文件中,如:

task:
  baseTaskList: 生成excel:com.trasen.modules.chunjun.desreport.task.GenerateExcelTask
package com.xxx.modules.chunjun.desreport.service.impl;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONObject;
import com.xxx.modules.chunjun.common.base.query.CriteriaQuery;
import com.xxx.modules.chunjun.common.base.service.BaseServiceImpl;
import com.xxx.modules.chunjun.common.util.LoginUserUtil;
import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;
import com.xxx.modules.chunjun.desreport.mapper.ScheduledTaskMapper;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskActuator;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskReportService;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskService;
import com.xxx.modules.chunjun.desreport.task.ScheduledOfTask;

import lombok.extern.slf4j.Slf4j;

/**
 * xxx
 * 
 * @author caip
 * @date: 2021-11-29 14:58:08
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
@Slf4j
@Service
public class ScheduledTaskServiceImpl extends BaseServiceImpl<ScheduledTaskMapper, ScheduledTask>
    implements IScheduledTaskService {

    @Value("${task.baseTaskList}")
    private String taskBaseTaskList;

    /** 可重入锁 */
    private ReentrantLock lock = new ReentrantLock();

    /** 定时任务线程池 */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    @Autowired
    private IScheduledTaskReportService strService;

    /** 启动状态的定时任务集合 */
    @SuppressWarnings("rawtypes")
    public Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<String, ScheduledFuture>();

    @Override
    public List<ScheduledTask> listAll() {
        return this.list();
    }

    @Override
    public Result<?> pageList(String taskClass) {
        CriteriaQuery<ScheduledTask> cq = new CriteriaQuery<ScheduledTask>(this.request);
        cq.like(null != taskClass && !taskClass.isEmpty(), ScheduledTask::getTaskClass, taskClass);
        return this.pageList(cq);
    }

    @Override
    public ScheduledTask getById(String id) {
        return super.getById(id);
    }

    @Override
    public Result<?> start(String id) {
        // 根据id查询任务
        ScheduledTask task = super.getById(id);
        // 判断任务是否启用
        if (1 != task.getStatus()) {
            return Result.error("定时任务未启用,无法执行!");
        }
        String taskClass = task.getTaskClass();
        log.info("启动定时任务:" + taskClass);
        // 添加锁放一个线程启动,防止多人启动多次
        lock.lock();
        log.info("加锁完成");
        try {
            if (this.isStart(id)) {
                String msg = "当前任务在启动状态中";
                log.info(msg);
                return Result.error(msg);
            }
            // 任务启动
            this.doStartTask(task);
        } finally {
            lock.unlock();
            log.info("解锁完毕");
        }
        return Result.OK();
    }

    @SuppressWarnings("rawtypes")
    @Override
    public Result<?> stop(String id) {
        // 根据id查询任务
        log.info("停止任务: " + id);
        boolean flag = scheduledFutureMap.containsKey(id);
        log.info("当前实例是否存在 " + flag);
        if (flag) {
            ScheduledFuture scheduledFuture = scheduledFutureMap.get(id);
            scheduledFuture.cancel(true);
            scheduledFutureMap.remove(id);
        } else {
            String msg = "当前任务不存在!";
            log.info(msg + id);
            Result.error(msg);
        }
        return Result.OK();
    }

    @Override
    public Result<?> restart(String id) {
        log.info("重启定时任务:" + id);
        // 停止
        this.stop(id);
        // 启动
        return this.start(id);
    }

    @Override
    public Result<?> update(ScheduledTaskVO st) {
        // 赋值更新人、更新时间
        st.setUpdateBy(LoginUserUtil.getCurrentUserId());
        st.setUpdateTime(new Date());
        if (!super.updateById(st)) {
            String msg = "新增失败!";
            log.error(msg);
            return Result.error(msg);
        }
        // 批量保存关联数据
        JSONObject params = new JSONObject();
        List<ScheduledTaskReport> strList = st.getStrList();
        if (null != strList && !strList.isEmpty()) {
            // 赋值任务id
            for (ScheduledTaskReport str : strList) {
                str.setTaskId(st.getId());
            }
        }
        params.put("strList", strList);
        params.put("ids", st.getIds());
        if (!strService.saveBatch(params).isSuccess()) {
            String msg = "批量保存失败!";
            log.error(msg);
            return Result.error(msg);
        }
        return Result.OK();
    }

    @Override
    @Transactional
    public Result<?> insert(ScheduledTaskVO st) {
        // 赋值创建人、创建时间
        st.setCreateBy(LoginUserUtil.getCurrentUserId());
        st.setCreateTime(new Date());
        // 新增定时任务
        if (!super.save(st)) {
            String msg = "新增失败!";
            log.error(msg);
            return Result.error(msg);
        }
        // 新增关联数据
        JSONObject params = new JSONObject();
        List<ScheduledTaskReport> strList = st.getStrList();
        if (null != strList && !strList.isEmpty()) {
            // 赋值任务id
            for (ScheduledTaskReport str : strList) {
                str.setTaskId(st.getId());
            }
            params.put("strList", strList);
            if (!strService.saveBatch(params).isSuccess()) {
                String msg = "批量新增失败!";
                log.error(msg);
                return Result.error(msg);
            }
        }
        return Result.OK();
    }

    @Override
    public void initTask() {
        List<ScheduledTask> stList = super.list();
        if (null != stList && !stList.isEmpty()) {
            // 循环所有定时任务
            for (ScheduledTask st : stList) {
                // 判断是否启用
                if (1 != st.getStatus()) {
                    continue;
                }
                // 执行任务
                doStartTask(st);
            }
        }
    }

    /**
     * 执行启动任务
     * 
     * @author: caip
     * @date: 2021-12-01 16:16:31
     * @param scheduledTask 定时任务
     */
    @SuppressWarnings(value = {"unchecked", "rawtypes"})
    private void doStartTask(ScheduledTask scheduledTask) {
        String taskClass = scheduledTask.getTaskClass();
        log.info(taskClass);
        if (1 != scheduledTask.getStatus()) {
            return;
        }
        Class clazz;
        ScheduledOfTask task;
        try {
            // 通过类全路径找到类
            clazz = Class.forName(taskClass);
            // 获取构造函数
            Constructor<?> cons = clazz.getConstructor(ScheduledTask.class);
            // 实例化对象,传入参数,用于执行器关联业务
            task = (ScheduledOfTask)cons.newInstance(scheduledTask);
            // 检查执行器是否继承父类,如果未继承,则直接中断
            Assert.isAssignable(ScheduledOfTask.class, task.getClass(), "定时任务业务类必须继承ScheduledOfTask父类");
            // 实例化任务触发器
            CronTrigger cronTrigger = new CronTrigger(scheduledTask.getCronExpression());
            // 实例化定时任务
            ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(task,
                (triggerContext -> cronTrigger.nextExecutionTime(triggerContext)));
            // 存入定时任务,用于查找和停用
            scheduledFutureMap.put(scheduledTask.getId(), scheduledFuture);
        } catch (Exception e) {
            log.error("定时任务【" + taskClass + "】启动异常。", e);
        }
    }

    /**
     * 检查任务是否已经启动
     * 
     * @author: caip
     * @date: 2021-12-01 16:00:17
     * @param id 定时任务id
     * @return
     */
    private Boolean isStart(String id) {
        // 校验是否已经启动
        if (scheduledFutureMap.containsKey(id)) {
            if (!scheduledFutureMap.get(id).isCancelled()) {
                return true;
            }
        }
        return false;
    }

    @Override
    public Result<?> pageListActuator(String keywords) {
        // 从配置文件读取并初始化执行器数据
        String[] taskBaseTaskArray = taskBaseTaskList.split(",");
        List<ScheduledTaskActuator> bList = new ArrayList<ScheduledTaskActuator>();
        if (null != taskBaseTaskArray && taskBaseTaskArray.length > 0) {
            for (int i = 0; i < taskBaseTaskArray.length; i++) {
                String[] taskBaseTaskAttr = taskBaseTaskArray[i].split(":");
                String tempName = taskBaseTaskAttr[0];
                String tempClass = taskBaseTaskAttr[1];
                boolean isAdd = false;
                // 判断并添加条件
                if (!StringUtils.isEmpty(keywords)) {
                    // 添加名称模糊查询条件
                    // 模糊查询 使用(Pattern、Matcher)
                    Pattern pattern1 = Pattern.compile(keywords);
                    Pattern pattern2 = Pattern.compile(keywords);
                    Matcher matcher1 = pattern1.matcher(tempName);
                    Matcher matcher2 = pattern2.matcher(tempClass);
                    if (matcher1.find() || matcher2.find()) {
                        isAdd = true;
                    }
                } else {
                    isAdd = true;
                }

                if (isAdd) {
                    ScheduledTaskActuator temp = new ScheduledTaskActuator();
                    temp.setActuatorName(taskBaseTaskAttr[0]);
                    temp.setTaskClass(taskBaseTaskAttr[1]);
                    bList.add(temp);
                }
            }
        }

        return Result.OK(bList);
    }
}

2.6 执行器父类

package com.xxx.modules.chunjun.desreport.task;

import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;

/**
 * 所有定时任务父类:定时任务业务类必须继承该类
 * 
 * @author xxx
 * @date: 2021-11-30 17:06:13
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
public class ScheduledOfTask implements Runnable {

    /** 定时任务配置 */
    private ScheduledTask scheduledTask;

    /**
     * 有参构造:用于赋值定时任务配置
     * 
     * @param scheduledTask
     */
    public ScheduledOfTask(ScheduledTask scheduledTask) {
        this.scheduledTask = scheduledTask;
    }

    /**
     * 获取定时任务配置
     * 
     * @author: xxx
     * @date: 2021-12-29 15:47:35
     * @return
     */
    public ScheduledTask getScheduledTask() {
        return this.scheduledTask;
    }

    /**
     * 实现Runnable类run,无具体意义。用于子类实现具体业务
     */
    @Override
    public void run() {
        System.out.println(scheduledTask.getId() + ":开始执行!");
    }
}

2.7 执行器业务类

package com.trasen.modules.chunjun.desreport.task;

import com.alibaba.fastjson.JSONObject;
import com.trasen.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.trasen.modules.chunjun.desreport.service.IReportExportExcel;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * 定时任务业务类:生成excel任务
 * 
 * @author xxx
 * @date: 2021-11-29 17:04:35
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
@Slf4j
public class GenerateExcelTask extends ScheduledOfTask {
    public GenerateExcelTask(ScheduledTask scheduledTask) {
        super(scheduledTask);
    }

    @Override
    public void run() {
        ScheduledTask scheduledTask = super.getScheduledTask();
        // 具体业务,无需关心
        // String id = scheduledTask.getId();
        // JSONObject jsonObject = new JSONObject();
        // jsonObject.put("taskId", id);
        // jsonObject.put("pageNo", 1);
        // jsonObject.put("pageSize", 10000);
        // IReportExportExcel reportExportExcel = SpringUtil.getBean(IReportExportExcel.class);
        // reportExportExcel.generateExcelUploadMinIO(jsonObject);
        log.info("任务id:{},任务说明:{},cron表达式:{}", id, scheduledTask.getTaskExplain(), scheduledTask.getCronExpression());
    }
}

2.8 项目启动时初始化定时任务

package com.xxx.modules.chunjun.desreport.task;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.trasen.modules.chunjun.desreport.service.IScheduledTaskService;

import lombok.extern.slf4j.Slf4j;

/**
 * 项目启动时初始化定时任务
 * 
 * @author xxx
 * @date: 2021-12-01 16:22:15
 * @Copyright: Copyright (c) 2006 - 2021
 * @Company: xxx
 * @Version: V1.0
 */
@Slf4j
@Component
public class ScheduledTaskRunner implements ApplicationRunner {

    @Autowired
    private IScheduledTaskService scheduledTaskService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("----初始化定时任务开始----");
        scheduledTaskService.initTask();
        log.info("----初始化定时任务完成----");
    }
}
  • 作者:Java码仔潘
  • 原文链接:https://blog.csdn.net/aw277866304/article/details/122251111
    更新时间:2023-01-08 18:28:46