elastic-job:动态进行任务的添加

2022-06-21 12:16:52

一:

上一篇https://blog.csdn.net/shn1994/article/details/93871695

中elastic-job参考官方整合了一下springboot,整合这个是为了进行接下来的任务的动态添加

,有的时候会遇到这样需求,同样的定时任务逻辑,需要动态生成不同的定时任务,而且需要整合到自身的代码中不能用控制台去控制

接着上一篇的项目继续。在上一篇中关注一下步骤四.②中的配置类

SimpleJobConfig中的

二:

我们主要是通过参考这个进行修改

主要是copy上面两个方法,动态添加类如下:

package com.threejie.demo.dynamic;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class AddJob {

    @Resource
    private ZookeeperRegistryCenter regCenter;

    /**
     *
     * @param jobName:任务名称,注意不要带#特殊字符。
     * @param simpleJob:实现simpleJob自己的任务类
     * @param cron
     * @param shardingTotalCount:分片总数
     * @param shardingItemParameters:个性化参数,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
     */
    public void addjobs(final String jobName ,final SimpleJob simpleJob, final String cron, final int shardingTotalCount,
                        final String shardingItemParameters){
        simpleJobScheduler(jobName,simpleJob,cron,shardingTotalCount,shardingItemParameters).init();
    }


    public JobScheduler simpleJobScheduler(final String jobName ,final SimpleJob simpleJob, final String cron, final int shardingTotalCount,
                                           final String shardingItemParameters) {
        LiteJobConfiguration liteJobConfiguration =  getLiteJobConfiguration(jobName ,simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters);
        return new SpringJobScheduler(simpleJob, regCenter,liteJobConfiguration /*, jobEventConfiguration*/);
    }

    private LiteJobConfiguration getLiteJobConfiguration(final String jobName ,final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {

        /**
         * JobCoreConfiguration.newBuilder(String jobName, String cron, int shardingTotalCount)
         * 其中jobName是任务名称
         * cron是cron表达式
         * shardingTotalCount是分片总数
         * 这样就可以把jobClass.getName()变成我们自己命名的jobName
         */
//        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
//                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
                jobName, cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build();


        JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());

        return LiteJobConfiguration.newBuilder(jobTypeConfiguration).overwrite(true).build();

       /* return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();*/
    }
}

说下修改过程注意的点:

①getLiteJobConfiguration这个方法多加个参数jobName,因为之前用的是jobClass.getName,可以去elasticJob控制台看下

②返回JobScheduler的添加任务的地方要注意调用init();

③参数jobName不要带#,因为在elastic-console中有get方法参数带#被截取

三:现在就可以通过AddJob类中的addjobs方法进行动态添加任务了(前提是任务逻辑实现类以及写好了)

写个controller测试一下

package com.threejie.demo.controller;

import com.threejie.demo.dynamic.AddJob;
import com.threejie.demo.jobs.DynamicJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobController {
    @Autowired
    private AddJob addJob;

    @RequestMapping("/addJob")
    public String addJob(@RequestParam String cron,
                         @RequestParam String jobName,
                         @RequestParam Integer shardingTotalCount,
                         @RequestParam String shardingItemParameters) {

        addJob.addjobs(jobName, new DynamicJob(), cron, shardingTotalCount, shardingItemParameters);
        return "0000";
    }

}

项目结构:这里就不需要SimpleJobConfig这个类了

注意:自动加载的定时任务项目,如果起多个相同的服务,在分片数大于1的前提下,会自动分配到不通的服务中。也就是分片会生效。

改造之后的能调用接口动态添加的定时任务项目,因为调用接口添加的定时任务,调用哪台,哪台会参与分片。

例子:分片数为3,服务两台;改造之前的两台服务上的项目启动之后,就会自动分片(其中一个执行一个,另一个执行两个),如图:

①自动加载的定时任务:

服务器一:

服务器二:

②动态添加的

通过其中一个服务添加定时任务,全在此服务中执行任务

如果想另外一台服务剩下,继续以相同参数调用另一台的服务接口,会分片成功

  • 作者:shn1994
  • 原文链接:https://blog.csdn.net/shn1994/article/details/93873479
    更新时间:2022-06-21 12:16:52