go对rabbitmq基本操作
一、安装rabbitmq
-
1、直接使用
docker
拉取镜像docker pull rabbitmq:3.8
-
2、启动容器
docker run \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=123456 \ -v mq-plugins:/plugins \ --name rabbit01 \ --hostname rabbit01 --restart=always \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8
-
3、关于端口的介绍
- 15672的给浏览器控制台使用的
- 5672是给程序调用的
-
4、进入到
rabbit01
容器中docker exec -it rabbit01 /bin/bash
-
5、开启可视化界面操作
rabbitmq-plugins enable rabbitmq_management
-
6、客户端直接访问
xx:15672
-
7、或者直接用别人搞好的镜像
docker run \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=123456 \ -v mq-plugins:/plugins \ --name rabbit02 \ --hostname rabbit02 --restart=always \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management
二、go
语言对rabbitmq
基本操作
-
1、安装依赖包
go get github.com/streadway/amqp
-
2、基本的连接操作
package main import ( "fmt" "github.com/streadway/amqp" ) func main() { // 连接rabbitmq // conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称") // 端口号:5672 conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672 defer conn.Close() // 打开通道 ch, err := conn.Channel() fmt.Println(err) defer ch.Close() }
-
3、由于部分每个地方都要使用,封装成一个方法
package utils import ( "fmt" "github.com/streadway/amqp" ) func RabbitmqUtils() *amqp.Channel { // 连接rabbitmq conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672 //defer conn.Close() // 打开通道 ch, err := conn.Channel() fmt.Println(err) //defer ch.Close() return ch }
-
4、创建一个队列,然后到可视化界面查看是否自动创建
func main() { // 创建一个队列 // durable, autoDelete, exclusive, noWait bool queue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil) fmt.Println(queue.Name, err) }
-
5、关于创建队列几个参数的介绍
- 第一个参数是队列名称
- 第二个参数是队列是否持久化
- 第三个参数是是否自动删除
- 第四个参数是队列是否可以被其他队列访问
- 第五个参数是设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
三、简单模式
-
1、根据官网图来看,简单模式是不需要交换机的
-
2、定义生产者,向队列中发送消息(注意要先创建队列)
func main() { /** 第一个参数是交换机名称 第二个参数是队列名称 第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃 第四个参数是 路由的时候 第五个参数是消息体 */ err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{ Body: []byte("hello word"), }) fmt.Println(err) }
-
3、查看可是界面是否存在一条消息
-
4、创建消费者,来获取消息内容
/** 第一个参数是队列名称 第二个参数自己给当前消费者命名 第三个参数是否自动应答 第三个参数队列是否可以被其他队列访问 第四个参数 第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度 */ msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil) fmt.Println(err) for msg := range msgChan { fmt.Println(string(msg.Body)) }
四、工作模式
-
1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息
-
2、定义2个消费者来消费消息
func main() { msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil) fmt.Println(err) for msg := range msgChan { fmt.Println("消费者1:", string(msg.Body)) } }
-
3、生产多条消息
func main() { for i := 0; i < 10; i++ { _ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{ Body: []byte(fmt.Sprintf("hello word %d", i)), }) } }
-
4、消费结果
五、发布订阅模式
-
1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息
-
2、使用
go
的api
来创建交换机和队列func main() { // 1.创建2个队列 queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil) queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil) // 2.创建一个交换机 _ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil) // 3.队列和交换机绑定在一起 _ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil) _ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil) }
-
3、消费者只需要绑定队列来消费消息就可以
func main() { msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil) fmt.Println(err) for msg := range msgChan { fmt.Println("消费者1:", string(msg.Body)) } }
-
4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息
func main() { _ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{ Body: []byte("hello word"), }) }
-
5、可以查看控制台两个消费者都接收到消息
六、路由模式
-
1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由
key
-
2、使用
go-api
创建交换机和队列,并且对其绑定func main() { // 1.创建2个队列 queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil) queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil) // 2.创建一个交换机 err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil) if err != nil { fmt.Println(err) } // 3.队列和交换机绑定在一起 _ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil) _ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil) _ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil) }
-
3、定义消费者
func main() { msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil) fmt.Println(err) for msg := range msgChan { fmt.Println("消费者1:", string(msg.Body)) } }
-
4、定义生产者
func main() { // 消费者会根据绑定的路由key来获取消息 _ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{ Body: []byte("hello word"), }) }
七、主题模式
- 1、主题模式和上面路由模式差不多,只是多了一个模糊匹配
*
表示只匹配一个单词#
表示匹配多个单词
八、简单对其封装
-
1、封装代码
package utils import ( "errors" "fmt" "github.com/streadway/amqp" "log" ) // MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost const MQURL = "amqp://admin:123456@localhost:5672//" type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel MQUrl string } // NewRabbitMQ 创建RabbitMQ的结构体实例 func NewRabbitMQ() *RabbitMQ { rabbitMQ := &RabbitMQ{ MQUrl: MQURL, } var err error // 创建rabbitMQ连接 rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl) if err != nil { rabbitMQ.failOnErr(err, "创建连接错误") } rabbitMQ.channel, err = rabbitMQ.conn.Channel() if err != nil { rabbitMQ.failOnErr(err, "获取channel失败") } return rabbitMQ } // Binding 创建交换机和队列并且绑定在一起 func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) { // 1.创建1个队列 queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil) if err != nil { r.failOnErr(err, "创建队列失败") } if exchange != "" && key == "" { r.failOnErr(errors.New("错误"), "请传递交换机链接类型") } if exchange != "" { // 2.创建一个交换机 err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil) if err1 != nil { r.failOnErr(err, "创建交换机失败") } // 3.队列和交换机绑定在一起 if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil { fmt.Println("1111") r.failOnErr(err, "交换机和队列绑定失败") } } fmt.Println("创建成功") } // failOnErr 定义内部错误处理 func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { log.Fatalf("%s:%s", message, err) panic(fmt.Sprintf("%s:%s", message, err)) } } func (r *RabbitMQ) Close() { defer func(Conn *amqp.Connection) { err := Conn.Close() if err != nil { r.failOnErr(err, "关闭链接失败") } }(r.conn) defer func(Channel *amqp.Channel) { err := Channel.Close() if err != nil { r.failOnErr(err, "关闭通道失败") } }(r.channel) } func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) { err := r.channel.Qos(prefetchCount, prefetchSize, global) if err != nil { r.failOnErr(err, "限流失败") } } // Publish 发布者 func (r *RabbitMQ) Publish(exchange, routerKey, message string) { // 2.发送数据到队列中 if err := r.channel.Publish( exchange, routerKey, false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者 false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者 amqp.Publishing{ Body: []byte(message), }, ); err != nil { r.failOnErr(err, "发送消息失败") } fmt.Println("恭喜你,消息发送成功") } // Consumer 消费者 func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) { // 2.接收消息 message, err := r.channel.Consume( queueName, "", // 区分多个消费者 true, // 是否自动应答 false, // 是否具有排他性 false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者 false, // 队列消费是否阻塞 nil, ) if err != nil { r.failOnErr(err, "接收消息失败") } fmt.Println("消费者等待消费...") forever := make(chan bool) // 使用协程处理消息 go func() { for d := range message { log.Printf("接收到的消息:%s", d.Body) callback(d.Body) } }() <-forever }
-
2、简单模式的使用
func main() { mq := utils.NewRabbitMQ() mq.Consumer("simple_queue1", func(message []byte) { fmt.Println(string(message)) }) defer mq.Close() } func main() { mq := utils.NewRabbitMQ() mq.Binding("simple_queue1", "", "", "") defer mq.Close() mq.Publish("", "simple_queue1", "你好水痕") }
-
3、工作模式的使用
func main() { mq := utils.NewRabbitMQ() mq.Consumer("work_queue1", func(message []byte) { fmt.Println("消费者2", string(message)) }) defer mq.Close() } func main() { mq := utils.NewRabbitMQ() defer mq.Close() for i := 0; i < 10; i++ { mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i)) } }
-
4、交换机带路由的时候
func main() { mq := utils.NewRabbitMQ() mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info") mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error") mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info") defer mq.Close() } func main() { mq := utils.NewRabbitMQ() mq.Consumer("first_queue2", func(message []byte) { fmt.Println("消费者2", string(message)) }) defer mq.Close() } func main() { mq := utils.NewRabbitMQ() defer mq.Close() mq.Publish("first_exchange1", "error", "你好水痕") }