spring batch Job详解
github地址:
https://github.com/a18792721831/studybatch.git
文章列表:
Job调度原理
一个Job由1个或者多个Step组成,Step有读写处理三部分组成;Job运行期间,所有的数据通过Job Repository进行持久化,同时通过Job Launcher负责调度Job作业。
Job的基本配置
Job的核心属性:
Job的组成:
Job重启
通过设置restartable可以定义job是否可以重启。默认情况下Job是可以重启的,但是需要注意,即使配置了Job可以重启,仍然需要保证Job Instance的状态一定不为"COMPLETED".
不可重启Job
不可重启job的定义非常简单,只需要调用一个方法即可:
@Bean
public Job noResJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job-4-no-restart")
.incrementer(new RunIdIncrementer())
.preventRestart()
.flow(stepBuilderFactory.get("step-4-no-restart").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("exec!" + new SimpleDateFormat("YYYY-MM-DD HH:MM:SS").format(new Date()));
return RepeatStatus.CONTINUABLE;
}
}).build())
.end()
.build();
}
这里创建了一个job,名字是job-4-no-restart
,然后设置不可重启(默认是可以重启的)。这个Job非常的简单,只是不停的在打印exec!+时间。
是一个死循环。所以,需要手动停止。手动停止,在数据库中就不是"COMPLETED".
此时状态不是"COMPLETED",但是因为我们设置了不可重启,所以,在不修改任何代码的情况下,重新启动服务:
就会抛出JobRestartException。
可重启Job
Job默认是可以重启的。
我们拷贝不可重启的bean,然后去掉设置不可重启的操作。
记得在调度中启动
第一次运行 还是死循环
手动停止
然后重新启动
虽然也异常了,但是,异常非常明显不一样,这是说有一个JobExecution已经在执行了。
我们现在修改job,不要让job是一个死循环,而是当循环次数大于10次的时候,抛出异常:
然后,在启动的时候修改job名字,或者传入一个参数。
启动在循环到第10次的时候,出现异常
此时数据库中,记录的状态是FAILED.接着在不修改参数的前提下,注释掉抛出异常的代码。
因为我们没有修改Job的名字,也没有修改Job的参数,所以,在spring batch看来,这就是同一个Job Instance。
接着重新启动,重新运行这个Job Instance
Job拦截器
spring batch框架在Job执行阶段提供了拦截器,使得在Job执行前后能够加入自定义的业务逻辑处理。
Job单个拦截器
Job 执行阶段拦截器需要实现接口:JobExecutionListener
@Configuration
public class JobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("JobListener before " + jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("JobListener after " + jobExecution.getExitStatus().getExitDescription());
}
}
使用
@Configuration
@EnableBatchProcessing
public class LisJobConf {
@Bean
public String runJob(JobLauncher jobLauncher, Job lisJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(lisJob, new JobParametersBuilder().addLong("id", 1L).toJobParameters());
return "";
}
@Bean
public Job lisJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, JobListener jobListener) {
return jobBuilderFactory.get("study4-lis")
.start(stepBuilderFactory.get("study4-step")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("exec");
return RepeatStatus.FINISHED;
}
}).build())
.listener(jobListener)
.build();
}
}
执行结果
这里有个坑,Job的监听,只能实现接口,不能使用注解。
因为其他的监听,有Object参数的重载,而Job的监听Builder,没有Object的重载。
比如
@Component
public class AnnJobListener {
@BeforeJob
public void beforeJob(JobExecution jobExecution) {
System.out.println("before " + jobExecution.getJobInstance().getJobName());
}
@AfterJob
public void afterJob(JobExecution jobExecution) {
System.out.println("after " + jobExecution.getJobInstance().getJobName());
}
}
@EnableBatchProcessing
@Configuration
public class LisAnnJobConf{
@Bean
public String runJob(JobLauncher jobLauncher,Job annJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(annJob, new JobParameters());
return "";
}
@Bean
public Job annJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, AnnJobListener annJobListener) {
return jobBuilderFactory.get("study4-anno")
.start(stepBuilderFactory.get("study-anno")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("exec");
return RepeatStatus.FINISHED;
}
})
.listener(annJobListener) // 有listerner(Object)的方法重载
.build())
// .listener(annJobListener) 没有listener(Object)的方法重载
.build();
}
}
启动
前后操作都没有执行。
Job组合拦截器
在Job中不尽可以配置单个的拦截器,还可以使用CompositeJobExecutionListener
实现组合拦截器。
拦截器的顺序根据注册的先后顺序,进行拦截。
比如现在有3个拦截器
@Component
public class SecJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("SecJobListener before : " + jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("SecJobListener after : " + jobExecution.getJobInstance().getJobName());
}
}
@Component
public class JobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("JobListener before " + jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("JobListener after " + jobExecution.getExitStatus().getExitDescription());
}
}
@Component
public class AnnJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("AnnJobListener before " + jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("AnnJobListener after " + jobExecution.getJobInstance().getJobName());
}
}
接着,使用组合拦截器,将这三个拦截器配置给一个Job
@EnableBatchProcessing
@Configuration
public class MoreLisJobConf {
@Bean
public String jobRunner(JobLauncher jobLauncher,Job moreLisJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(moreLisJob, new JobParametersBuilder().addLong("id", 1L).toJobParameters());
return "";
}
@Bean
public Job moreLisJob(AnnJobListener annJobListener, JobListener jobListener, SecJobListener secJobListener, JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
CompositeJobExecutionListener listener = new CompositeJobExecutionListener();
listener.register(annJobListener);
listener.register(jobListener);
listener.register(secJobListener);
return jobBuilderFactory.get("study4-more-listener")
.start(stepBuilderFactory.get("study4-more-listener")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("exec!" + LocalTime.now().toString());
return RepeatStatus.FINISHED;
}
}).build())
.listener(listener)
.build();
}
}
组合拦截器,是基于拦截器做了封装。组合拦截器内有一个拦截器列表。同时组合拦截器也实现了JobExecutionListener接口,当spring batch调用组合拦截器的before方法时,内部实际是调用全部的注册的拦截器的before方法;当spring batch调用组合拦截器的after方法时,内部实际倒序调用全部注册的拦截器的after方法。
执行结果
Job Parameters校验
spring batch框架提供了Job作业参数的校验功能,Job Parameters支持4种类型的参数:字符串、时间、长整型和双精度。但是传入的参数不一定符合这个规则,那么就需要对参数进行校验。除了类型校验,还可以有其他的校验,可以自己制定自定义的校验。需要实现接口JobParametersValidator
。
当然spring batch也提供了一些简单的校验类,可供我们使用
自定义的Job Parameters校验
首先需要实现接口校验的接口
@Component
public class ParameValidatory implements
JobParametersValidator {
@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
System.out.println(parameters.getParameters());
System.out.println("ParameValidatory + " + parameters.getClass().getSimpleName());
}
}
接着配置到Job上
@EnableBatchProcessing
@Configuration
public class ParameJobConf {
@Bean
public String runJob(JobLauncher jobLauncher,Job parJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(parJob,new JobParametersBuilder().addLong("id", 2L).toJobParameters());
return "";
}
@Bean
public Job parJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, ParameValidatory parameValidatory) {
return jobBuilderFactory.get("study4-parame")
.validator(parameValidatory)
.start(stepBuilderFactory.get("study4-parame")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("exec!"+ LocalTime.now());
return RepeatStatus.FINISHED;
}
}).build()).build();
}
}
启动
为什么会被调用两次呢?
我们打个断点调试下
第一次调用在SimpleJobLauncher中的调用:
第二次是在AbstractJob中调用的
在JobLauncher真正运行的时候,会在执行线程地方启动后,调用Job的execut方法中调用AbstractJob中的validate方法。
默认的Job Parameters校验
spring batch框架默认提供了Job ParametersValidator的实现。
定义使用默认Job Parameters校验的Job
@Configuration
@EnableBatchProcessing
public class DefParJobConnf {
@Bean
public String runJob(JobLauncher jobLauncher,Job defParJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(defParJob, new JobParametersBuilder().addLong("id", 2L).toJobParameters());
return "";
}
@Bean
public Job defParJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("study4-def-par")
.start(stepBuilderFactory.get("study4-def-par")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("exec ! " + LocalTime.now().toString());
return RepeatStatus.FINISHED;
}
}).build())
.validator(new DefaultJobParametersValidator(new String[]{"id"}, new String[]{"id"}))
.build();
}
}
正确运行了
我们要求参数校验,id必填,name可选。
接着将id传入空,name 传入非空
验证通过
如果一个参数都没有呢?
抛出了Job Parameters验证异常,缺失必须的参数:id.
组合的Job Parameters校验
spring 框架还提供了组合校验器CompositeJobparametersValidator
@Configuration
@EnableBatchProcessing
public class CompParJobConf {
@Bean
public String runJob(JobLauncher jobLauncher, Job compParJob) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
jobLauncher.run(compParJob, new JobParametersBuilder().toJobParameters());
return "";
}
@Bean
public Job compParJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, ParameValidatory parameValidatory) {
CompositeJobParametersValidator validator = new CompositeJobParametersValidator();
validator.setValidators(Arrays.asList(parameValidatory, new DefaultJobParametersValidator(new String