当前位置: 首页 > article >正文

golang rocketmq保证数据一致性(以电商订单为例)

数据一致性的原因

1.所有情况
a):本地订单信息创建失败,库存扣减成功(一致)
b):本地订单信息新建失败,但是库存扣减成功(不一致)
c):本地订单信息新建成功,但是库存扣减失败(不一致)
d):本地订单信息新建失败,库存扣减失败(一致)
本地数据库直接通过数据库事务就能完成,但是分布式系统跨微服务就很复杂
2.可能会出现的问题
在这里插入图片描述
a):库存扣减成功,本地执行失败-调用库存的归还接口(磁盘满了导致服务宕机、网络出问题调用失败-重试到成功-网络问题比较复杂:抖动、拥塞导致过了一段时间之后才会被接收到?-会不会导致重复归还-幂等性问题)
b):非本地执行失败,而是本地代码异常(挂掉),不知道本地执行情况
在这里插入图片描述
a):扣减库存发送出去,当网络阻塞,重试三次后请求回滚
b):扣减库存发送出去,在库存服务返回之前本地事务宕机

TCC解决库存扣减的问题

confirm或者cancel的时候都可能会宕机,但是下一次启动的时候可以读取日志继续没有完成的逻辑,实现比较复杂,java有seta库,golang有go-seta但不支持grpc
在这里插入图片描述

基于可靠消息最终一致性方案

基于可靠消息本质上解决了一个问题:可靠消息(消费者应该确保这个消息能被正确消费,即库存服务代码要可靠而且一定要执行,代码有bug或者宕机都能解决)
积分服务没有问题,但是库存服务可能存在库存不足,得允许失败(TCC比较合理一些,而且TCC并发没有可靠消息并发高)
在这里插入图片描述
改进执行库存后执行本地事务,关注库存扣减成功后出现的那种异常,扣减失败直接不执行
在这里插入图片描述
如何通过基于可靠消息实现库存的一致性?
1.订单信息准备一个归还库存的half消息
2.订单服务通过grpc调用库存服务
3.如果失败直接rollback不发送,什么事情都没有发生;如果成功执行本地事务
4.本地事务执行成功,通过rollback不发送1准备的消息;本地事务如果失败,通过commit发送1准备的消息通过调用reback逻辑归还库存
5.通过commit发送1准备的消息通过调用reback逻辑归还库存
到此正常的逻辑已经结束,第六七步是宕机或者程序出错的回查
6.如果异常程序挂掉,就回查本地逻辑
7.如果没有查询到本地信息就通过commit发送1准备的消息通过调用reback逻辑归还库存
在这里插入图片描述
潜在问题:下单但不支付-设置超时归还订单
改进:通过库存服务监听延时消息确保超时归还
在这里插入图片描述
发送端逻辑
构造OrderListener

type OrderListener struct{}

// When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
func (o *OrderListener) ExecuteLocalTransaction(addr *primitive.Message) primitive.LocalTransactionState {
	fmt.Println("开始执行本地逻辑")
	time.Sleep(time.Second * 3)
	fmt.Println("执行本地逻辑失败")
	//本地执行逻辑无缘无故失败 代码异常 宕机
	return primitive.UnknowState
}

// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	fmt.Println("rocketmq的消息回查")
	time.Sleep(time.Second * 15)
	return primitive.CommitMessageState
}

创建订单API

func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {
	orderListener := OrderListener{}
	p, err := rocketmq.NewTransactionProducer(
		&orderListener,
		producer.WithNameServer([]string{"IP:PORT"}),
	)
	if err != nil {
		// panic("生成producer失败")
		zap.S().Errorf("生成producer失败:%s", err.Error())
		return nil, err
	}
	if err = p.Start(); err != nil {
		// panic("启动producer失败")
		zap.S().Errorf("启动producer失败:%s", err.Error())
		return nil, err
	}

	order := model.OrderInfo{
		OrderSn:      GenerateOrderSn(req.UserId),
		Address:      req.Address,
		SignerName:   req.Name,
		SingerMobile: req.Mobile,
		Post:         req.Post,
		User:         req.UserId,
	}
	//应该在消息中具体指名一个具体的商品的扣减情况
	jsonString, _ := json.Marshal(order)

	_, err = p.SendMessageInTransaction(context.Background(),
		primitive.NewMessage("order_reback", jsonString))
	if err != nil {
		fmt.Printf("发送失败:%s\n", err)
		return nil, status.Error(codes.Internal, "发送消息失败")
	}
	// 4.本地事务失败,commit
	if orderListener.Code != codes.OK {
		return nil, status.Error(orderListener.Code, orderListener.Detail)
	}

	return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
}

修改OrderListener通过指针的形式回传值

type OrderListener struct {
	Code        codes.Code
	Detail      string
	ID          int32
	OrderAmount float32
}

ExecuteLocalTransaction执行本地事务逻辑

// When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {

	var orderInfo model.OrderInfo
	_ = json.Unmarshal(msg.Body, &orderInfo)

	var goodsIds []int32
	var shopCarts []model.ShoppingCart
	goodsNumsMap := make(map[int32]int32)
	if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {
		o.Code = codes.InvalidArgument
		o.Detail = "没有选中结算的商品"
		//没扣减回滚,没必要发送
		return primitive.RollbackMessageState
	}
	for _, shopCart := range shopCarts {
		goodsIds = append(goodsIds, shopCart.Goods)
		goodsNumsMap[shopCart.Goods] = shopCart.Nums
	}
	//跨服务调用
	//商品微服务
	goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{
		Id: goodsIds,
	})
	if err != nil {
		o.Code = codes.Internal
		o.Detail = "批量查询商品信息失败"
		//没扣减回滚,没必要发送
		return primitive.RollbackMessageState
	}
	var orderAmount float32
	var orderGoods []*model.OrderGoods
	var goodsInvInfo []*proto.GoodsInvInfo
	for _, good := range goods.Data {
		orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])
		orderGoods = append(orderGoods, &model.OrderGoods{
			Goods:      good.Id,
			GoodsName:  good.Name,
			GoodsImage: good.GoodsFrontImage,
			GoodsPrice: good.ShopPrice,
			Nums:       goodsNumsMap[good.Id],
		})
		goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{
			GoodsId: good.Id,
			Num:     goodsNumsMap[good.Id],
		})
	}

	//跨服务调用库存微服务进行库存扣减
	if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {
		o.Code = codes.ResourceExhausted
		o.Detail = "扣减库存失败"
		return primitive.RollbackMessageState
	}
	//生成订单表
	//20241020xxxx
	tx := global.DB.Begin()
	orderInfo.OrderMount = orderAmount
	if result := tx.Save(&orderInfo); result.RowsAffected == 0 {
		tx.Rollback()
		o.Code = codes.ResourceExhausted
		o.Detail = "创建订单失败"
		return primitive.CommitMessageState
	}

	for _, orderGood := range orderGoods {
		orderGood.Order = orderInfo.ID
	}

	o.OrderAmount = orderAmount
	o.ID = orderInfo.ID
	//批量插入orderGoods
	if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {
		tx.Rollback()
		o.Code = codes.Internal
		o.Detail = "批量插入商品失败"
		return primitive.CommitMessageState
	}

	if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {
		tx.Rollback()
		o.Code = codes.Internal
		o.Detail = "删除购物车记录失败"
		return primitive.CommitMessageState
	}
	//发送延时消息
	p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"IP:PORT"}))
	if err != nil {
		panic("生成producer失败")
	}
	if err = p.Start(); err != nil {
		panic("启动producer失败")
	}
	msgs := primitive.NewMessage("order_timeout", msg.Body)
	msgs.WithDelayTimeLevel(16) //延时30mins
	_, err = p.SendSync(context.Background(), msgs)
	if err != nil {
		zap.S().Errorf("发送延时消息失败:%v\n", err.Error())
		tx.Rollback()
		o.Code = codes.Internal
		o.Detail = "发送延时消息失败"
		return primitive.CommitMessageState
	}
	if err = p.Shutdown(); err != nil {
		panic("关闭producer失败")
	}
	//提交事务
	tx.Commit()
	o.Code = codes.OK
	//本地执行逻辑无缘无故失败 代码异常 宕机
	return primitive.RollbackMessageState
}

消息回查CheckLocalTransaction

// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	var orderInfo model.OrderInfo
	_ = json.Unmarshal(msg.Body, &orderInfo)
	//怎么检查之前的逻辑是否完成
	if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
		//这里并不能说明内存扣减了
		return primitive.CommitMessageState
	}

	return primitive.RollbackMessageState
}

消费端逻辑
归还表

type DeliveryHistory struct {
	Goods   int32  `gorm:"type:int;index"`
	Num     int32  `gorm:"type:int"`
	OrderSn string `gorm:"type:varchar(200)"`
	Status  string `gorm:"type:varchar(200)"` //1.代表已扣减.2.代表已归还,3.失败
}

表设计比较麻烦,十件商品建立十条信息,1.数据量大2.查询麻烦
改进表为StockSellDetail

type StockSellDetail struct {
	OrderSn string          `gorm:"type:varchar(200);index:idx_order_sn,unique"`
	Status  int             `gorm:"type:varchar(200)"` //1.表示已扣减,2.表示已归还
	Detail  GoodsDetailList `gorm:"type:varchar(200)"` //list类型,包括id和num
}

GoodsDetailList定义

type GoodsDetail struct {
	Goods int32
	Num   int32
}
type GoodsDetailList []GoodsDetail

func (g *GoodsDetailList) Scan(value interface{}) error {
	return json.Unmarshal(value.([]byte), &g)
}
func (g GoodsDetailList) Value() (driver.Value, error) {
	return json.Marshal(g)
}

通过gorm新建数据表
归还函数

func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	type OrderInfo struct {
		OrderSn string
	}
	for i := range msgs {
		var orderInfo OrderInfo
		err := json.Unmarshal(msgs[i].Body, &orderInfo)
		if err != nil {
			zap.S().Errorf("解析json失败:%v\n", msgs[i].Body)
			//代码问题 下一次重新发consumer.ConsumeRetryLater
			//数据问题 忽略
			return consumer.ConsumeSuccess, nil
		}
		//去将inv的库存加回去,将selldetail的status设置为2,要在事务中进行
		tx := global.DB.Begin()
		var sellDetail model.StockSellDetail
		if result := tx.Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {
			return consumer.ConsumeSuccess, nil
		}
		//逐个归还
		for _, orderGood := range sellDetail.Detail {
			//先查后改并发容易出问题,直接stocks=stocks+2
			if result := tx.Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("sotcks+?", orderGood.Num)); result.RowsAffected == 0 {
				tx.Rollback()
				//检查代码
				return consumer.ConsumeRetryLater, nil
			}
		}
		sellDetail.Status = 2
		if result := tx.Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn}).Update("status", 2); result.RowsAffected == 0 {
			tx.Rollback()
			return consumer.ConsumeRetryLater, nil
		}
		tx.Commit()
	}
	return consumer.ConsumeSuccess, nil
}

库存服务订阅topic

//监听库存归还topic
	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer([]string{"IP:PORT"}),
		consumer.WithGroupName("shop"),
	)
	if err != nil {
		panic("连接失败")
	}
	if err = c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {
		fmt.Println("读取消息失败")
	}
	_ = c.Start()
	//不能让主goroutine退出
	time.Sleep(time.Hour)
	_ = c.Shutdown()

超时归还订阅topic

//订单超时归还topic
	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer([]string{"IP:PORT"}),
		consumer.WithGroupName("order"),
	)
	if err != nil {
		panic("连接失败")
	}
	if err = c.Subscribe("order_timeout", consumer.MessageSelector{}, handler.OrderTimeout); err != nil {
		fmt.Println("读取消息失败")
	}
	_ = c.Start()
	//不能让主goroutine退出
	time.Sleep(time.Hour)
	_ = c.Shutdown()

超时归还处理函数

func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	for i := range msgs {
		var orderInfo model.OrderInfo
		_ = json.Unmarshal(msgs[i].Body, &orderInfo)
		fmt.Printf("获取到消息订单超时消息:%v\n", time.Now())
		//查询订单的支付状态,未支付归还
		var order model.OrderInfo
		if result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {
			return consumer.ConsumeSuccess, nil
		}
		if order.Status != "TRADE_SUCCESS" {
			//归还库存发送一个消息到order_reback
			tx := global.DB.Begin()
			//修改订单的状态已支付
			order.Status = "TRADE_CLOSE"
			tx.Save(&order)
			p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"IP:PORT"}))
			if err != nil {
				panic("生成producer失败")
			}
			if err = p.Start(); err != nil {
				panic("启动producer失败")
			}
			res, err := p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))
			if err != nil {
				tx.Rollback()
				fmt.Println("发送失败:%s\n", err)
				return consumer.ConsumeRetryLater, nil
			} else {
				fmt.Println("发送成功:%s\n", res.String())
			}
			tx.Commit()
			if err = p.Shutdown(); err != nil {
				panic("关闭producer失败")
			}

		}
	}
	return consumer.ConsumeSuccess, nil
}

shutdown导致消费偏移量offset更新失败

client.ClientID()默认返回进程的pid,而clientMap通过调用client.ClientID()为key创建或加载,等待shutdown时会移除,如果一个进程中多次调用且存在shutdown可能会导致消费偏移量offset更新失败,进而导致rocketmq出错。
解决方法
(1)生成clientID不一致
(2)不随便shutdown
注意
不要在一个进程中使用多个producer或者不要随意调用shutdown影响其他producer


http://www.kler.cn/a/447622.html

相关文章:

  • 数据结构经典算法总复习(下卷)
  • 靜態IP與DHCP的區別和用法
  • 跨站脚本攻击的多种方式——以XSS-Labs为例二十关详解解题思路
  • 全志H618 Android12修改doucmentsui鼠标单击图片、文件夹选中区域
  • 物联网:全面概述、架构、应用、仿真工具、挑战和未来方向
  • SLURM资料
  • JAVA前端开发中type=“danger“和 type=“text“的区别
  • 《计算机组成及汇编语言原理》阅读笔记:p28-p47
  • 修改npm镜像源
  • MyBatis是什么?为什么有全自动ORM框架还是MyBatis比较受欢迎?
  • Sora技术报告【官方版】
  • 【算法】——双指针(上)
  • Redis 多实例配置说明
  • 鸿蒙开发——关系型数据库的基本使用与跨设备同步
  • Vue简介和项目构建
  • Java详细总结
  • 12月第十七讲:Redis应用相关的缓存框架
  • 解锁 Jenkins 搭建全攻略
  • RabbitMQ如何实现延时队列?
  • Windows通过git-bash安装zsh
  • 基于 iAP2 协议 的指令协议,用于对安防设备的 MCU 进行操作
  • 【Java基础面试题029】Java中的hashCode和equals方法之间有什么关系?
  • Python tkinter写的《电脑装配单》和 Html版 可打印 可导出 excel 文件
  • CV算法在工作中有哪些实际应用?
  • 数据挖掘之认识数据
  • C++9--前置++和后置++重载,const,日期类的实现(对前几篇知识点的应用)