当前位置: 首页 > article >正文

【SpringBatch】04九张批处理表、作业控制:启动 停止 重启

目录标题

  • 七、批处理数据表
    • 7.1 batch_job_instance表
    • 7.2 batch_job_execution表
    • 7.3 batch_job_execution_context表
    • 7.4 batch_job_execution_params表
    • 7.5 btch_step_execution表
    • 7.6 batch_step_execution_context表
    • 7.7 H2内存数据库
  • 八、作业控制
    • 8.1 作业启动
      • 8.1.1 SpringBoot 启动
      • 8.1.2 Spring 单元测试启动
      • 8.1.3 RESTful API 启动
    • 8.2 作业停止
    • 8.3 作业重启
      • 8.3.1 禁止重启
      • 8.3.2 限制重启次数
      • 8.3.3 无限重启

七、批处理数据表

在这里插入图片描述

如果选择数据库方式存储批处理数据,Spring Batch 在启动时会自动创建9张表,分别存储: JobExecution、JobContext、JobParameters、JobInstance、JobExecution id序列、Job id序列、StepExecution、StepContext/ChunkContext、StepExecution id序列 等对象。Spring Batch 提供 JobRepository 组件来实现这些表的CRUD操作,并且这些操作基本上封装在步骤,块,作业api操作中,并不需要我们太多干预,所以这章内容了解即可。
在这里插入图片描述

7.1 batch_job_instance表

当作业第一次执行时,会根据作业名,标识参数生成一个唯一JobInstance对象,batch_job_instance表会记录一条信息代表这个作业实例。

**

字段描述
JOB_INSTANCE_ID作业实例主键
VERSION乐观锁控制的版本号
JOB_NAME作业名称
JOB_KEY作业名与标识性参数的哈希值,能唯一标识一个job实例

7.2 batch_job_execution表

每次启动作业时,都会创建一个JobExecution对象,代表一次作业执行,该对象记录存放于batch_job_execution 表。

**

字段描述
JOB_EXECUTION_IDjob执行对象主键
VERSION乐观锁控制的版本号
JOB_INSTANCE_IDJobInstanceId(归属于哪个JobInstance)
CREATE_TIME记录创建时间
START_TIME作业执行开始时间
END_TIME作业执行结束时间
STATUS作业执行的批处理状态
EXIT_CODE作业执行的退出码
EXIT_MESSAGE作业执行的退出信息
LAST_UPDATED最后一次更新记录的时间

7.3 batch_job_execution_context表

batch_job_execution_context用于保存JobContext对应的ExecutionContext对象数据。

**

字段描述
JOB_EXECUTION_IDjob执行对象主键
SHORT_CONTEXTExecutionContext系列化后字符串缩减版
SERIALIZED_CONTEXTExecutionContext系列化后字符串

7.4 batch_job_execution_params表

作业启动时使用标识性参数保存的位置:batch_job_execution_params, 一个参数一个记录

字段描述
JOB_EXECUTION_IDjob执行对象主键
TYPE_CODE标记参数类型
KEY_NAME参数名
STRING_VALUE当参数类型为String时有值
DATE_VALUE当参数类型为Date时有值
LONG_VAL当参数类型为LONG时有值
DOUBLE_VAL当参数类型为DOUBLE时有值
IDENTIFYING用于标记该参数是否为标识性参数

7.5 btch_step_execution表

作业启动,执行步骤,每个步骤执行信息保存在tch_step_execution表中

**

字段描述
STEP_EXECUTION_ID步骤执行对象id
VERSION乐观锁控制版本号
STEP_NAME步骤名称
JOB_EXECUTION_ID作业执行对象id
START_TIME步骤执行的开始时间
END_TIME步骤执行的结束时间
STATUS步骤批处理状态
COMMIT_COUNT在步骤执行中提交的事务次数
READ_COUNT读入的条目数量
FILTER_COUNT由于ItemProcessor返回null而过滤掉的条目数
WRITE_COUNT写入条目数量
READ_SKIP_COUNT由于ItemReader中抛出异常而跳过的条目数量
PROCESS_SKIP_COUNT由于ItemProcessor中抛出异常而跳过的条目数量
WRITE_SKIP_COUNT由于ItemWriter中抛出异常而跳过的条目数量
ROLLBACK_COUNT在步骤执行中被回滚的事务数量
EXIT_CODE步骤的退出码
EXT_MESSAGE步骤执行返回的信息
LAST_UPDATE最后一次更新记录时间

7.6 batch_step_execution_context表

StepContext对象对应的ExecutionContext 保存的数据表:batch_step_execution_context

**

字段描述
STEP_EXECUTION_ID步骤执行对象id
SHORT_CONTEXTExecutionContext系列化后字符串缩减版
SERIALIZED_CONTEXTExecutionContext系列化后字符串

7.7 H2内存数据库

除了关系型数据库保存的数据外,Spring Batch 也执行内存数据库,比如H2,HSQLDB,这些数据库将数据缓存在内存中,当批处理结束后,数据会被清除,一般用于进行单元测试,不建议在生产环境中使用。

八、作业控制

作业的运行指的是对作业的控制,包括作业启动,作业停止,作业异常处理,作业重启处理等。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

8.1 作业启动

在这里插入图片描述

8.1.1 SpringBoot 启动

目前为止,上面所有的案例都是使用Spring Boot 原生功能来启动作业的,其核心类:JobLauncherApplicationRunner , Spring Boot启动之后,马上调用该类run方法,然后将操作委托给SimpleJobLauncher类run方法执行。默认情况下,Spring Boot一启动马上执行作业。

如果不想Spring Boot启动就执行,可以通过配置进行修改

spring:
  batch:
    job:
      enabled: false   #false表示不启动

8.1.2 Spring 单元测试启动

开发中如果想简单验证批处理逻辑是否能运行,可以使用单元测试方式启动作业

先引入spring-test测试依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>

建立启动类

@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

建立测试类

package com.langfeiyes.batch._14_job_start_test;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = App.class)
public class StartJobTest {
    //job调度器
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    public Tasklet tasklet(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello SpringBatch....");
                return RepeatStatus.FINISHED;
            }
        };
    }
    public Step  step1(){
        TaskletStep step1 = stepBuilderFactory.get("step1")
                .tasklet(tasklet())
                .build();
        return step1;
    }
    //定义作业
    public Job job(){
        Job job = jobBuilderFactory.get("start-test-job")
                .start(step1())
                .build();
        return job;
    }

    @Test
    public void testStart() throws Exception{
        //job作业启动
        //参数1:作业实例,参数2:作业运行携带参数
        jobLauncher.run(job(), new JobParameters());
    }
}

跟之前的SpringBoot启动区别在于多了JobLauncher 对象的获取,再由这个对象调用run方法启动。

8.1.3 RESTful API 启动

如果批处理不是SpringBoot启动就启动,而是通过web请求控制,那该怎么办呢?不难,引入web环境即可

1>首先限制,不随SpringBoot启动而启动

spring:
  batch:
    job:
      enabled: false   #false表示不启动

2>引入web 环境

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

3>编写启动类

@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

4>编写配置类

package com.langfeiyes.batch._15_job_start_restful;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableBatchProcessing
@Configuration
public class BatchConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Tasklet tasklet(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello SpringBatch....");
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public Step step1(){
        TaskletStep step1 = stepBuilderFactory.get("step1")
                .tasklet(tasklet())
                .build();
        return step1;
    }
    //定义作业
    @Bean
    public Job job(){
        Job job = jobBuilderFactory.get("hello-restful-job")
                .start(step1())
                .build();
        return job;
    }
}

5>编写Controller类

package com.langfeiyes.batch._15_job_start_restful;

import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;
import java.util.Properties;

@RestController
public class HelloController {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job job;
    @GetMapping("/job/start")
    public ExitStatus start() throws Exception {
         //启动job作业
        JobExecution jobExet = launcher.run(job, jp);
        return jobExet.getExitStatus();
    }
}

6>测试

注意:如果需要接收参数

1>作业使用run.id自增

//构造一个job对象
@Bean
public Job job(){
    return jobBuilderFactory.get("hello-restful-job")
        .start(step1())
        .incrementer(new RunIdIncrementer())
        .build();
}

2>改动HelloController接口方法

@RestController
public class HelloController {
    @Autowired
    private JobLauncher launcher;
    @Autowired
    private Job job;
    @Autowired
    private JobExplorer jobExplorer;  //job 展示对象
    @GetMapping("/job/start")
    public ExitStatus startJob(String name) throws Exception {
        //启动job作业
        JobParameters jp = new JobParametersBuilder(jobExplorer)
                .getNextJobParameters(job)
                .addString("name", name)
                .toJobParameters();
        JobExecution jobExet = launcher.run(job, jp);
        return jobExet.getExitStatus();
    }
}

8.2 作业停止

作业的停止,存在有3种情况:

  • 一种自然结束

    作业成功执行,正常停止,此时作业返回状态为:COMPLETED

  • 一种异常结束
    作业执行过程因为各种意外导致作业中断而停止,大多数作业返回状态为:FAILED

  • 一种编程结束

某个步骤处理数据结果不满足下一步骤执行前提,手动让其停止,一般设置返回状态为:STOPED

上面1,2种情况相对简单,我们重点说下第三种:以编程方式让作业停止。

模拟一个操作场景

1>有一个资源类,里面有2个属性:总数:totalCount = 100, 读取数:readCount = 0

2>设计2个步骤,step1 用于叠加readCount 模拟从数据库中读取资源, step2 用于执行逻辑

3>当totalCount == readCount 时,为正常情况,正常结束。如果不等时,为异常状态。此时不执行step2,直接停止作业。

4>修复数据,在从step1开始执行,并完成作业

public class ResouceCount {
    public static int totalCount = 100;  //总数
    public static int  readCount = 0;    //读取数
}

要实现上面需求,有2种方式可以实现

在这里插入图片描述

方案1:Step 步骤监听器方式

监听器

public class StopStepListener implements StepExecutionListener {
    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {

       //不满足
        if(ResouceCount.totalCount != ResouceCount.readCount){
            return ExitStatus.STOPPED;  //手动停止,后续可以重启
        }
        return stepExecution.getExitStatus();
    }
}

代码

package com.langfeiyes.batch._16_job_stop;

import com.langfeiyes.batch._01_hello.HelloJob;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class ListenerJobStopJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private int readCount = 50; //模拟只读取50个
    @Bean
    public Tasklet tasklet1(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                for (int i = 1; i <= readCount; i++) {
                    System.out.println("---------------step1执行-"+i+"------------------");
                    ResouceCount.readCount ++;
                }
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public Tasklet tasklet2(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("step2执行了.......");
                System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public StopStepListener stopStepListener(){
        return new StopStepListener();
    }
    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .tasklet(tasklet1())
                .listener(stopStepListener())
                .allowStartIfComplete(true)  //执行完后,运行重启
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet(tasklet2())
                .build();
    }

    //定义作业
    @Bean
    public Job job(){
        return jobBuilderFactory.get("job-stop-job")
                .start(step1())
                .on("STOPPED").stopAndRestart(step1())
                .from(step1()).on("*").to(step2()).end()
                .build();

    }
    public static void main(String[] args) {
        SpringApplication.run(ListenerJobStopJob.class, args);
    }
}

第一次执行:tasklet1 中readCount 默认执行50次,不满足条件, stopStepListener() afterStep 返回STOPPED, job进行条件控制走**.on(“STOPPED”).stopAndRestart(step1())** 分支,停止并允许重启–下次重启,从step1步骤开始执行

第二次执行, 修改readCount = 100, 再次启动作业,task1遍历100次,满足条件, stopStepListener() afterStep 正常返回,job条件控制走**.from(step1()).on(“*”).to(step2()).end()**分支,正常结束。

注意:step1() 方法中**.allowStartIfComplete(true)** 代码必须添加,因为第一次执行step1步骤,虽然不满足条件,但是它仍属于正常结束(正常执行完tasklet1的流程),状态码:COMPLETED, 第二次重启,默认情况下正常结束的step1步骤是不允许再执行的,所以必须设置:.allowStartIfComplete(true) 允许step1即使完成也可以重启。

方案2:StepExecution停止标记

package com.langfeiyes.batch._17_job_stop_sign;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class SignJobStopJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private int readCount = 50; //模拟只读取50个
    @Bean
    public Tasklet tasklet1(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                for (int i = 1; i <= readCount; i++) {
                    System.out.println("---------------step1执行-"+i+"------------------");
                    ResouceCount.readCount ++;
                }

                if(ResouceCount.readCount != ResouceCount.totalCount){
                    chunkContext.getStepContext().getStepExecution().setTerminateOnly();
                }

                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public Tasklet tasklet2(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("step2执行了.......");
                System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
                return RepeatStatus.FINISHED;
            }
        };
    }


    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .tasklet(tasklet1())
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet(tasklet2())
                .build();
    }

    //定义作业
    @Bean
    public Job job(){
         return jobBuilderFactory.get("job-stop-job")
                .start(step1())
                //.on("STOPPED").stopAndRestart(step1())
                //.from(step1()).on("*").to(step2()).end()
                .next(step2())
                .build();

    }
    public static void main(String[] args) {
        SpringApplication.run(SignJobStopJob.class, args);
    }
}

变动的代码有2处

tasket1(), 多了下面判断

if(ResouceCount.readCount != ResouceCount.totalCount){
    chunkContext.getStepContext().getStepExecution().setTerminateOnly();
}

其中的StepExecution#setTerminateOnly() 给运行中的stepExecution设置停止标记,Spring Batch 识别后直接停止步骤,进而停止流程

job() 改动

return jobBuilderFactory.get("job-stop-job")
    .start(step1())
    .next(step2())
    .build();

正常设置步骤流程。

8.3 作业重启

作业重启,表示允许作业步骤重新执行,默认情况下,只允许异常或终止状态的步骤重启,但有时存在特殊场景,要求需要其他状态步骤重启,为应付各种复杂的情形,Spring Batch 提供3种重启控制操作。
在这里插入图片描述

8.3.1 禁止重启

这种适用一次性执行场景,如果执行失败,就不允许再次执行。可以使用作业的禁止重启逻辑

package com.langfeiyes.batch._18_job_restart_forbid;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class JobForBidRestartJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Tasklet tasklet1(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("-------------tasklet1-------------");

                chunkContext.getStepContext().getStepExecution().setTerminateOnly(); //停止步骤
                return RepeatStatus.FINISHED;

            }
        };
    }

    @Bean
    public Tasklet tasklet2(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("-------------tasklet2-------------");
                return RepeatStatus.FINISHED;
            }
        };
    }


    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .tasklet(tasklet1())
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet(tasklet2())
                .build();
    }

    //定义作业
    @Bean
    public Job job(){
        return jobBuilderFactory.get("job-forbid-restart-job")
                .preventRestart()  //禁止重启
                .start(step1())
                .next(step2())
                .build();

    }
    public static void main(String[] args) {
        SpringApplication.run(JobForBidRestartJob.class, args);
    }
}

观察上面代码,比较特别之处:

tasklet1() 加了setTerminateOnly 设置,表示让步骤退出

 chunkContext.getStepContext().getStepExecution().setTerminateOnly();

job() 多了**.preventRestart()** 逻辑,表示步骤不允许重启

第一次按上面的代码执行一次, step1() 状态为 STOPPED

第二次去掉setTerminateOnly逻辑,重新启动步骤,观察结果,直接报错

**

8.3.2 限制重启次数

适用于重启次数有限的场景,比如下载/读取操作,可能因为网络原因导致下载/读取失败,运行重试几次,但是不能无限重试。这时可以对步骤执行进行重启次数限制。

package com.langfeiyes.batch._19_job_restart_limit;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class JobLimitRestartJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Tasklet tasklet1(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("-------------tasklet1-------------");

                chunkContext.getStepContext().getStepExecution().setTerminateOnly(); //停止步骤
                return RepeatStatus.FINISHED;

            }
        };
    }

    @Bean
    public Tasklet tasklet2(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("-------------tasklet2-------------");
                return RepeatStatus.FINISHED;
            }
        };
    }


    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .startLimit(2)
                .tasklet(tasklet1())
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet(tasklet2())
                .build();
    }

    //定义作业
    @Bean
    public Job job(){
        return jobBuilderFactory.get("job-restart-limit-job")
                .start(step1())
                .next(step2())
                .build();

    }
    public static void main(String[] args) {
        SpringApplication.run(JobLimitRestartJob.class, args);
    }
}

变动:

step1() 添加了**.startLimit(2)** 表示运行重启2次,注意,第一次启动也算一次

tasklet1() 设置setTerminateOnly 第一次先让step1 状态为STOPPED

第一次执行, step1 为 STOPPED 状态

第二次执行,不做任何操作,第二次执行,step1 还是STOPPED状态

第三次执行, 注释掉tasklet1() 中setTerminateOnly , 查询结果

**

8.3.3 无限重启

Spring Batch 限制同job名跟同标识参数作业只能成功执行一次,这是Spring Batch 定理,无法改变的。但是,对于步骤不一定适用,可以通过步骤的allowStartIfComplete(true) 实现步骤的无限重启。

package com.langfeiyes.batch._20_job_restart_allow;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class JobAllowRestartJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Tasklet tasklet1(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("-------------tasklet1-------------");
                return RepeatStatus.FINISHED;

            }
        };
    }

    @Bean
    public Tasklet tasklet2(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.err.println("-------------tasklet2-------------");
                return RepeatStatus.FINISHED;
            }
        };
    }


    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .tasklet(tasklet1())
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet(tasklet2())
                .build();
    }

    //定义作业
    @Bean
    public Job job(){
        return jobBuilderFactory.get("job-allow-restart-job")
                .start(step1())
                .next(step2())
                .build();

    }
    public static void main(String[] args) {
        SpringApplication.run(JobAllowRestartJob.class, args);
    }
}

观察上面代码,很正常逻辑

第一次启动:step1 step2正常执行,整个Job 成功执行完成

第二次启动:不做任何改动时,再次启动job,没有报错,但是观察数据库表batch_job_execution 状态为 NOOP 无效执行,step1 step2 不会执行。

第三次启动:给step1 step2 添加上**.allowStartIfComplete(true)**, 再次启动,一切正常,并且可以无限启动


http://www.kler.cn/a/595881.html

相关文章:

  • 可发1区的超级创新思路:基于注意力机制的DSD-CNN时间序列预测模型(功率预测、交通流量预测、故障检测)
  • Windows10抓包工具Wireshark下载、安装、使用
  • 10.PE导出表
  • Apache DolphinScheduler:一个可视化大数据工作流调度平台
  • 软考教材重点内容 信息安全工程师 第20章 数据库系统安全
  • 解决项目使用eslint+prettier,启动报错: error Delete `␍` prettier/prettier
  • Neo4j GDS-01-graph-data-science 图数据科学插件库概览
  • 试验一 mybatis 入门操作
  • 【算法】DFS、BFS、floodfill、记忆化搜索、BFS拓扑排序
  • Scratch游戏 | 《拍苍蝇》——Scratch厨房清洁大作战!
  • 数字化转型驱动卫生用品安全革新
  • 内网穿透的应用-如何用Docker本地部署轻量级个人云盘ZFile手机电脑异地远程访问
  • GLB文件介绍
  • Java 环境配置与 JAR 文件问题解决全攻略
  • 长列表局部渲染(监听window滚动),wndonw滚动同理
  • 注意力机制:让AI拥有黄金七秒记忆的魔法--(注意力机制中的Q、K、V)
  • 广度优先搜索(BFS)完全解析:从原理到 Java 实战
  • 分布式中间件:RabbitMQ确认消费机制
  • Ubuntu 22.04 上配置 ufw(Uncomplicated Firewall)防火墙的详细步骤
  • watch方法解析