spring batch Job详解

2023-04-25 20:29:21

github地址:

https://github.com/a18792721831/studybatch.git

文章列表:

spring batch 入门

spring batch连接数据库

spring batch元数据

spring batch Job详解

spring batch step详解

spring batch ItemReader详解

spring batch itemProcess详解

spring batch itemWriter详解

spring batch 作业流

spring batch 健壮性

spring batch 扩展性

Job调度原理

一个Job由1个或者多个Step组成,Step有读写处理三部分组成;Job运行期间,所有的数据通过Job Repository进行持久化,同时通过Job Launcher负责调度Job作业。

image-20201109165536490

Job的基本配置

Job的核心属性:

image-20201109165640147

Job的组成:

image-20201109165705304

Job重启

通过设置restartable可以定义job是否可以重启。默认情况下Job是可以重启的,但是需要注意,即使配置了Job可以重启,仍然需要保证Job Instance的状态一定不为"COMPLETED".

不可重启Job

不可重启job的定义非常简单,只需要调用一个方法即可:

image-20201109185217758

    @Bean
    public Job noResJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        return jobBuilderFactory.get("job-4-no-restart")
                .incrementer(new RunIdIncrementer())
                .preventRestart()
                .flow(stepBuilderFactory.get("step-4-no-restart").tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("exec!" + new SimpleDateFormat("YYYY-MM-DD HH:MM:SS").format(new Date()));
                        return RepeatStatus.CONTINUABLE;
                    }
                }).build())
                .end()
                .build();
    }

这里创建了一个job,名字是job-4-no-restart,然后设置不可重启(默认是可以重启的)。这个Job非常的简单,只是不停的在打印exec!+时间。

是一个死循环。所以,需要手动停止。手动停止,在数据库中就不是"COMPLETED".

image-20201109184850812

image-20201109184830405

此时状态不是"COMPLETED",但是因为我们设置了不可重启,所以,在不修改任何代码的情况下,重新启动服务:

image-20201109185023074

就会抛出JobRestartException。

可重启Job

Job默认是可以重启的。

我们拷贝不可重启的bean,然后去掉设置不可重启的操作。

image-20201109185303763

记得在调度中启动

image-20201109185339279

第一次运行 还是死循环

image-20201109185408462

手动停止

image-20201109185503426

image-20201109185441387

然后重新启动

image-20201109185733935

虽然也异常了,但是,异常非常明显不一样,这是说有一个JobExecution已经在执行了。

我们现在修改job,不要让job是一个死循环,而是当循环次数大于10次的时候,抛出异常:

image-20201109192634158

然后,在启动的时候修改job名字,或者传入一个参数。

image-20201109192709983

启动在循环到第10次的时候,出现异常

image-20201109192738791

此时数据库中,记录的状态是FAILED.接着在不修改参数的前提下,注释掉抛出异常的代码。

image-20201109192949280

因为我们没有修改Job的名字,也没有修改Job的参数,所以,在spring batch看来,这就是同一个Job Instance。

image-20201109193306234

接着重新启动,重新运行这个Job Instance

image-20201109193253875

Job拦截器

spring batch框架在Job执行阶段提供了拦截器,使得在Job执行前后能够加入自定义的业务逻辑处理。

Job单个拦截器

Job 执行阶段拦截器需要实现接口:JobExecutionListener

@Configuration
public class JobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("JobListener before " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobListener after " + jobExecution.getExitStatus().getExitDescription());
    }
}

使用

@Configuration
@EnableBatchProcessing
public class LisJobConf {

    @Bean
    public String runJob(JobLauncher jobLauncher, Job lisJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        jobLauncher.run(lisJob, new JobParametersBuilder().addLong("id", 1L).toJobParameters());
        return "";
    }

    @Bean
    public Job lisJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, JobListener jobListener) {
        return jobBuilderFactory.get("study4-lis")
                .start(stepBuilderFactory.get("study4-step")
                        .tasklet(new Tasklet() {
                            @Override
                            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                                System.out.println("exec");
                                return RepeatStatus.FINISHED;
                            }
                        }).build())
                .listener(jobListener)
                .build();
    }

}

执行结果

image-20201109195042406

这里有个坑,Job的监听,只能实现接口,不能使用注解。

因为其他的监听,有Object参数的重载,而Job的监听Builder,没有Object的重载。

比如

@Component
public class AnnJobListener {

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("before " + jobExecution.getJobInstance().getJobName());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        System.out.println("after " + jobExecution.getJobInstance().getJobName());
    }

}
@EnableBatchProcessing
@Configuration
public class LisAnnJobConf{

    @Bean
    public String runJob(JobLauncher jobLauncher,Job annJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        jobLauncher.run(annJob, new JobParameters());
        return "";
    }

    @Bean
    public Job annJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, AnnJobListener annJobListener) {
        return jobBuilderFactory.get("study4-anno")
                .start(stepBuilderFactory.get("study-anno")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("exec");
                        return RepeatStatus.FINISHED;
                    }
                })
                        .listener(annJobListener) // 有listerner(Object)的方法重载
                        .build())
                // .listener(annJobListener) 没有listener(Object)的方法重载
                .build();
    }

}

启动

image-20201109203534237

前后操作都没有执行。

Job组合拦截器

在Job中不尽可以配置单个的拦截器,还可以使用CompositeJobExecutionListener实现组合拦截器。

拦截器的顺序根据注册的先后顺序,进行拦截。

比如现在有3个拦截器

@Component
public class SecJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("SecJobListener before : " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("SecJobListener after : " + jobExecution.getJobInstance().getJobName());
    }
}
@Component
public class JobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("JobListener before " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobListener after " + jobExecution.getExitStatus().getExitDescription());
    }
}
@Component
public class AnnJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("AnnJobListener before " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("AnnJobListener after " + jobExecution.getJobInstance().getJobName());
    }

}

接着,使用组合拦截器,将这三个拦截器配置给一个Job

@EnableBatchProcessing
@Configuration
public class MoreLisJobConf {

    @Bean
    public String jobRunner(JobLauncher jobLauncher,Job moreLisJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        jobLauncher.run(moreLisJob, new JobParametersBuilder().addLong("id", 1L).toJobParameters());
        return "";
    }

    @Bean
    public Job moreLisJob(AnnJobListener annJobListener, JobListener jobListener, SecJobListener secJobListener, JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        CompositeJobExecutionListener listener = new CompositeJobExecutionListener();
        listener.register(annJobListener);
        listener.register(jobListener);
        listener.register(secJobListener);
        return jobBuilderFactory.get("study4-more-listener")
                .start(stepBuilderFactory.get("study4-more-listener")
                        .tasklet(new Tasklet() {
                            @Override
                            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                                System.out.println("exec!" + LocalTime.now().toString());
                                return RepeatStatus.FINISHED;
                            }
                        }).build())
                .listener(listener)
                .build();
    }

}

组合拦截器,是基于拦截器做了封装。组合拦截器内有一个拦截器列表。同时组合拦截器也实现了JobExecutionListener接口,当spring batch调用组合拦截器的before方法时,内部实际是调用全部的注册的拦截器的before方法;当spring batch调用组合拦截器的after方法时,内部实际倒序调用全部注册的拦截器的after方法。

执行结果

image-20201110192447726

Job Parameters校验

spring batch框架提供了Job作业参数的校验功能,Job Parameters支持4种类型的参数:字符串、时间、长整型和双精度。但是传入的参数不一定符合这个规则,那么就需要对参数进行校验。除了类型校验,还可以有其他的校验,可以自己制定自定义的校验。需要实现接口JobParametersValidator

当然spring batch也提供了一些简单的校验类,可供我们使用

image-20201110193005900

自定义的Job Parameters校验

首先需要实现接口校验的接口

@Component
public class ParameValidatory implements
        JobParametersValidator {
    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        System.out.println(parameters.getParameters());
        System.out.println("ParameValidatory + " + parameters.getClass().getSimpleName());
    }
}

接着配置到Job上

@EnableBatchProcessing
@Configuration
public class ParameJobConf {


    @Bean
    public String runJob(JobLauncher jobLauncher,Job parJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        jobLauncher.run(parJob,new JobParametersBuilder().addLong("id", 2L).toJobParameters());
        return "";
    }

    @Bean
    public Job parJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, ParameValidatory parameValidatory) {
        return jobBuilderFactory.get("study4-parame")
                .validator(parameValidatory)
                .start(stepBuilderFactory.get("study4-parame")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("exec!"+ LocalTime.now());
                        return RepeatStatus.FINISHED;
                    }
                }).build()).build();
    }

}

启动

image-20201110194643254

为什么会被调用两次呢?

我们打个断点调试下

第一次调用在SimpleJobLauncher中的调用:

image-20201110195027760

第二次是在AbstractJob中调用的

image-20201110195108218

在JobLauncher真正运行的时候,会在执行线程地方启动后,调用Job的execut方法中调用AbstractJob中的validate方法。

默认的Job Parameters校验

spring batch框架默认提供了Job ParametersValidator的实现。

定义使用默认Job Parameters校验的Job

@Configuration
@EnableBatchProcessing
public class DefParJobConnf {

    @Bean
    public String runJob(JobLauncher jobLauncher,Job defParJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        jobLauncher.run(defParJob, new JobParametersBuilder().addLong("id", 2L).toJobParameters());
        return "";
    }

    @Bean
    public Job defParJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        return jobBuilderFactory.get("study4-def-par")
                .start(stepBuilderFactory.get("study4-def-par")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("exec ! " + LocalTime.now().toString());
                        return RepeatStatus.FINISHED;
                    }
                }).build())
                .validator(new DefaultJobParametersValidator(new String[]{"id"}, new String[]{"id"}))
                .build();
    }

}

正确运行了

image-20201110201025978

我们要求参数校验,id必填,name可选。

image-20201110201518394

接着将id传入空,name 传入非空

image-20201110201559779

验证通过

image-20201110201709782

如果一个参数都没有呢?

image-20201110202000259

抛出了Job Parameters验证异常,缺失必须的参数:id.

组合的Job Parameters校验

spring 框架还提供了组合校验器CompositeJobparametersValidator

@Configuration
@EnableBatchProcessing
public class CompParJobConf {

    @Bean
    public String runJob(JobLauncher jobLauncher, Job compParJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        jobLauncher.run(compParJob, new JobParametersBuilder().toJobParameters());
        return "";
    }

    @Bean
    public Job compParJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, ParameValidatory parameValidatory) {
        CompositeJobParametersValidator validator = new CompositeJobParametersValidator();
        validator.setValidators(Arrays.asList(parameValidatory, new DefaultJobParametersValidator(new String
  • 作者:a18792721831
  • 原文链接:https://jiayq.blog.csdn.net/article/details/109691602
    更新时间:2023-04-25 20:29:21