golang rocketmq开发
安装
下载
https://mxshop-files.oss-cn-hangzhou.aliyuncs.com/install.zip
新建文件夹
mkdir rocketmq
解压
unzip install.zip -d rocketmq/
修改配置文件
cd rocketmq/install/conf/
vim broker.conf
修改brokerIP1为当前IP,如果是本地电脑填对应IP地址,如果是虚拟机填虚拟机的IP地址,如果是服务器填服务器的公网IP并开放rocketmq使用的全部端口(启动后通过docker ps -a查看,否则容易出错)
启动
cd ..
docker-compose up
第一次拉取镜像如果出网络问题参考如下链接(ubuntu系统,其他操作系统自行解决)
ubuntu Error response from daemon: Get “https://registry-1.docker.io/v2/”: net/http
访问rocketmq网页http://39.103.59.35:8080/#/message
go包官网
go get github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
rocketmq的基本概念
Producer:消息的发送者;举例:发信者
Consumer:消息接收者;举例:收信者
Broker:暂存和传输消息;举例:邮局
NameServer:管理Broker;举例:各个邮局的管理机构
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
Message Queue:相当于是Topic的分区;用于并行发送和接收消息
rocketmq的消息类型
1按照发送的特点分:
1.1 同步发送
a. 同步发送,线程阻塞,投递completes阻塞结束
b. 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
c. 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
d. SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值的
1.2 异步发送
a. 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
b. 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry,源码参考 DefaultMQProducerImpl.class
c. 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
1.3 单向发送
a. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
b. 此方式发送消息的过程耗时非常短,一般在微秒级别
2按照使用功能特点分:
2.1 普通消息(订阅)
普通消息是我们在业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景。
producer
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func main() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"39.103.59.35:9876"}))
if err != nil {
panic("生成producer失败")
}
if err = p.Start(); err != nil {
panic("启动producer失败")
}
res, err := p.SendSync(context.Background(), primitive.NewMessage("imooc1", []byte("this is imooc1")))
if err != nil {
fmt.Printf("发送失败:%s\n", err)
} else {
fmt.Printf("发送成功:%s\n", res.String())
}
if err = p.Shutdown(); err != nil {
fmt.Printf("关闭producer失败")
}
}
consumer
package main
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"39.103.59.35:9876"}),
consumer.WithGroupName("maxshop"),
)
if err != nil {
panic("连接失败")
}
if err = c.Subscribe("imooc1", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("获取到值:%v\n", msgs[i])
}
return consumer.ConsumeSuccess, nil
}); err != nil {
fmt.Println("读取消息失败")
}
_ = c.Start()
//不能让主goroutine退出
time.Sleep(time.Hour)
_ = c.Shutdown()
}
2.2 顺序消息
顺序消息分为分区顺序消息和全局顺序消息,全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的FIFO,很多时候全局消息的实现代价很大,所以就出现了分区顺序消息。分区顺序消息的概念可以如下图所示:
我们通过对消息的key,进行hash,相同hash的消息会被分配到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要一个即可,所以全局顺序消息的代价是比较大的。
3. 延时消息 - 订单超时库存归还
延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s
应用于超时归还(支付-淘宝、12306、购票):1.时间一到就执行。2.消息中包含了订单编号,只需要查询订单编号。如果采用轮询(超时时间30分钟),时间设为1分钟,则导致29次是无用的,时间设置为30分钟,会导致用户等待支付结果时间过长。
producer
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func main() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"39.103.59.35:9876"}))
if err != nil {
panic("生成producer失败")
}
if err = p.Start(); err != nil {
panic("启动producer失败")
}
msg := primitive.NewMessage("imooc1", []byte("this is delay message"))
msg.WithDelayTimeLevel(2)
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("发送失败:%s\n", err)
} else {
fmt.Printf("发送成功:%s\n", res.String())
}
if err = p.Shutdown(); err != nil {
fmt.Printf("关闭producer失败")
}
}
consumer
package main
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, err := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"39.103.59.35:9876"}),
consumer.WithGroupName("maxshop"),
)
if err != nil {
panic("连接失败")
}
if err = c.Subscribe("imooc1", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("获取到值:%v\n", msgs[i])
}
return consumer.ConsumeSuccess, nil
}); err != nil {
fmt.Println("读取消息失败")
}
_ = c.Start()
//不能让主goroutine退出
time.Sleep(time.Hour)
_ = c.Shutdown()
}
4. 事务消息
消息队列RocketMQ版提供的分布式事务消息适用于所有对数据最终一致性有强需求的场景。本文介绍消息队列RocketMQ版事务消息的概念、优势、典型场景、交互流程以及使用过程中的注意事项。
概念介绍
○ 事务消息:消息队列RocketMQ版提供类似X或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
○ 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
○ 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
分布式事务消息的优势
消息队列RocketMQ版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
package main
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
type OrderListener struct{}
// When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
func (o *OrderListener) ExecuteLocalTransaction(addr *primitive.Message) primitive.LocalTransactionState {
// example1 本地成功不会调用回查
// fmt.Println("开始执行本地逻辑")
// time.Sleep(time.Second * 3)
// fmt.Println("执行本地逻辑成功")
// return primitive.CommitMessageState
// example2 本地失败不会调用回查
// fmt.Println("开始执行本地逻辑")
// time.Sleep(time.Second * 3)
// fmt.Println("执行本地逻辑失败")
// return primitive.RollbackMessageState
// example1 本地失败不会调用回查
fmt.Println("开始执行本地逻辑")
time.Sleep(time.Second * 3)
fmt.Println("执行本地逻辑失败")
//本地执行逻辑无缘无故失败 代码异常 宕机
return primitive.UnknowState
}
// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Println("rocketmq的消息回查")
time.Sleep(time.Second * 15)
return primitive.CommitMessageState
}
func main() {
p, err := rocketmq.NewTransactionProducer(
&OrderListener{},
producer.WithNameServer([]string{"39.103.59.35:9876"}),
)
if err != nil {
panic("生成producer失败")
}
if err = p.Start(); err != nil {
panic("启动producer失败")
}
res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("transTopic", []byte("this is transaction message")))
if err != nil {
fmt.Printf("发送失败:%s\n", err)
} else {
fmt.Printf("发送成功:%s\n", res.String())
}
//回查
time.Sleep(time.Hour)
if err = p.Shutdown(); err != nil {
panic("关闭producer失败")
}
}