当前位置: 首页 > article >正文

go对rabbitmq基本操作

一、安装rabbitmq

  • 1、直接使用docker拉取镜像

    docker pull rabbitmq:3.8
    
  • 2、启动容器

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit01 \
     --hostname rabbit01 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8
    
  • 3、关于端口的介绍

    • 15672的给浏览器控制台使用的
    • 5672是给程序调用的
  • 4、进入到rabbit01容器中

    docker exec -it rabbit01 /bin/bash
    
  • 5、开启可视化界面操作

    rabbitmq-plugins enable rabbitmq_management
    
  • 6、客户端直接访问xx:15672

  • 7、或者直接用别人搞好的镜像

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit02 \
     --hostname rabbit02 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8-management
    

二、go语言对rabbitmq基本操作

  • 1、安装依赖包

    go get github.com/streadway/amqp
    
  • 2、基本的连接操作

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func main() {
    	// 连接rabbitmq
        // conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称")   // 端口号:5672
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	defer conn.Close()
    
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	defer ch.Close()
    }
    
  • 3、由于部分每个地方都要使用,封装成一个方法

    package utils
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func RabbitmqUtils() *amqp.Channel {
    	// 连接rabbitmq
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	//defer conn.Close()
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	//defer ch.Close()
    	return ch
    }
    
  • 4、创建一个队列,然后到可视化界面查看是否自动创建

    func main() {
    	// 创建一个队列
        // durable, autoDelete, exclusive, noWait bool
    	queue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil)
    	fmt.Println(queue.Name, err)
    }
    

    在这里插入图片描述

  • 5、关于创建队列几个参数的介绍

    • 第一个参数是队列名称
    • 第二个参数是队列是否持久化
    • 第三个参数是是否自动删除
    • 第四个参数是队列是否可以被其他队列访问
    • 第五个参数是设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度

三、简单模式

  • 1、根据官网图来看,简单模式是不需要交换机的

    在这里插入图片描述

  • 2、定义生产者,向队列中发送消息(注意要先创建队列)

    func main() {
        /**
    	第一个参数是交换机名称
    	第二个参数是队列名称
    	第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃
    	第四个参数是 路由的时候
    	第五个参数是消息体
    	*/
    	err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    	fmt.Println(err)
    }
    
  • 3、查看可是界面是否存在一条消息

  • 4、创建消费者,来获取消息内容

    /**
    第一个参数是队列名称
    第二个参数自己给当前消费者命名
    第三个参数是否自动应答
    第三个参数队列是否可以被其他队列访问
    第四个参数
    第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
    */
    msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil)
    fmt.Println(err)
    for msg := range msgChan {
        fmt.Println(string(msg.Body))
    }
    

四、工作模式

  • 1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息

  • 2、定义2个消费者来消费消息

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 3、生产多条消息

    func main() {
    	for i := 0; i < 10; i++ {
    		_ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{
    			Body: []byte(fmt.Sprintf("hello word %d", i)),
    		})
    	}
    }
    
  • 4、消费结果

    在这里插入图片描述

五、发布订阅模式

  • 1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息

  • 2、使用goapi来创建交换机和队列

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil)
    	// 2.创建一个交换机
    	_ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil)
    }
    
  • 3、消费者只需要绑定队列来消费消息就可以

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息

    func main() {
    	_ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    
  • 5、可以查看控制台两个消费者都接收到消息

六、路由模式

  • 1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由key

  • 2、使用go-api创建交换机和队列,并且对其绑定

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil)
    	// 2.创建一个交换机
    	err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	if err != nil {
    		fmt.Println(err)
    	}
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil)
    }
    
  • 3、定义消费者

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、定义生产者

    func main() {
        // 消费者会根据绑定的路由key来获取消息
    	_ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    

七、主题模式

  • 1、主题模式和上面路由模式差不多,只是多了一个模糊匹配
    • *表示只匹配一个单词
    • #表示匹配多个单词

八、简单对其封装

  • 1、封装代码

    package utils
    
    import (
    	"errors"
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    // MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
    const MQURL = "amqp://admin:123456@localhost:5672//"
    
    type RabbitMQ struct {
    	conn    *amqp.Connection
    	channel *amqp.Channel
    	MQUrl   string
    }
    
    // NewRabbitMQ 创建RabbitMQ的结构体实例
    func NewRabbitMQ() *RabbitMQ {
    	rabbitMQ := &RabbitMQ{
    		MQUrl: MQURL,
    	}
    	var err error
    	// 创建rabbitMQ连接
    	rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
    	if err != nil {
    		rabbitMQ.failOnErr(err, "创建连接错误")
    	}
    	rabbitMQ.channel, err = rabbitMQ.conn.Channel()
    	if err != nil {
    		rabbitMQ.failOnErr(err, "获取channel失败")
    	}
    	return rabbitMQ
    }
    
    // Binding 创建交换机和队列并且绑定在一起
    func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) {
    	// 1.创建1个队列
    	queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil)
    	if err != nil {
    		r.failOnErr(err, "创建队列失败")
    	}
    	if exchange != "" && key == "" {
    		r.failOnErr(errors.New("错误"), "请传递交换机链接类型")
    	}
    	if exchange != "" {
    		// 2.创建一个交换机
    		err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil)
    		if err1 != nil {
    			r.failOnErr(err, "创建交换机失败")
    		}
    		// 3.队列和交换机绑定在一起
    		if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil {
    			fmt.Println("1111")
    			r.failOnErr(err, "交换机和队列绑定失败")
    		}
    	}
    	fmt.Println("创建成功")
    }
    
    // failOnErr 定义内部错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {
    	if err != nil {
    		log.Fatalf("%s:%s", message, err)
    		panic(fmt.Sprintf("%s:%s", message, err))
    	}
    }
    
    func (r *RabbitMQ) Close() {
    	defer func(Conn *amqp.Connection) {
    		err := Conn.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭链接失败")
    		}
    	}(r.conn)
    	defer func(Channel *amqp.Channel) {
    		err := Channel.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭通道失败")
    		}
    	}(r.channel)
    }
    
    func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) {
    	err := r.channel.Qos(prefetchCount, prefetchSize, global)
    	if err != nil {
    		r.failOnErr(err, "限流失败")
    	}
    }
    
    // Publish 发布者
    func (r *RabbitMQ) Publish(exchange, routerKey, message string) {
    	// 2.发送数据到队列中
    	if err := r.channel.Publish(
    		exchange,
    		routerKey,
    		false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者
    		false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者
    		amqp.Publishing{
    			Body: []byte(message),
    		},
    	); err != nil {
    		r.failOnErr(err, "发送消息失败")
    	}
    	fmt.Println("恭喜你,消息发送成功")
    }
    
    // Consumer 消费者
    func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) {
    	// 2.接收消息
    	message, err := r.channel.Consume(
    		queueName,
    		"",    // 区分多个消费者
    		true,  // 是否自动应答
    		false, // 是否具有排他性
    		false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者
    		false, // 队列消费是否阻塞
    		nil,
    	)
    	if err != nil {
    		r.failOnErr(err, "接收消息失败")
    	}
    	fmt.Println("消费者等待消费...")
    	forever := make(chan bool)
    	// 使用协程处理消息
    	go func() {
    		for d := range message {
    			log.Printf("接收到的消息:%s", d.Body)
    			callback(d.Body)
    		}
    	}()
    	<-forever
    }
    
  • 2、简单模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("simple_queue1", func(message []byte) {
    		fmt.Println(string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("simple_queue1", "", "", "")
    	defer mq.Close()
    	mq.Publish("", "simple_queue1", "你好水痕")
    }
    
  • 3、工作模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("work_queue1", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	for i := 0; i < 10; i++ {
    		mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i))
    	}
    }
    
  • 4、交换机带路由的时候

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info")
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error")
    	mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info")
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("first_queue2", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	mq.Publish("first_exchange1", "error", "你好水痕")
    }
    

http://www.kler.cn/a/146130.html

相关文章:

  • 【前端动效】HTML + CSS 实现打字机效果
  • Android系统开发(一):AOSP 架构全解析:开源拥抱安卓未来
  • SpringBoot错误码国际化
  • 在.NET用C#将Word文档转换为HTML格式
  • 【postgres】sqlite格式如何导入postgres数据库
  • RPA编程实践:Electron简介
  • Redis 两种持久化方式 AOF 和 RDB
  • 巧妙之中见真章:深入解析常用的创建型设计模式
  • vatee万腾的科技征途:Vatee数字化力量的新视野
  • ICMPv6报文与邻居状态跟踪
  • 什么样的CRM系统更值得使用?
  • 音视频5、libavformat-3
  • C# List<T>的综合用法
  • Linux(9):正规表示法与文件格式化处理
  • 一种模板类实现和声明分开在生成的.a文件被使用时出现undefined reference时的一种解决方法
  • 【设计模式】行为型模式-第 3 章第 6 讲【中介者模式】
  • react高阶成分(HOC)
  • YOLOv8改进 | SAConv可切换空洞卷积(附修改后的C2f+Bottleneck)
  • 面试笔记--Linux常用命令
  • 网络安全—自学
  • 基于单片机DHT11湿度测量与控制-CO2-光照报警系统程序和仿真
  • 终于等到你!常用的组织架构图模板,高清图片一键导出
  • 在vue页面中添加组件到底有多方便
  • udp通信socket关闭后,缓存不清空
  • 【Android知识笔记】性能优化专题(三)
  • [环境配置]vscode免密ssh的设置流程