SpringBatch之实际操作
文章目录
- 1 SpringBatch操作
- 1.1 SpringBatch介绍
- 1.2 依赖配置相关
- 1.2.1 pom.xml
- 1.2.2 mysql 依赖库表
- 1.2.3 启动配置
- 1.2.4 数据库配置
- 1.3 示例Demo
- 1.3.1 简单执行
- 1.3.2 报错
- 1.4 流程控制
- 1.4.1 多步骤任务
- 1.4.2 Flow用法
- 1.4.3 并发执行
- 1.4.4 任务决策
- 1.4.5 任务嵌套
- 1.5 数据操作
- 1.5.1 读取数据
- 1.5.2 输出数据
- 1.5.3 处理数据
- 1.6 任务调度
1 SpringBatch操作
1.1 SpringBatch介绍
SpringBatch
是一个轻量级、全面的批处理框架,旨在支持开发对企业系统的日常操作至关重要的健壮的批处理应用程序。
在大型企业中,由于业务复杂、数据量大、数据格式不同、数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理。而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理,这样的过程就是批处理
。
典型的批处理流程是 读数据
、处理数据
、写数据
的三步式架构——从数据库、文件或队列中读取大量数据,然后通过业务规则处理数据,最后将处理完的数据按需求方式写(数据库、文件等)。
SpringBatch
可以提供大量的,可重复的数据处理功能,包括日志记录、跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能
通常 SpringBatch
工作在离线模式下,不需要用户干预、就能自动进行基本的批处理迭代,进行类似事务方式的处理。
注意
:SpringBatch
不是一个调度框架。在商业和开源领域都有许多优秀的企业调度程序(例如 Quartz、Tivoli、Control-M 等)。Spring Batch
旨在与调度程序结合使用,而不是替代调度程序
1.2 依赖配置相关
1.2.1 pom.xml
<!-- pom 文件引入 springboot -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- pom 文件引入 spring-batch 及相关依赖 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
</dependencies>
注意
:只是为了测试所以每次启动后springboot就会自动关闭,要想不关闭,就引入web容器依赖,如下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
1.2.2 mysql 依赖库表
sql 脚本的 jar 包路径:…\maven\repository\org\springframework\batch\spring-batch-core\4.2.1.RELEASE\spring-batch-core-4.2.1.RELEASE.jar!\org\springframework\batch\core\schema-mysql.sql
1.2.3 启动配置
启动类添加标志@EnableBatchProcessing
@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchStartApplication
{
public static void main(String[] args) {
SpringApplication.run(SpringBatchStartApplication.class, args);
}
}
1.2.4 数据库配置
spring:
application:
name: spring-batch
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: test
password: test
url: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&&allowMultiQueries=true
type: com.alibaba.druid.pool.DruidDataSource
batch:
jdbc:
table-prefix: BATCH_
1.3 示例Demo
1.3.1 简单执行
@Component
public class FirstJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job firstJob() {
return jobBuilderFactory.get("firstJob")
.start(step())
.build();
}
private Step step() {
return stepBuilderFactory.get("step")
.tasklet((contribution, chunkContext) -> {
System.out.println("执行步骤....");
return RepeatStatus.FINISHED;
}).build();
}
}
我们注入了 JobBuilderFactory
任务创建工厂和 StepBuilderFactory
步骤创建工厂,分别用于创建任务Job
和步骤Step
。
JobBuilderFactory
的 get
方法用于创建一个指定名称的任务,start
方法指定任务的开始步骤,步骤通过 StepBuilderFactory
构建。
步骤 Step
由若干个小任务 Tasklet
组成,所以我们通过tasklet
方法创建。tasklet
方法接收一个Tasklet
类型参数,Tasklet
是一个函数式接口
所以我们可以使用 lambda
表达式创建一个匿名实现:
(contribution, chunkContext) -> {
System.out.println("执行步骤....");
return RepeatStatus.FINISHED;
}
该匿名实现必须返回一个明确的执行状态,这里返回RepeatStatus.FINISHED
表示该小任务执行成功,正常结束。
此外,需要注意的是,我们配置的任务Job
必须注册到Spring IOC
容器中,并且任务的名称和步骤的名称组成唯一。比如上面的例子,我们的任务名称为firstJob,步骤的名称为step,如果存在别的任务和步骤组合也叫这个名称的话,则会执行失败
1.3.2 报错
springbatch
报错Duplicate entry '0' for key 'PRIMARY'
org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; SQL [INSERT into BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION) values (?, ?, ?, ?)]; Duplicate entry ‘0’ for key ‘PRIMARY’; nested exception is com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException:
Duplicate entry ‘0’ for key ‘PRIMARY’
原因:以_seq
结尾的表中不能为空有初始化内容
1.4 流程控制
1.4.1 多步骤任务
简单多步骤任务执行
public Job multiStepJob() {
return jobBuilderFactory.get("multiStepJob")
.start(step1())
.next(step2())
.next(step3())
.build();
}
多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤,修改上面的代码
@Bean
public Job multiStepJob() {
return jobBuilderFactory.get("multiStepJob2")
.start(step1())
.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
.from(step2())
.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
.from(step3()).end()
.build();
}
from....on....to....
的方式中:
from
:表示多个step
时,从哪一个step
往下走on
:一个条件,是前面step
执行成功的条件,只有满足了才执行下一个to
:下一个要执行的是哪一个step
步骤代码
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
1.4.2 Flow用法
Flow
的作用就是可以将多个步骤 Step
组合在一起然后再组装到任务 Job
中
@Component
public class FlowJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job flowJob() {
return jobBuilderFactory.get("flowJob")
.start(flow())
.next(step3())
.end()
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
// 创建一个flow对象,包含若干个step
private Flow flow() {
return new FlowBuilder<Flow>("flow")
.start(step1())
.next(step2())
.build();
}
}
1.4.3 并发执行
任务中的步骤除了可以串行执行(一个接着一个执行)外,还可以并行执行,并行执行在特定的业务需求下可以提供任务执行效率。
将任务并行化只需两个简单步骤:
- 将步骤
Step
转换为Flow
; - 任务
Job
中指定并行Flow
创建了两个 Flow
:flow1
(包含 step1 和 step2)和 flow2
(包含 step3)。然后通过JobBuilderFactory
的split方法,指定一个异步执行器,将 flow1 和 flow2 异步执行(也就是并行)
@Component
public class SplitJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job splitJob() {
return jobBuilderFactory.get("splitJob")
.start(flow1())
.split(new SimpleAsyncTaskExecutor()).add(flow2())
.end()
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Flow flow1() {
return new FlowBuilder<Flow>("flow1")
.start(step1())
.next(step2())
.build();
}
private Flow flow2() {
return new FlowBuilder<Flow>("flow2")
.start(step3())
.build();
}
}
注意
: 开启并行化后,并行的步骤执行顺序并不能100%确定,因为线程调度具有不确定性。
1.4.4 任务决策
决策器
的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则执行step1 和 step3
@Component
public class MyDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
LocalDate now = LocalDate.now();
DayOfWeek dayOfWeek = now.getDayOfWeek();
if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
return new FlowExecutionStatus("weekend");
} else {
return new FlowExecutionStatus("workingDay");
}
}
}
@Autowired
private MyDecider myDecider;
@Bean
public Job deciderJob() {
return jobBuilderFactory.get("deciderJob")
.start(step1())
.next(myDecider)
.from(myDecider).on("weekend").to(step2())
.from(myDecider).on("workingDay").to(step3())
.from(step3()).on("*").to(step4())
.end()
.build();
}
private Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤一操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤二操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤三操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
private Step step4() {
return stepBuilderFactory.get("step4")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("执行步骤四操作。。。");
return RepeatStatus.FINISHED;
}).build();
}
1.4.5 任务嵌套
任务 Job
除了可以由 Step
或者 Flow
构成外,我们还可以将多个任务 Job
转换为特殊的 Step
,然后再赋给另一个任务 Job
,这就是任务的嵌套。
@Component
public class NestedJobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager platformTransactionManager;
// 父任务
@Bean
public Job parentJob() {
return jobBuilderFactory.get("parentJob")
.start(childJobOneStep())
.next(childJobTwoStep())
.build();
}
// 将任务转换为特殊的步骤
private Step childJobOneStep() {
return new JobStepBuilder(new StepBuilder("childJobOneStep"))
.job(childJobOne())
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}
// 将任务转换为特殊的步骤
private Step childJobTwoStep() {
return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
.job(childJobTwo())
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}
// 子任务一
private Job childJobOne() {
return jobBuilderFactory.get("childJobOne")
.start(
stepBuilderFactory.get("childJobOneStep")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("子任务一执行步骤。。。");
return RepeatStatus.FINISHED;
}).build()
).build();
}
// 子任务二
private Job childJobTwo() {
return jobBuilderFactory.get("childJobTwo")
.start(
stepBuilderFactory.get("childJobTwoStep")
.tasklet((stepContribution, chunkContext) -> {
System.out.println("子任务二执行步骤。。。");
return RepeatStatus.FINISHED;
}).build()
).build();
}
}
1.5 数据操作
1.5.1 读取数据
@Data
public class TestData {
private int id;
private String field1;
private String field2;
private String field3;
}
文本数据读取 Demo
@Component
public class FileItemReaderDemo {
// 任务创建工厂
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 步骤创建工厂
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job fileItemReaderJob() {
return jobBuilderFactory.get("fileItemReaderJob2")
.start(step())
.build();
}
private Step step() {
return stepBuilderFactory.get("step")
//chunk size被设为了2,当ItemReader读的数据数量达到2的时候,
//这一批次的数据就一起被传到ItemReader
.<TestData, TestData>chunk(2)
.reader(fileItemReader())
.writer(list -> list.forEach(System.out::println))
.build();
}
private ItemReader<TestData> fileItemReader() {
FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("reader/file")); // 设置文件资源地址
reader.setLinesToSkip(1); // 忽略第一行
// AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
// 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
// 设置属性名,类似于表头
tokenizer.setNames("id", "field1", "field2", "field3");
// 将每行数据转换为TestData对象
DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();
// 设置LineTokenizer
mapper.setLineTokenizer(tokenizer);
// 设置映射方式,即读取到的文本怎么转换为对应的POJO
mapper.setFieldSetMapper(fieldSet -> {
TestData data = new TestData();
data.setId(fieldSet.readInt("id"));
data.setField1(fieldSet.readString("field1"));
data.setField2(fieldSet.readString("field2"));
data.setField3(fieldSet.readString("field3"));
return data;
});
reader.setLineMapper(mapper);
return reader;
}
}
1.5.2 输出数据
@Component
public class FileItemWriterDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Resource(name = "writerSimpleReader")
private ListItemReader<TestData> writerSimpleReader;
@Bean
public Job fileItemWriterJob() throws Exception {
return jobBuilderFactory.get("fileItemWriterJob")
.start(step())
.build();
}
private Step step() throws Exception {
return stepBuilderFactory.get("step")
.<TestData, TestData>chunk(2)
.reader(writerSimpleReader)
.writer(fileItemWriter())
.build();
}
private FlatFileItemWriter<TestData> fileItemWriter() throws Exception {
FlatFileItemWriter<TestData> writer = new FlatFileItemWriter<>();
FileSystemResource file = new FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");
Path path = Paths.get(file.getPath());
if (!Files.exists(path)) {
Files.createFile(path);
}
// 设置输出文件路径
writer.setResource(file);
// 把读到的每个TestData对象转换为JSON字符串
LineAggregator<TestData> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return "";
};
writer.setLineAggregator(aggregator);
writer.afterPropertiesSet();
return writer;
}
}
1.5.3 处理数据
@Component
public class ValidatingItemProcessorDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Resource(name = "processorSimpleReader")
private ListItemReader<TestData> processorSimpleReader;
@Bean
public Job validatingItemProcessorJob() throws Exception {
return jobBuilderFactory.get("validatingItemProcessorJob3")
.start(step())
.build();
}
private Step step() throws Exception {
return stepBuilderFactory.get("step")
.<TestData, TestData>chunk(2)
.reader(processorSimpleReader)
.processor(beanValidatingItemProcessor())
.writer(list -> list.forEach(System.out::println))
.build();
}
// private ValidatingItemProcessor<TestData> validatingItemProcessor() {
// ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>();
// processor.setValidator(value -> {
// // 对每一条数据进行校验
// if ("".equals(value.getField3())) {
// // 如果field3的值为空串,则抛异常
// throw new ValidationException("field3的值不合法");
// }
// });
// return processor;
// }
private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
// 开启过滤,不符合规则的数据被过滤掉;
// beanValidatingItemProcessor.setFilter(true);
beanValidatingItemProcessor.afterPropertiesSet();
return beanValidatingItemProcessor;
}
}
1.6 任务调度
可以配合 quartz 或者 xxljob 实现定时任务执行,由于 Spring Boot
默认支持自动启动已配置好的 Job
,我们可以通过配置项spring.batch.job.enabled=false
来禁止Spring
容器自动启动Job
Spring Launch API
它的核心就是 JobLauncher
接口。JobLauncher
需要2个参数:Job
, JobParameters
@RestController
@RequestMapping("job")
public class JobController {
@Autowired
private Job job;
@Autowired
private JobLauncher jobLauncher;
@GetMapping("launcher/{message}")
public String launcher(@PathVariable String message) throws Exception {
JobParameters parameters = new JobParametersBuilder()
.addString("message", message)
.toJobParameters();
// 将参数传递给任务
jobLauncher.run(job, parameters);
return "success";
}
}