电商项目-分布式事务(四)基于消息队列实现分布式事务
基于消息队列实现分布式事务,实现消息最终一致性
如何基于消息队列实现分布式事务?
通过消息队列实现分布式事务的话,可以保证当前数据的最终一致性。实现思路:将大的分布式事务,进行拆分,拆分成若干个小的本地事务,只要每一个小的本地事务是执行成功的,那就代表当前的分布式事务是执行成功的。下面以用户下单增加积分,来进行消息队列实现分布式事务的操作。
功能实现流程图如下:
主要包含三个角色:订单服务,消息队列和用户服务。
步骤(1-4):订单服务
1.首先,在订单服务中,会生成订单相关数据,并添加到数据库中。
2.接着会在订单数据库中,创建任务表 ,会向任务表添加数据,(Username,orderID,point积分)
3.设置定时任务,扫描任务表,获取相关数据,如每隔7S扫描一次
4.发送任务数据到MQ
步骤(5-12):用户服务
5.6.7.用户服务会接收消息,保证消息的不可重复消费,判断在Redis和 在数据库中是否存在。防止重复处理,不要重复添加积分。
用户服务正在执行消息
如果不存在,继续业务功能。
添加本地事务
12.通知订单服务
步骤(13):订单服务
13.删除任务表数据。
一、 准备工作
1.1 shangcheng_order库新增数据表
任务表:
DROP TABLE IF EXISTS `tb_task`;
CREATE TABLE `tb_task` (
`id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '任务id',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`delete_time` datetime DEFAULT NULL,
`task_type` varchar(32) DEFAULT NULL COMMENT '任务类型',
`mq_exchange` varchar(64) DEFAULT NULL COMMENT '交换机名称',
`mq_routingkey` varchar(64) DEFAULT NULL COMMENT 'routingkey',
`request_body` varchar(512) DEFAULT NULL COMMENT '任务请求的内容',
`status` varchar(32) DEFAULT NULL COMMENT '任务状态',
`errormsg` varchar(512) DEFAULT NULL COMMENT '任务错误信息',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
历史任务表:
DROP TABLE IF EXISTS `tb_task_his`;
CREATE TABLE `tb_task_his` (
`id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '任务id',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`delete_time` datetime DEFAULT NULL,
`task_type` varchar(32) DEFAULT NULL COMMENT '任务类型',
`mq_exchange` varchar(64) DEFAULT NULL COMMENT '交换机名称',
`mq_routingkey` varchar(64) DEFAULT NULL COMMENT 'routingkey',
`request_body` varchar(512) DEFAULT NULL COMMENT '任务请求的内容',
`status` varchar(32) DEFAULT NULL COMMENT '任务状态',
`errormsg` varchar(512) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
1.2 shangcheng_service_order_api添加相关实体类
@Table(name="tb_task")
public class Task {
@Id
private Long id;
@Column(name = "create_time")
private Date createTime;
@Column(name = "update_time")
private Date updateTime;
@Column(name = "delete_time")
private Date deleteTime;
@Column(name = "task_type")
private String taskType;
@Column(name = "mq_exchange")
private String mqExchange;
@Column(name = "mq_routingkey")
private String mqRoutingkey;
@Column(name = "request_body")
private String requestBody;
@Column(name = "status")
private String status;
@Column(name = "errormsg")
private String errormsg;
//getter,setter略
}
@Table(name="tb_task_his")
public class TaskHis {
@Id
private Long id;
@Column(name = "create_time")
private Date createTime;
@Column(name = "update_time")
private Date updateTime;
@Column(name = "delete_time")
private Date deleteTime;
@Column(name = "task_type")
private String taskType;
@Column(name = "mq_exchange")
private String mqExchange;
@Column(name = "mq_routingkey")
private String mqRoutingkey;
@Column(name = "request_body")
private String requestBody;
@Column(name = "status")
private String status;
@Column(name = "errormsg")
private String errormsg;
//getter,setter略
}
1.3 shangcheng_user新增积分日志表
DROP TABLE IF EXISTS `tb_point_log`;
CREATE TABLE `tb_point_log` (
`order_id` varchar(200) NOT NULL,
`user_id` varchar(200) NOT NULL,
`point` int(11) NOT NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1.4 shangcheng_service_user_api添加实体类 PointLog
@Table(name="tb_point_log")
public class PointLog {
private String orderId;
private String userId;
private Integer point;
//getter,setter略
}
1.5 shangcheng_service_order添加rabbitMQ配置类
@Configuration
public class RabbitMQConfig {
//添加积分任务交换机
public static final String EX_BUYING_ADDPOINTUSER = "ex_buying_addpointuser";
//添加积分消息队列
public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint";
//完成添加积分消息队列
public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint";
//添加积分路由key
public static final String CG_BUYING_ADDPOINT_KEY = "addpoint";
//完成添加积分路由key
public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint";
/**
* 交换机配置
* @return the exchange
*/
@Bean(EX_BUYING_ADDPOINTUSER)
public Exchange EX_BUYING_ADDPOINTUSER() {
return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();
}
//声明队列
@Bean(CG_BUYING_FINISHADDPOINT)
public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
Queue queue = new Queue(CG_BUYING_FINISHADDPOINT);
return queue;
}
//声明队列
@Bean(CG_BUYING_ADDPOINT)
public Queue QUEUE_CG_BUYING_ADDPOINT() {
Queue queue = new Queue(CG_BUYING_ADDPOINT);
return queue;
}
/**
* 绑定队列到交换机 .
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();
}
@Bean
public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();
}
}
二、 订单服务添加任务并发送
步骤1: 修改添加订单方法
当添加订单的时候,添加任务表中相关数据, 代码如下:
//增加任务表记录
Task task = new Task();
task.setCreateTime(new Date());
task.setUpdateTime(new Date());
task.setMqExchange(RabbitMQConfig.EX_BUYING_ADDPOINTURSE);
task.setMqRoutingkey(RabbitMQConfig.CG_BUYING_ADDPOINT_KEY);
Map map = new HashMap();
map.put("userName",order.getUsername());
map.put("orderId",order.getId());
map.put("point",order.getPayMoney());
task.setRequestBody(JSON.toJSONString(map));
taskMapper.insertSelective(task);
步骤2: 定时扫描任务表最新数据
订单服务新增定时任务类,获取小于系统当前时间的所有任务数据
(1) 修改订单服务启动类,添加开启定时任务注解
@EnableScheduling
(2) 定义定时任务类
查询最新数据
更新taskMapper新增方法,查询所有小于系统当前时间的数据
public interface TaskMapper extends Mapper<Task> {
@Select("SELECT * from tb_task WHERE update_time<#{currentTime}")
@Results({@Result(column = "create_time",property = "createTime"),
@Result(column = "update_time",property = "updateTime"),
@Result(column = "delete_time",property = "deleteTime"),
@Result(column = "task_type",property = "taskType"),
@Result(column = "mq_exchange",property = "mqExchange"),
@Result(column = "mq_routingkey",property = "mqRoutingkey"),
@Result(column = "request_body",property = "requestBody"),
@Result(column = "status",property = "status"),
@Result(column = "errormsg",property = "errormsg")})
List<Task> findTaskLessTanCurrentTime(Date currentTime);
}
任务类实现
@Component
public class QueryPointTask {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TaskMapper taskMapper;
@Scheduled(cron = "0 0/2 * * * ?")
public void queryTask(){
//1.获取小于系统当前时间数据
List<Task> taskList = taskMapper.findTaskLessTanCurrentTime(new Date());
if (taskList!=null && taskList.size()>0){
//将任务数据发送到消息队列
for (Task task : taskList) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_ADDPOINT_KEY, JSON.toJSONString(task));
}
}
}
}
三、 用户服务更改积分
步骤1: 添加rabbitmq配置类(与订单服务相同)
@Configuration
public class RabbitMQConfig {
//添加积分任务交换机
public static final String EX_BUYING_ADDPOINTURSE = "ex_buying_addpointurse";
//添加积分消息队列
public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint";
//完成添加积分消息队列
public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint";
//添加积分路由key
public static final String CG_BUYING_ADDPOINT_KEY = "addpoint";
//完成添加积分路由key
public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint";
/**
* 交换机配置
* @return the exchange
*/
@Bean(EX_BUYING_ADDPOINTURSE)
public Exchange EX_DECLARE() {
return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTURSE).durable(true).build();
}
//声明队列
@Bean(CG_BUYING_FINISHADDPOINT)
public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
Queue queue = new Queue(CG_BUYING_FINISHADDPOINT);
return queue;
}
//声明队列
@Bean(CG_BUYING_ADDPOINT)
public Queue QUEUE_CG_BUYING_ADDPOINT() {
Queue queue = new Queue(CG_BUYING_ADDPOINT);
return queue;
}
/**
* 绑定队列到交换机 .
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();
}
@Bean
public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();
}
}
步骤2: 定义消息监听类:
@Component
public class AddPointListener {
@Autowired
private UserService userService;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = RabbitMQConfig.CG_BUYING_ADDPOINT)
public void receiveMessage(String message){
Task task = JSON.parseObject(message, Task.class);
if (task == null || StringUtils.isEmpty(task.getRequestBody())){
return;
}
//判断redis中是否存在内容
Object value = redisTemplate.boundValueOps(task.getId()).get();
if (value != null){
return;
}
//更新用户积分
int result = userService.updateUserPoints(task);
if (result<=0){
return;
}
//返回通知
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_FINISHADDPOINT_KEY,JSON.toJSONString(task));
}
}
步骤3: 定义修改用户积分实现
实现思路:
1)判断当前订单是否操作过
2)将任务存入redis
3)修改用户积分
4)添加积分日志表记录
5)删除redis中记录
@Autowired
private PointLogMapper pointLogMapper;
/**
* 修改用户积分
* @param task
* @return
*/
@Override
@Transactional
public int updateUserPoints(Task task) {
Map info = JSON.parseObject(task.getRequestBody(), Map.class);
String userName = info.get("userName").toString();
String orderId = info.get("orderId").toString();
int point = (int) info.get("point");
//判断当前订单是否操作过
PointLog pointLog = pointLogMapper.findLogInfoByOrderId(orderId);
if (pointLog != null){
return 0;
}
//将任务存入redis
redisTemplate.boundValueOps(task.getId()).set("exist",1,TimeUnit.MINUTES);
//修改用户积分
int result = userMapper.updateUserPoint(userName, point);
if (result<=0){
return result;
}
//添加积分日志表记录
pointLog = new PointLog();
pointLog.setOrderId(orderId);
pointLog.setPoint(point);
pointLog.setUserId(userName);
result = pointLogMapper.insertSelective(pointLog);
if (result<=0){
return result;
}
//删除redis中的记录
redisTemplate.delete(task.getId());
return 1;
}
步骤4: 定义根据订单id查询积分日志表
定义PointLogMapper,实现根据订单id查询:
public interface PointLogMapper extends Mapper<PointLog> {
@Select("select * from tb_point_log where order_id=#{orderId}")
PointLog findLogInfoByOrderId(@Param("orderId") String orderId);
}
四、 订单服务删除原任务
步骤1: 定义监听类
在订单服务中定义监听类,用于监听队列,如果队列中有消息,则删除原任务防止消息重复发送,并对任务信息进行记录
@Component
public class DelTaskListener {
@Autowired
private TaskService taskService;
@RabbitListener(queues = RabbitMQConfig.CG_BUYING_FINISHADDPOINT)
public void receiveMessage(String message){
Task task = JSON.parseObject(message, Task.class);
taskService.delTask(task);
}
}
步骤: 定义任务service
public interface TaskService {
void delTask(Task task);
}
@Service
@Transactional
public class TaskServiceImpl implements TaskService {
@Autowired
private TaskMapper taskMapper;
@Autowired
private TaskHisMapper taskHisMapper;
@Override
public void delTask(Task task) {
//1. 设置删除时间
task.setDeleteTime(new Date());
Long id = task.getId();
task.setId(null);
//bean复制
TaskHis taskHis = new TaskHis();
BeanUtils.copyProperties(task,taskHis);
//记录任务信息
taskHisMapper.insertSelective(taskHis);
//删除原任务
task.setId(id);
taskMapper.deleteByPrimaryKey(task);
}
}