Elastic-Job创建动态定时任务、动态修改定时任务

2022-06-19 10:35:59

Elastic-Job实现动态增加定时任务、动态修改定时任务。

一、ElasticJob简介

ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。 它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。 它的各个产品使用统一的作业 API,开发者仅需一次开发,即可随意部署。

二、使用Elastic Job创建静态定时任务

2.1引入依赖包

<dependency><groupId>com.github.kuhn-he</groupId><artifactId>elastic-job-lite-spring-boot-starter</artifactId><version>2.1.5</version></dependency>

或者用

<dependency><groupId>com.github.xjzrc.spring.boot</groupId><artifactId>elastic-job-lite-spring-boot-starter</artifactId><version>1.0.1</version></dependency>

2.2添加配置信息

spring:
  elasticjob:
    zookeeper:      
      server-lists: 你自己zk的IP地址:2181
      namespace: daemon-job

2.3简单作业

实现 SimpleJob 接口。 该接口仅提供单一方法用于覆盖,此方法将定时执行。 与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.simple.SimpleJob;import com.zen.elasticjob.spring.boot.annotation.ElasticJobConfig;import lombok.extern.slf4j.Slf4j;/**
 * @author songfayuan
 * @date 2018/2/7
 * 测试Job
 */@Slf4j@ElasticJobConfig(cron="0 45 0 * * ? *", shardingTotalCount=3,
        shardingItemParameters="0=collection1,1=collection2,2=collection3",
        startedTimeoutMilliseconds=5000L,
        completedTimeoutMilliseconds=10000L,
        eventTraceRdbDataSource="dataSource",
        description="测试案例")publicclassSimpleJobimplementsSimpleJob{/**
     * 业务执行逻辑
     *
     * @param shardingContext 分片信息
     */@Overridepublicvoidexecute(ShardingContext shardingContext){
        log.info("shardingContext:{}", shardingContext);}}

2.4数据流作业

用于处理数据流,需实现 DataflowJob 接口。 该接口提供2个方法可供覆盖,分别用于抓取 (fetchData) 和处理 (processData) 数据。

import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.dataflow.DataflowJob;import com.zen.elasticjob.spring.boot.annotation.ElasticJobConfig;import java.util.List;/**
 * @author songfayuan
 * @date 2018/2/8
 */@ElasticJobConfig(cron="0 45 0 * * ? *", shardingTotalCount=3,
        shardingItemParameters="0=Beijing,1=Shanghai,2=Guangzhou",
        description="测试案例")publicclassAmazonDataflowJobimplementsDataflowJob<Integer>{@Overridepublic List<Integer>fetchData(ShardingContext shardingContext){return null;}@OverridepublicvoidprocessData(ShardingContext shardingContext, List<Integer> list){}}

三、使用Elastic Job创建动态定时任务

3.1创建存储动态任务的数据库表

如果没有这个本地持久化的数据,那么你的任务创建以后,如果Elastic Job服务挂掉,重启以后,就没有这个任务了…

-- Local persistence for dynamically created Elastic-Job tasks.
-- One row per dynamic job: its name, cron expression, description, parameters and status.
CREATE TABLE `job_dynamic_task` (
  `id`          bigint(20)   NOT NULL AUTO_INCREMENT,
  `is_delete`   int(4)       NOT NULL DEFAULT '0' COMMENT '是否删除(0否 1是)',
  `create_time` datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `job_name`    varchar(200) NOT NULL DEFAULT '' COMMENT '任务名称',
  `cron`        varchar(100) NOT NULL COMMENT 'cron表达式',
  `description` varchar(255) NOT NULL DEFAULT '' COMMENT '作业描述',
  `parameters`  text         NOT NULL COMMENT '参数',
  -- NOTE(review): the default is -1 while the column comment only lists 0 and 1 — confirm the intended meaning of -1.
  `status`      int(11)      NOT NULL DEFAULT '-1' COMMENT '状态:0未执行 1已执行',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='Elastic Job 自定义定时任务(动态任务)';

在这里插入图片描述

3.2初始化配置(DynamicElasticJobConfig)

import com.dangdang.ddframe.job.event.JobEventConfiguration;import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import com.github.amazon.daemon.utils.ElasticJobListener;import com.zaxxer.hikari.HikariDataSource;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;/**
 * Elastic Job 动态定时任务配置
 * @author songfayuan
 * @date 2021/4/26 9:23 上午
 */@ConfigurationpublicclassDynamicElasticJobConfig{@Value("${spring.elasticjob.zookeeper.server-lists}")private String serverLists;@Value("${spring.elasticjob.zookeeper.namespace}")private String namespace;@Resourceprivate HikariDataSource dataSource;@Beanpublic ZookeeperConfigurationzookeeperConfiguration(){returnnewZookeeperConfiguration(serverLists, namespace);}@Bean(initMethod="init")public ZookeeperRegistryCenterzookeeperRegistryCenter(ZookeeperConfiguration zookeeperConfiguration){returnnewZookeeperRegistryCenter(zookeeperConfiguration);}@Beanpublic ElasticJobListenerelasticJobListener(){returnnewElasticJobListener(100,100);}/**
     * 将作业运行的痕迹进行持久化到DB
     *
     * @return
     */@Beanpublic JobEventConfigurationjobEventConfiguration(){returnnewJobEventRdbConfiguration(dataSource);}}

3.3动态定时任务相关操作工具(ElasticJobHandler)

import com.dangdang.ddframe.job.api.simple.SimpleJob;import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;import com.dangdang.ddframe.job.event.JobEventConfiguration;import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import com.github.amazon.common.constant.enums.DingTokenEnum;import com.github.amazon.common.util.DingDingMsgSendUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.Resource;/**
 * 动态定时任务相关操作工具
 * @author songfayuan
 * @date 2021/4/26 10:54 上午
 */@Slf4j@ComponentpublicclassElasticJobHandler{@Autowiredprivate ZookeeperRegistryCenter zookeeperRegistryCenter;@Resourceprivate JobEventConfiguration jobEventConfiguration;@Resourceprivate ElasticJobListener elasticJobListener;/***
     * 动态创建定时任务
     * @param jobName:定时任务名称
     * @param cron:表达式
     * @param shardingTotalCount:分片数量
     * @param instance:定时任务实例
     * @param parameters:参数
     * @param description:作业描述
     */publicvoidaddJob(String jobName, String cron,int shardingTotalCount, SimpleJob instance, String parameters, String description){
        log.info("动态创建定时任务:jobName = {}, cron = {}, shardingTotalCount = {}, parameters = {}", jobName, cron, shardingTotalCount, parameters);
        DingDingMsgSendUtils.sendDingDingGroupMsgNoAt(DingTokenEnum.DYNAMIC_SCHEDULED_TASK_NOTIFICATION.getToken(),"正在动态创建定时任务:jobName = "+jobName+",cron = "+cron+",parameters = "+parameters+",shardingTotalCount = "+shardingTotalCount);

        LiteJobConfiguration.Builder builder= LiteJobConfiguration.newBuilder(newSimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(
                        jobName,
                        cron,
                        shardingTotalCount).jobParameter(parameters).description(description).build(),
                instance.getClass().getName())).overwrite(true);
        LiteJobConfiguration liteJobConfiguration= builder.build();newSpringJobScheduler(instance,zookeeperRegistryCenter,liteJobConfiguration,jobEventConfiguration,elasticJobListener).init();}/**
     * 更新定时任务
     * @param jobName
     * @param cron
     */publicvoidupdateJob(String jobName, String cron){
        log.info("更新定时任务:jobName = {}, cron = {}", jobName, cron);
        DingDingMsgSendUtils.sendDingDingGroupMsgNoAt(DingTokenEnum.DYNAMIC_SCHEDULED_TASK_NOTIFICATION.getToken(),"正在更新定时任务:jobName = "+jobName+",cron = "+cron);
        JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(cron);}/**
     * 删除定时任务
     * @param jobName
     */publicvoidremoveJob(String jobName){
        log.info("删除定时任务:jobName = {}", jobName);
        DingDingMsgSendUtils.sendDingDingGroupMsgNoAt(DingTokenEnum.DYNAMIC_SCHEDULED_TASK_NOTIFICATION.getToken(),"正在删除定时任务:jobName = "+jobName);
        JobRegistry.getInstance().getJobScheduleController(jobName).shutdown();}}

3.4配置ElasticJobListener监听器(ElasticJobListener)

package com.github.amazon.daemon.utils;import com.dangdang.ddframe.job.executor.ShardingContexts;import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;/**
 * ElasticJobListener 监听器
 *
 * 实现分布式任务监听器
 * 如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次
 *
 * @author songfayuan
 * @date 2021/4/26 2:28 下午
 */publicclassElasticJobListenerextendsAbstractDistributeOnceElasticJobListener{publicElasticJobListener(long startedTimeoutMilliseconds,long completedTimeoutMilliseconds){super(startedTimeoutMilliseconds,completedTimeoutMilliseconds);}@OverridepublicvoiddoBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts){}@OverridepublicvoiddoAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts){//任务执行完成后更新状态为已执行,当前未处理}}

3.5动态任务执行类(DynamicJob)

import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.simple.SimpleJob;import com.github.amazon.common.constant.enums.DingTokenEnum;import com.github.amazon.common.util.DingDingMsgSendUtils;import com.xiaoleilu.hutool.date.DatePattern;import com.xiaoleilu.hutool.date.DateUtil;import lombok.extern.slf4j.Slf4j;import java.util.Date;/**
 * 动态定时任务执行
 * @author songfayuan
 * @date 2021/4/26 10:56 上午
 */@Slf4jpublicclassDynamicJobimplementsSimpleJob{/**
     * 业务执行逻辑
     *
     * @param shardingContext
     */@Overridepublicvoidexecute(ShardingContext shardingContext){
        log.info("{}动态定时任务执行逻辑start...", DateUtil.format(newDate(), DatePattern.NORM_DATETIME_MS_PATTERN));
        String jobName= shardingContext.getJobName();
        String jobParameter= shardingContext.getJobParameter();
        log.info("---------DynamicJob---------动态定时任务正在执行:jobName = {}, jobParameter = {}", jobName, jobParameter);
        DingDingMsgSendUtils.sendDingDingGroupMsgNoAt(DingTokenEnum.DYNAMIC_SCHEDULED_TASK_NOTIFICATION.getToken(),"动态定时任务正在执行:JobName = "+jobName+",jobParameter = "+jobParameter);//根据参数调用不同的业务接口处理,请远程调用业务模块处理,避免本服务与业务依赖过重...

        log.info("{}动态定时任务执行逻辑end...", DateUtil.format(newDate(), DatePattern.NORM_DATETIME_MS_PATTERN));}}

3.6新增controller测试动态创建动态定时任务(JobOperateController)

import com.github.amazon.common.util.Response;import com.github.amazon.daemon.job.DynamicJob;import com.github.amazon.daemon.utils.ElasticJobHandler;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Map;import java.util.Objects;/**
 * <p>
 * 动态定时任务 前端控制器
 * </p>
 *
 * @author songfayuan
 * @date 2021/4/26 11:05 上午
 */@RestController@RequestMapping("/noAuthorization")publicclassJobOperateController{@Autowiredprivate ElasticJobHandler elasticJobHandler;/**
     * 创建动态定时任务
     * jobName 任务名称
     * cron cron表达式 0 * * * * ? *
     * @param params
     * @return
     */@GetMapping("/createJob")public ResponsecreateJob(@RequestBody Map<String, Object> params){if(Objects.isNull(params.get("jobName"))){return Response.errorResponse("jobName不能为空");}if(Objects.isNull(params.get("cron"))){return Response.errorResponse("cron不能为空");}
        elasticJobHandler.addJob(params.get("jobName").toString(), params.get("cron").toString(),1,newDynamicJob(), Objects.isNull(params.get("params"))?"": params.get("params").toString(), Objects.isNull(params.get("description"))?"": params.get("description").toString());return Response.successResponse("请求成功");}/**
     * 更新定时任务(似乎,好像,他内内的这个方法没用!!!)
     * jobName
     * cron cron表达式 0 0/5 * * * ?
     * @return
     */@GetMapping("/updateJob")public ResponseupdateJob(@RequestBody Map<String, Object> params){if(Objects.isNull(params.get("jobName"))){return Response.errorResponse("jobName不能为空");}if(Objects.isNull(params.get("cron"))){return Response.errorResponse("cron不能为空");}
        elasticJobHandler.updateJob(params.get("jobName").toString(), params.get("cron").toString());return Response.successResponse("请求成功");}/**
     * 删除定时任务
     * jobName 任务名称
     * @return
     */@GetMapping("/removeJob")public ResponseremoveJob(@RequestBody Map<String, Object> params){
  • 作者:宋发元
  • 原文链接:https://songfayuan.blog.csdn.net/article/details/116200025
    更新时间:2022-06-19 10:35:59