从零搭建微服务项目Pro(第1-2章——Quartz实现定时任务模块优化)
前言:
在企业项目中,往往有定时任务发布的需求,比如每天晚9点将今日数据备份一次,或每月一号将上月的销售数据邮件发送给对应的工作人员。显然这些操作不可能是人工到时间点调用一次接口,需要编写专门的模块完成任务的调度。
在前一章中已经实现一种基于Quartz的定时任务模块,只需要将定时任务所需要的参数定义在数据库中,模块可从数据库中生成对应的定时任务并进行维护。但正如上章最后提到尽管基本功能已经实现,但仍然存在一些bug,且任务状态机的转化设计的不够明确,导致任务管理仍然存在过耦合的问题。
从零搭建微服务项目Pro(第1-1章——Quartz实现定时任务模块)-CSDN博客https://blog.csdn.net/wlf2030/article/details/145785349在本章为解决上章遗留问题,重新设计模块,做了八项调优(见加粗)。任务池管理内核重构,仅对外提供启动和暂停任务接口,简化scheduler调度操作。通过mvc设计模式完成任务池,任务数据层的分离,由manager类统一管理保证内存和数据库信息统一,并在manager类中规范任务自动机的转换。通过引入事件机制,实现任务的异步调用,防止队列阻塞,将原有执行类直接拿取任务实例转化为从事件中拿取任务唯一标识,进而从数据库中实例化任务,进一步保证内存和数据库信息统一,并一定程度防止接口攻击导致错误的任务执行。重新划分异常类型为执行错误和状态转化错误便于后期aop对响应体的设置,以及添加日志输出,将每次任务执行结果和异常信息序列化至数据库方便朔源。
具体类图如下:
本章几乎对上章所有文件均有修改,因此无法提供一步步修改的指导,建议下载本章源码结合类图和上章讲解理解模块。源码链接如下:
wlf728050719/SpringCloudPro1-2https://github.com/wlf728050719/SpringCloudPro1-2以及在后续的1-3章会将quartz模块整合至之前的微服务项目中,感兴趣的小伙伴可以提前了解下,方便后续章节模块移植,微服务项目链接如下:
从零搭建微服务项目Base(第0章——微服务项目结构搭建)_从0创建微服务项目-CSDN博客https://blog.csdn.net/wlf2030/article/details/145206361以及本专栏会持续更新微服务项目,每一章的项目都会基于前一章项目进行功能的完善,欢迎小伙伴们关注!同时如果只是对单章感兴趣也不用从头看,只需下载前一章项目即可,每一章都会有前置项目准备部分,跟着操作就能实现上一章的最终效果,当然如果是一直跟着做可以直接跳过这一部分。专栏目录链接如下,其中Base篇为基础微服务搭建,Pro篇为复杂模块实现。
从零搭建微服务项目(全)-CSDN博客https://blog.csdn.net/wlf2030/article/details/145799620
一、效果演示
quartz定时模块演示
最终数据库记录日志:(会记录失败message以及exception的cause)
最终数据库中任务状态(task1启动,执行成功;task2暂停,执行失败)
其中使用postman请求体如下(name和group为必需项,用于指定task,status字段无效代码控制)
AddTest1: Post http://localhost:6699/quartz/add (type为2即使用bean必传项)
{
"taskName": "task1",
"taskGroup": "group1",
"type": 2,
"beanName": "test",
"methodName": "test",
"params": "test1",
"cronExpression": "*/5 * * * * ?"
}
AddTest2: Post http://localhost:6699/quartz/add (type为1即使用class必传项)
{
"taskName": "task2",
"taskGroup": "group1",
"type": 1,
"className": "cn.bit.pro1_2.service.TestJavaService",
"methodName": "test",
"params": "test",
"cronExpression": "*/5 * * * * ?"
}
Start: Put http://localhost:6699/quartz/start
{
"taskName": "task1",
"taskGroup": "group1"
}
Pause: Put http://localhost:6699/quartz/pause
{
"taskName": "task1",
"taskGroup": "group1"
}
Delete: Delete http://localhost:6699/quartz/delete
{
"taskName": "task1",
"taskGroup": "group1"
}
二、数据库sql
可自行添加外键限制,以及选择task_name和task_group一对字段作为主键
create table tb_task
(
id int auto_increment
primary key,
task_name varchar(255) not null,
task_group varchar(255) not null,
type int not null,
bean_name varchar(255) null,
class_name varchar(255) null,
path varchar(255) null,
method_name varchar(255) null,
params varchar(255) null,
cron_expression varchar(255) not null,
description text null,
status int default 0 not null,
result int null
);
create table tb_task_log
(
id int auto_increment
primary key,
task_id int not null,
start_time datetime not null,
execute_time varchar(255) not null,
result tinyint not null,
message varchar(255) not null,
exception_info text null
);
三、核心代码
1.工厂模式
统一不同bean或不同类的不同方法名、不同形参函数为ITaskHandler的invoke(Task)方法
package cn.bit.pro1_2.core.handler;
import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.exception.TaskInvokeException;
public interface ITaskHandler {
void invoke(Task task) throws TaskInvokeException;
}
类反射实现
package cn.bit.pro1_2.core.handler.impl;
import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.Result;
import cn.bit.pro1_2.core.exception.TaskInvokeException;
import cn.bit.pro1_2.core.handler.ITaskHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@Slf4j
@Component
public class JavaClassTaskHandler implements ITaskHandler {
@Override
public void invoke(Task task) throws TaskInvokeException {
try {
Object target;
Class<?> clazz;
Method method;
Result returnValue;
clazz = Class.forName(task.getClassName());
target = clazz.newInstance();
if (task.getParams() == null || task.getParams().isEmpty()) {
method = target.getClass().getDeclaredMethod(task.getMethodName());
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target);
} else {
method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target, task.getParams());
}
//判断业务是否执行成功
if (returnValue == null || Result.FAIL.equals(returnValue))
throw new TaskInvokeException("JavaClassTaskHandler方法执行失败",null);
} catch (NoSuchMethodException e) {
throw new TaskInvokeException("JavaClassTaskHandler找不到对应方法", e);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new TaskInvokeException("JavaClassTaskHandler执行反射方法异常", e);
} catch (ClassCastException e) {
throw new TaskInvokeException("JavaClassTaskHandler方法返回值定义错误", e);
} catch (ClassNotFoundException e) {
throw new TaskInvokeException("JavaClassTaskHandler找不到对应类", e);
} catch (InstantiationException e) {
throw new TaskInvokeException("JavaClassTaskHandler实例化错误", e);
}
}
}
bean反射实现
package cn.bit.pro1_2.core.handler.impl;
import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.Result;
import cn.bit.pro1_2.core.exception.TaskInvokeException;
import cn.bit.pro1_2.core.handler.ITaskHandler;
import cn.bit.pro1_2.core.util.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@Slf4j
@Component
public class SpringBeanTaskHandler implements ITaskHandler {
@Override
public void invoke(Task task) throws TaskInvokeException {
try {
Object target;
Method method;
Result returnValue;
//上下文寻找对应bean
target = SpringContextHolder.getApplicationContext().getBean(task.getBeanName());
//寻找对应方法
if(task.getParams()==null|| task.getParams().isEmpty())
{
method = target.getClass().getDeclaredMethod(task.getMethodName());
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target);
}
else{
method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);
ReflectionUtils.makeAccessible(method);
returnValue = (Result) method.invoke(target, task.getParams());
}
//判断业务是否执行成功
if(returnValue==null || Result.FAIL.equals(returnValue))
throw new TaskInvokeException("SpringBeanTaskHandler方法执行失败", null);
}catch (NoSuchBeanDefinitionException e){
throw new TaskInvokeException("SpringBeanTaskHandler找不到对应bean",e);
} catch (NoSuchMethodException e) {
throw new TaskInvokeException("SpringBeanTaskHandler找不到对应方法",e);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new TaskInvokeException("SpringBeanTaskHandler执行反射方法异常",e);
} catch (ClassCastException e) {
throw new TaskInvokeException("SpringBeanTaskHandler方法返回值定义错误",e);
}
}
}
工厂类,根据task类型返回不同的实现类
package cn.bit.pro1_2.core.handler;
import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.TaskType;
import cn.bit.pro1_2.core.handler.impl.JavaClassTaskHandler;
import cn.bit.pro1_2.core.handler.impl.SpringBeanTaskHandler;
import cn.bit.pro1_2.core.util.SpringContextHolder;
import org.springframework.stereotype.Component;
@Component
public class TaskHandlerFactory {
public static ITaskHandler getTaskHandler(Task task) {
ITaskHandler taskHandler = null;
if(TaskType.SPRING_BEAN.getCode().equals(task.getType())) {
taskHandler = SpringContextHolder.getApplicationContext().getBean(SpringBeanTaskHandler.class);
}
if(TaskType.JAVA_CLASS.getCode().equals(task.getType())) {
taskHandler = SpringContextHolder.getApplicationContext().getBean(JavaClassTaskHandler.class);
}
return taskHandler;
}
}
2.事件机制
事件,传递任务唯一标识
package cn.bit.pro1_2.core.events.event;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
@ToString
@Getter
@AllArgsConstructor
public class TaskInvokeEvent {
private final String taskName;
private final String taskGroup;
}
发布者,同时实现job接口作为schduler调度的任务类:
package cn.bit.pro1_2.core.events.publisher;
import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.events.event.TaskInvokeEvent;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@AllArgsConstructor
@Slf4j
@Component
public class TaskInvokePublisher implements Job {
private final ApplicationEventPublisher publisher;
@Override
public void execute(JobExecutionContext jobExecutionContext){
Task task = (Task) jobExecutionContext.getJobDetail().getJobDataMap().get("task");
//发布事件异步执行任务
TaskInvokeEvent event =new TaskInvokeEvent(task.getTaskName(),task.getTaskGroup());
publisher.publishEvent(event);
log.info("任务执行事件发布:{}",event);
}
}
监听者,监听到事件后执行对应方法并记录日志和异常
package cn.bit.pro1_2.core.events.listener;
import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.entity.TaskLog;
import cn.bit.pro1_2.core.enums.Result;
import cn.bit.pro1_2.core.events.event.TaskInvokeEvent;
import cn.bit.pro1_2.core.exception.TaskInvokeException;
import cn.bit.pro1_2.core.handler.ITaskHandler;
import cn.bit.pro1_2.core.handler.TaskHandlerFactory;
import cn.bit.pro1_2.core.service.TaskLogService;
import cn.bit.pro1_2.core.service.TaskService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@AllArgsConstructor
@Component
public class TaskInvokeListener {
private final TaskLogService taskLogService;
private final TaskService taskService;
@Async
@Order
@EventListener(TaskInvokeEvent.class)
public void notifyTaskInvoke(TaskInvokeEvent event) {
//从数据库中拿取任务
Task task = taskService.selectTaskByNameAndGroup(event.getTaskName(), event.getTaskGroup());
log.info("任务执行事件监听,准备执行任务{}",task);
ITaskHandler handler = TaskHandlerFactory.getTaskHandler(task);
long startTime = System.currentTimeMillis();
TaskLog taskLog = new TaskLog();
taskLog.setTaskId(task.getId());
taskLog.setStartTime(new Date());
boolean success = true;
try {
handler.invoke(task);
} catch (TaskInvokeException e) {
log.error("{},Task:{}", e.getMessage(),task);
success = false;
taskLog.setMessage(e.getMessage());
if(e.getException()!=null){
taskLog.setExceptionInfo(e.getException().getCause().toString());
e.getException().printStackTrace();
}
}
if(success)
{
taskLog.setMessage("执行成功");
taskLog.setResult(Result.SUCCESS.getCode());
task.setResult(Result.SUCCESS.getCode());
taskService.setTaskResult(task);
}
else
{
taskLog.setResult(Result.FAIL.getCode());
task.setResult(Result.FAIL.getCode());
taskService.setTaskResult(task);
}
long endTime = System.currentTimeMillis();
taskLog.setExecuteTime(String.valueOf(endTime-startTime));
taskLogService.insert(taskLog);
}
}
3.MVC模式
task持久层接口,即model
package cn.bit.pro1_2.core.service;
import cn.bit.pro1_2.core.entity.Task;
import java.util.List;
public interface TaskService {
List<Task> selectAllTask();
int updateTaskInfo(Task task);
int updateTaskStatus(Task task);
int insertTask(Task task);
int deleteTask(Task task);
int setTaskResult(Task task);
Task selectTaskByNameAndGroup(String taskName, String groupName);
}
任务池,即view
package cn.bit.pro1_2.core.mvc;
import cn.bit.pro1_2.core.events.publisher.TaskInvokePublisher;
import cn.bit.pro1_2.core.entity.Task;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@AllArgsConstructor
public class TaskPool {
public static JobKey getJobKey(@NonNull Task task) {
return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());
}
public static TriggerKey getTriggerKey(@NonNull Task task) {
return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());
}
/**
* 任务池添加任务
* @param task
* @param scheduler
* @throws SchedulerException
*/
public void addTask(@NonNull Task task,Scheduler scheduler) throws SchedulerException {
JobKey jobKey = getJobKey(task);
TriggerKey triggerKey = getTriggerKey(task);
JobDetail jobDetail = JobBuilder.newJob(TaskInvokePublisher.class).withIdentity(jobKey).build();
jobDetail.getJobDataMap().put("task",task);
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
.build();
scheduler.scheduleJob(jobDetail, trigger);
}
/**
* 任务池暂停并移除任务
* @param task
* @param scheduler
* @throws SchedulerException
*/
public void pauseTask(@NonNull Task task,Scheduler scheduler) throws SchedulerException {
scheduler.pauseJob(getJobKey(task));
scheduler.deleteJob(getJobKey(task));
}
}
manager,即controller
package cn.bit.pro1_2.core.mvc;
import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.TaskStatus;
import cn.bit.pro1_2.core.exception.TaskRepositoryException;
import cn.bit.pro1_2.core.service.TaskService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class TaskManger {
private final Scheduler scheduler;
private final TaskService taskService;
private final TaskPool taskPool;
/**
* 从数据库中反序列化任务数据,保证服务器重启后恢复任务池状态
* @throws SchedulerException
*/
@PostConstruct
public void init() throws SchedulerException {
log.info("TaskManager初始化开始...");
List<Task> tasks = taskService.selectAllTask();
if(tasks != null && !tasks.isEmpty()) {
for (Task task : tasks)
{
if(TaskStatus.RUNNING.getCode().equals(task.getStatus()))
taskPool.addTask(task,scheduler);
}
log.info("初始化加载{}项任务", tasks.size());
}
log.info("TaskManager初始化结束...");
}
/**
* 添加暂停且未被持久化的新任务
* @param task
* @throws SchedulerException
*/
public void addTask(Task task) throws SchedulerException {
Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());
if(temp != null)
throw new TaskRepositoryException("存在相同任务组和任务名任务",task);
if(!TaskStatus.PAUSE.getCode().equals(task.getStatus()))
throw new TaskRepositoryException("只能添加暂停的任务",task);
taskService.insertTask(task);
log.info("添加任务{}", task);
}
/**
* 在任务暂停时更新任务信息
* @param task
* @throws SchedulerException
*/
public void updateTask(Task task) throws SchedulerException {
Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());
if(temp == null)
throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);
if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus()))
throw new TaskRepositoryException("只能暂停时更新任务",task);
taskService.updateTaskInfo(task);
log.info("更新任务{}", task);
}
/**
* 启动暂停中任务
* @param task 只使用name和group字段
* @throws SchedulerException
*/
public void startTask(Task task) throws SchedulerException {
Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());
if(temp == null)
throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);
if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus()))
throw new TaskRepositoryException("只能启动暂停中任务",task);
taskPool.addTask(temp,scheduler);
//添加任务池未有异常时持久化数据
temp.setStatus(TaskStatus.RUNNING.getCode());
taskService.updateTaskStatus(temp);
log.info("启动任务{}", temp);
}
/**
* 暂停运行中任务
* @param task 只使用name和group字段
* @throws SchedulerException
*/
public void pauseTask(Task task) throws SchedulerException {
Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());
if(temp == null)
throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);
if(!TaskStatus.RUNNING.getCode().equals(temp.getStatus()))
throw new TaskRepositoryException("只能暂停运行中任务",task);
taskPool.pauseTask(temp,scheduler);
//添加任务池未有异常时持久化数据
temp.setStatus(TaskStatus.PAUSE.getCode());
taskService.updateTaskStatus(temp);
log.info("暂停任务{}", temp);
}
/**
* 暂停暂停中任务
* @param task 只使用name和group字段
* @throws SchedulerException
*/
public void deleteTask(Task task) throws SchedulerException {
Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());
if(temp == null)
throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);
if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus()))
throw new TaskRepositoryException("只能删除暂停中任务",task);
taskService.deleteTask(temp);
log.info("删除任务{}", temp);
}
}
最后:
回头发现无论是本科的软件体系结构课程还是研究生的体系结构居然都是最实用的课程,虽然两门课程最后大作业都很抽象,一个是分析区块链源码,一个是docker部署微服务项目,虽然本科上课一直摸鱼但还是有了解设计模式,这个模块就用到其中比如mvc,工厂模式等,研究生则让我接触了微服务,也算是帮我找了份实习。下一章会把这个模块整合到Base的微服务项目中,之后还会写一篇关于jsr303+springmvc的博客,还请多多支持!