【SpringBatch】04九张批处理表、作业控制:启动 停止 重启
目录标题
- 七、批处理数据表
- 7.1 batch_job_instance表
- 7.2 batch_job_execution表
- 7.3 batch_job_execution_context表
- 7.4 batch_job_execution_params表
- 7.5 btch_step_execution表
- 7.6 batch_step_execution_context表
- 7.7 H2内存数据库
- 八、作业控制
- 8.1 作业启动
- 8.1.1 SpringBoot 启动
- 8.1.2 Spring 单元测试启动
- 8.1.3 RESTful API 启动
- 8.2 作业停止
- 8.3 作业重启
- 8.3.1 禁止重启
- 8.3.2 限制重启次数
- 8.3.3 无限重启
七、批处理数据表
如果选择数据库方式存储批处理数据,Spring Batch 在启动时会自动创建9张表,分别存储: JobExecution、JobContext、JobParameters、JobInstance、JobExecution id序列、Job id序列、StepExecution、StepContext/ChunkContext、StepExecution id序列 等对象。Spring Batch 提供 JobRepository 组件来实现这些表的CRUD操作,并且这些操作基本上封装在步骤,块,作业api操作中,并不需要我们太多干预,所以这章内容了解即可。
7.1 batch_job_instance表
当作业第一次执行时,会根据作业名,标识参数生成一个唯一JobInstance对象,batch_job_instance表会记录一条信息代表这个作业实例。
**
字段 | 描述 |
---|---|
JOB_INSTANCE_ID | 作业实例主键 |
VERSION | 乐观锁控制的版本号 |
JOB_NAME | 作业名称 |
JOB_KEY | 作业名与标识性参数的哈希值,能唯一标识一个job实例 |
7.2 batch_job_execution表
每次启动作业时,都会创建一个JobExecution对象,代表一次作业执行,该对象记录存放于batch_job_execution 表。
**
字段 | 描述 |
---|---|
JOB_EXECUTION_ID | job执行对象主键 |
VERSION | 乐观锁控制的版本号 |
JOB_INSTANCE_ID | JobInstanceId(归属于哪个JobInstance) |
CREATE_TIME | 记录创建时间 |
START_TIME | 作业执行开始时间 |
END_TIME | 作业执行结束时间 |
STATUS | 作业执行的批处理状态 |
EXIT_CODE | 作业执行的退出码 |
EXIT_MESSAGE | 作业执行的退出信息 |
LAST_UPDATED | 最后一次更新记录的时间 |
7.3 batch_job_execution_context表
batch_job_execution_context用于保存JobContext对应的ExecutionContext对象数据。
**
字段 | 描述 |
---|---|
JOB_EXECUTION_ID | job执行对象主键 |
SHORT_CONTEXT | ExecutionContext系列化后字符串缩减版 |
SERIALIZED_CONTEXT | ExecutionContext系列化后字符串 |
7.4 batch_job_execution_params表
作业启动时使用标识性参数保存的位置:batch_job_execution_params, 一个参数一个记录
字段 | 描述 |
---|---|
JOB_EXECUTION_ID | job执行对象主键 |
TYPE_CODE | 标记参数类型 |
KEY_NAME | 参数名 |
STRING_VALUE | 当参数类型为String时有值 |
DATE_VALUE | 当参数类型为Date时有值 |
LONG_VAL | 当参数类型为LONG时有值 |
DOUBLE_VAL | 当参数类型为DOUBLE时有值 |
IDENTIFYING | 用于标记该参数是否为标识性参数 |
7.5 btch_step_execution表
作业启动,执行步骤,每个步骤执行信息保存在tch_step_execution表中
**
字段 | 描述 |
---|---|
STEP_EXECUTION_ID | 步骤执行对象id |
VERSION | 乐观锁控制版本号 |
STEP_NAME | 步骤名称 |
JOB_EXECUTION_ID | 作业执行对象id |
START_TIME | 步骤执行的开始时间 |
END_TIME | 步骤执行的结束时间 |
STATUS | 步骤批处理状态 |
COMMIT_COUNT | 在步骤执行中提交的事务次数 |
READ_COUNT | 读入的条目数量 |
FILTER_COUNT | 由于ItemProcessor返回null而过滤掉的条目数 |
WRITE_COUNT | 写入条目数量 |
READ_SKIP_COUNT | 由于ItemReader中抛出异常而跳过的条目数量 |
PROCESS_SKIP_COUNT | 由于ItemProcessor中抛出异常而跳过的条目数量 |
WRITE_SKIP_COUNT | 由于ItemWriter中抛出异常而跳过的条目数量 |
ROLLBACK_COUNT | 在步骤执行中被回滚的事务数量 |
EXIT_CODE | 步骤的退出码 |
EXT_MESSAGE | 步骤执行返回的信息 |
LAST_UPDATE | 最后一次更新记录时间 |
7.6 batch_step_execution_context表
StepContext对象对应的ExecutionContext 保存的数据表:batch_step_execution_context
**
字段 | 描述 |
---|---|
STEP_EXECUTION_ID | 步骤执行对象id |
SHORT_CONTEXT | ExecutionContext系列化后字符串缩减版 |
SERIALIZED_CONTEXT | ExecutionContext系列化后字符串 |
7.7 H2内存数据库
除了关系型数据库保存的数据外,Spring Batch 也执行内存数据库,比如H2,HSQLDB,这些数据库将数据缓存在内存中,当批处理结束后,数据会被清除,一般用于进行单元测试,不建议在生产环境中使用。
八、作业控制
作业的运行指的是对作业的控制,包括作业启动,作业停止,作业异常处理,作业重启处理等。
8.1 作业启动
8.1.1 SpringBoot 启动
目前为止,上面所有的案例都是使用Spring Boot 原生功能来启动作业的,其核心类:JobLauncherApplicationRunner , Spring Boot启动之后,马上调用该类run方法,然后将操作委托给SimpleJobLauncher类run方法执行。默认情况下,Spring Boot一启动马上执行作业。
如果不想Spring Boot启动就执行,可以通过配置进行修改
spring:
batch:
job:
enabled: false #false表示不启动
8.1.2 Spring 单元测试启动
开发中如果想简单验证批处理逻辑是否能运行,可以使用单元测试方式启动作业
先引入spring-test测试依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
建立启动类
@SpringBootApplication
@EnableBatchProcessing
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
建立测试类
package com.langfeiyes.batch._14_job_start_test;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = App.class)
public class StartJobTest {
//job调度器
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello SpringBatch....");
return RepeatStatus.FINISHED;
}
};
}
public Step step1(){
TaskletStep step1 = stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
return step1;
}
//定义作业
public Job job(){
Job job = jobBuilderFactory.get("start-test-job")
.start(step1())
.build();
return job;
}
@Test
public void testStart() throws Exception{
//job作业启动
//参数1:作业实例,参数2:作业运行携带参数
jobLauncher.run(job(), new JobParameters());
}
}
跟之前的SpringBoot启动区别在于多了JobLauncher 对象的获取,再由这个对象调用run方法启动。
8.1.3 RESTful API 启动
如果批处理不是SpringBoot启动就启动,而是通过web请求控制,那该怎么办呢?不难,引入web环境即可
1>首先限制,不随SpringBoot启动而启动
spring:
batch:
job:
enabled: false #false表示不启动
2>引入web 环境
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3>编写启动类
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
4>编写配置类
package com.langfeiyes.batch._15_job_start_restful;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@EnableBatchProcessing
@Configuration
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello SpringBatch....");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
TaskletStep step1 = stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
return step1;
}
//定义作业
@Bean
public Job job(){
Job job = jobBuilderFactory.get("hello-restful-job")
.start(step1())
.build();
return job;
}
}
5>编写Controller类
package com.langfeiyes.batch._15_job_start_restful;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import java.util.Properties;
@RestController
public class HelloController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@GetMapping("/job/start")
public ExitStatus start() throws Exception {
//启动job作业
JobExecution jobExet = launcher.run(job, jp);
return jobExet.getExitStatus();
}
}
6>测试
注意:如果需要接收参数
1>作业使用run.id自增
//构造一个job对象
@Bean
public Job job(){
return jobBuilderFactory.get("hello-restful-job")
.start(step1())
.incrementer(new RunIdIncrementer())
.build();
}
2>改动HelloController接口方法
@RestController
public class HelloController {
@Autowired
private JobLauncher launcher;
@Autowired
private Job job;
@Autowired
private JobExplorer jobExplorer; //job 展示对象
@GetMapping("/job/start")
public ExitStatus startJob(String name) throws Exception {
//启动job作业
JobParameters jp = new JobParametersBuilder(jobExplorer)
.getNextJobParameters(job)
.addString("name", name)
.toJobParameters();
JobExecution jobExet = launcher.run(job, jp);
return jobExet.getExitStatus();
}
}
8.2 作业停止
作业的停止,存在有3种情况:
-
一种自然结束
作业成功执行,正常停止,此时作业返回状态为:COMPLETED
-
一种异常结束
作业执行过程因为各种意外导致作业中断而停止,大多数作业返回状态为:FAILED -
一种编程结束
某个步骤处理数据结果不满足下一步骤执行前提,手动让其停止,一般设置返回状态为:STOPED
上面1,2种情况相对简单,我们重点说下第三种:以编程方式让作业停止。
模拟一个操作场景
1>有一个资源类,里面有2个属性:总数:totalCount = 100, 读取数:readCount = 0
2>设计2个步骤,step1 用于叠加readCount 模拟从数据库中读取资源, step2 用于执行逻辑
3>当totalCount == readCount 时,为正常情况,正常结束。如果不等时,为异常状态。此时不执行step2,直接停止作业。
4>修复数据,在从step1开始执行,并完成作业
public class ResouceCount {
public static int totalCount = 100; //总数
public static int readCount = 0; //读取数
}
要实现上面需求,有2种方式可以实现
方案1:Step 步骤监听器方式
监听器
public class StopStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
//不满足
if(ResouceCount.totalCount != ResouceCount.readCount){
return ExitStatus.STOPPED; //手动停止,后续可以重启
}
return stepExecution.getExitStatus();
}
}
代码
package com.langfeiyes.batch._16_job_stop;
import com.langfeiyes.batch._01_hello.HelloJob;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class ListenerJobStopJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private int readCount = 50; //模拟只读取50个
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int i = 1; i <= readCount; i++) {
System.out.println("---------------step1执行-"+i+"------------------");
ResouceCount.readCount ++;
}
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("step2执行了.......");
System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
return RepeatStatus.FINISHED;
}
};
}
@Bean
public StopStepListener stopStepListener(){
return new StopStepListener();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.listener(stopStepListener())
.allowStartIfComplete(true) //执行完后,运行重启
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-stop-job")
.start(step1())
.on("STOPPED").stopAndRestart(step1())
.from(step1()).on("*").to(step2()).end()
.build();
}
public static void main(String[] args) {
SpringApplication.run(ListenerJobStopJob.class, args);
}
}
第一次执行:tasklet1 中readCount 默认执行50次,不满足条件, stopStepListener() afterStep 返回STOPPED, job进行条件控制走**.on(“STOPPED”).stopAndRestart(step1())** 分支,停止并允许重启–下次重启,从step1步骤开始执行
第二次执行, 修改readCount = 100, 再次启动作业,task1遍历100次,满足条件, stopStepListener() afterStep 正常返回,job条件控制走**.from(step1()).on(“*”).to(step2()).end()**分支,正常结束。
注意:step1() 方法中**.allowStartIfComplete(true)** 代码必须添加,因为第一次执行step1步骤,虽然不满足条件,但是它仍属于正常结束(正常执行完tasklet1的流程),状态码:COMPLETED, 第二次重启,默认情况下正常结束的step1步骤是不允许再执行的,所以必须设置:.allowStartIfComplete(true) 允许step1即使完成也可以重启。
方案2:StepExecution停止标记
package com.langfeiyes.batch._17_job_stop_sign;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class SignJobStopJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private int readCount = 50; //模拟只读取50个
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int i = 1; i <= readCount; i++) {
System.out.println("---------------step1执行-"+i+"------------------");
ResouceCount.readCount ++;
}
if(ResouceCount.readCount != ResouceCount.totalCount){
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
}
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("step2执行了.......");
System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.allowStartIfComplete(true)
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-stop-job")
.start(step1())
//.on("STOPPED").stopAndRestart(step1())
//.from(step1()).on("*").to(step2()).end()
.next(step2())
.build();
}
public static void main(String[] args) {
SpringApplication.run(SignJobStopJob.class, args);
}
}
变动的代码有2处
tasket1(), 多了下面判断
if(ResouceCount.readCount != ResouceCount.totalCount){
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
}
其中的StepExecution#setTerminateOnly() 给运行中的stepExecution设置停止标记,Spring Batch 识别后直接停止步骤,进而停止流程
job() 改动
return jobBuilderFactory.get("job-stop-job")
.start(step1())
.next(step2())
.build();
正常设置步骤流程。
8.3 作业重启
作业重启,表示允许作业步骤重新执行,默认情况下,只允许异常或终止状态的步骤重启,但有时存在特殊场景,要求需要其他状态步骤重启,为应付各种复杂的情形,Spring Batch 提供3种重启控制操作。
8.3.1 禁止重启
这种适用一次性执行场景,如果执行失败,就不允许再次执行。可以使用作业的禁止重启逻辑
package com.langfeiyes.batch._18_job_restart_forbid;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class JobForBidRestartJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet1-------------");
chunkContext.getStepContext().getStepExecution().setTerminateOnly(); //停止步骤
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet2-------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-forbid-restart-job")
.preventRestart() //禁止重启
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) {
SpringApplication.run(JobForBidRestartJob.class, args);
}
}
观察上面代码,比较特别之处:
tasklet1() 加了setTerminateOnly 设置,表示让步骤退出
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
job() 多了**.preventRestart()** 逻辑,表示步骤不允许重启
第一次按上面的代码执行一次, step1() 状态为 STOPPED
第二次去掉setTerminateOnly逻辑,重新启动步骤,观察结果,直接报错
**
8.3.2 限制重启次数
适用于重启次数有限的场景,比如下载/读取操作,可能因为网络原因导致下载/读取失败,运行重试几次,但是不能无限重试。这时可以对步骤执行进行重启次数限制。
package com.langfeiyes.batch._19_job_restart_limit;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class JobLimitRestartJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet1-------------");
chunkContext.getStepContext().getStepExecution().setTerminateOnly(); //停止步骤
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet2-------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.startLimit(2)
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-restart-limit-job")
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) {
SpringApplication.run(JobLimitRestartJob.class, args);
}
}
变动:
step1() 添加了**.startLimit(2)** 表示运行重启2次,注意,第一次启动也算一次
tasklet1() 设置setTerminateOnly 第一次先让step1 状态为STOPPED
第一次执行, step1 为 STOPPED 状态
第二次执行,不做任何操作,第二次执行,step1 还是STOPPED状态
第三次执行, 注释掉tasklet1() 中setTerminateOnly , 查询结果
**
8.3.3 无限重启
Spring Batch 限制同job名跟同标识参数作业只能成功执行一次,这是Spring Batch 定理,无法改变的。但是,对于步骤不一定适用,可以通过步骤的allowStartIfComplete(true) 实现步骤的无限重启。
package com.langfeiyes.batch._20_job_restart_allow;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class JobAllowRestartJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet1-------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet2-------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-allow-restart-job")
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) {
SpringApplication.run(JobAllowRestartJob.class, args);
}
}
观察上面代码,很正常逻辑
第一次启动:step1 step2正常执行,整个Job 成功执行完成
第二次启动:不做任何改动时,再次启动job,没有报错,但是观察数据库表batch_job_execution 状态为 NOOP 无效执行,step1 step2 不会执行。
第三次启动:给step1 step2 添加上**.allowStartIfComplete(true)**, 再次启动,一切正常,并且可以无限启动