当前位置: 首页 > article >正文

go rabbitmq 操作

go rabbitmq 操作

Go 依赖包:github.com/streadway/amqp

docker快速部署

docker pull rabbitmq:management
docker run -d rabbitmq:management # run a throwaway container first to see which ports it listens on
# 5672 is the AMQP port the Go program connects to; 15672 is the management UI.
# The management UI is only available in the ":management" image, so use the image pulled above.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

写个最基本生产者消费者demo(headers 模式)

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	// obj is the package-wide RabbitMQ handle; it is assigned in init.
	obj *MQOBJ
)

// MQOBJ bundles an AMQP connection and an AMQP channel.
// Both are embedded, so a method that exists on only one of them can be
// called directly on an MQOBJ value.
type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

// Close shuts down the channel first and then the connection, and returns the
// first error encountered (nil when both close cleanly).
//
// The channel must be closed before the connection: once the connection is
// closed, closing the channel can only fail. Nil fields are skipped so that a
// partially initialised MQOBJ can still be closed.
func (mq *MQOBJ) Close() error {
	var firstErr error
	if mq.Channel != nil {
		if err := mq.Channel.Close(); err != nil {
			firstErr = err
		}
	}
	if mq.Connection != nil {
		if err := mq.Connection.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// init dials the RabbitMQ broker, opens a channel on the new connection and
// stores both in the package-level obj. Any failure ends the process via
// log.Fatalln.
func init() {
	const brokerURL = "amqp://cho:123@192.168.101.7:5672"

	conn, dialErr := amqp.Dial(brokerURL)
	if dialErr != nil {
		log.Fatalln(dialErr)
	}

	channel, chanErr := conn.Channel()
	if chanErr != nil {
		log.Fatalln(chanErr)
	}

	obj = &MQOBJ{Connection: conn, Channel: channel}
}
// producer declares the durable queue "go-test2" and the durable headers
// exchange "go-test-exchange2", binds the queue to the exchange, and then
// publishes one "hello <n>" text message on every 300ms tick.
//
// It does not return under normal operation; any AMQP error ends the process
// via log.Fatalln.
func producer() {
	const (
		queueName    = "go-test2"
		exchangeName = "go-test-exchange2"
	)

	if _, err := obj.Channel.QueueDeclare(queueName, true, false, false, false, nil); err != nil {
		// Fail loudly like every other step instead of returning silently.
		log.Fatalln(err)
	}
	if err := obj.ExchangeDeclare(exchangeName, amqp.ExchangeHeaders, true, false, false, false, nil); err != nil {
		log.Fatalln(err)
	}

	// This binding could also be made on the consumer side, which is more flexible.
	// "x-match" is a binding argument, not a message header: "any" routes a
	// message when at least one of the listed headers matches.
	bindArgs := amqp.Table{"x-match": "any", "name": "jesko"}
	if err := obj.Channel.QueueBind(queueName, queueName, exchangeName, false, bindArgs); err != nil {
		log.Fatalln(err)
	}

	ticker := time.NewTicker(300 * time.Millisecond)
	defer ticker.Stop()

	i := 0
	for range ticker.C {
		msg := amqp.Publishing{
			ContentType: "text/plain",
			Headers:     amqp.Table{"name": "jesko", "age": 22},
			Body:        []byte("hello " + strconv.Itoa(i)),
		}
		// Publish to the headers exchange (not the default exchange "") so the
		// message is actually routed by its headers.
		if err := obj.Publish(exchangeName, queueName, false, false, msg); err != nil {
			log.Fatalln(err)
		}
		i++
	}
}
// customer declares the durable queue "go-test2", starts an auto-ack consumer
// on it and prints every delivery. It returns when SIGINT/SIGTERM is received
// or when the delivery channel is closed.
//
// Declaration or consume errors end the process via log.Fatalln.
func customer() {
	if _, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil); err != nil {
		log.Fatalln(err)
	}
	msgch, err := obj.Channel.Consume("go-test2", "", true, false, true, false, nil)
	if err != nil {
		log.Fatalln(err)
	}

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(ch)

	for {
		select {
		case msg, ok := <-msgch:
			if !ok {
				// A closed delivery channel yields zero values forever;
				// stop instead of spinning and printing empty messages.
				return
			}
			fmt.Println("accept msg " + string(msg.Body))
		case <-ch:
			return
		}
	}
}
// TestAmqp starts the producer in the background, waits two seconds, and then
// runs the consumer in the foreground. obj is closed once the consumer returns.
func TestAmqp(t *testing.T) {
	defer obj.Close()

	go producer()
	time.Sleep(2 * time.Second)

	customer()
}

请添加图片描述
这里可以看到我们创建的queue
请添加图片描述

topic模式

topic 模式不需要在绑定时指定 headers,而是通过路由键(routing key)与绑定键进行匹配。

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	// obj is the package-wide RabbitMQ handle; it is assigned in init.
	obj    *MQOBJ
	// logger writes to stdout with no prefix, using the Llongfile and LUTC flags.
	logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)

// Fataln logs its arguments through logger and then terminates the process
// with exit status 1, so a fatal error is reported as a failure.
//
// The message is emitted with call depth 2 so that the Llongfile location
// points at the caller of Fataln rather than at this helper.
func Fataln(a ...any) {
	// The process is about to exit; a logging failure cannot be handled here.
	_ = logger.Output(2, fmt.Sprintln(a...))
	os.Exit(1)
}

// MQOBJ bundles an AMQP connection and an AMQP channel.
// Both are embedded, so a method that exists on only one of them can be
// called directly on an MQOBJ value.
type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

// Close shuts down the channel first and then the connection, and returns the
// first error encountered (nil when both close cleanly).
//
// The channel must be closed before the connection: once the connection is
// closed, closing the channel can only fail. Nil fields are skipped so that a
// partially initialised MQOBJ can still be closed.
func (mq *MQOBJ) Close() error {
	var firstErr error
	if mq.Channel != nil {
		if err := mq.Channel.Close(); err != nil {
			firstErr = err
		}
	}
	if mq.Connection != nil {
		if err := mq.Connection.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// init dials the RabbitMQ broker, opens a channel on the new connection,
// stores both in the package-level obj and prints "init success".
// Any failure is handed to Fataln.
func init() {
	const brokerURL = "amqp://cho:123@192.168.101.7:5672"

	conn, dialErr := amqp.Dial(brokerURL)
	if dialErr != nil {
		Fataln(dialErr)
	}

	channel, chanErr := conn.Channel()
	if chanErr != nil {
		Fataln(chanErr)
	}

	obj = &MQOBJ{Connection: conn, Channel: channel}
	fmt.Println("init success")
}
// producer declares the durable topic exchange "go-test-exchange" and then
// publishes a "hello <n>" text message with routing key "go-test" on every
// 300ms tick. Any AMQP error is handed to Fataln.
func producer() {
	const exchange = "go-test-exchange"

	if declErr := obj.ExchangeDeclare(exchange, amqp.ExchangeTopic, true, false, false, false, nil); declErr != nil {
		Fataln(declErr)
	}

	tick := time.NewTicker(300 * time.Millisecond)
	for seq := 0; ; seq++ {
		<-tick.C
		payload := amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte("hello " + strconv.Itoa(seq)),
		}
		if pubErr := obj.Publish(exchange, "go-test", false, false, payload); pubErr != nil {
			Fataln(pubErr)
		}
	}
}

// Empty is a zero-size type used as the element type of the stop-signal channels.
type Empty struct{}

// customer opens its own channel on the shared connection, declares the
// durable queue name, binds it to "go-test-exchange" using name as the binding
// key, and prints every delivery from an auto-ack consumer.
//
// It returns when a value arrives on stopchan, when the delivery channel is
// closed, or when the queue bind fails (reported on stderr). Other AMQP
// errors are handed to Fataln.
func customer(name string, stopchan <-chan Empty) {
	ch, err := obj.Connection.Channel()
	if err != nil {
		Fataln(err)
	}
	defer ch.Close()

	if _, err = ch.QueueDeclare(name, true, false, false, false, nil); err != nil {
		Fataln("queue declare failed", err)
	}
	if err = ch.QueueBind(name, name, "go-test-exchange", false, nil); err != nil {
		fmt.Fprintln(os.Stderr, "queue bind failed", err)
		return
	}
	msgch, err := ch.Consume(name, "", true, false, true, false, nil)
	if err != nil {
		Fataln("consume failed", err)
	}

	for {
		select {
		case msg, ok := <-msgch:
			if !ok {
				// A closed delivery channel yields zero values forever;
				// stop instead of spinning and printing empty messages.
				return
			}
			fmt.Println("accept msg " + name + " " + string(msg.Body))
		case <-stopchan:
			return
		}
	}
}
// TestAmqp starts the producer in the background, waits two seconds, and then
// runs two consumers: "go-test" in a goroutine and "go-test2" in the
// foreground. On SIGINT/SIGTERM a stop value is sent to each consumer's stop
// channel. obj is closed when the foreground consumer returns.
func TestAmqp(t *testing.T) {
	defer obj.Close()

	go producer()
	time.Sleep(2 * time.Second)

	stopFirst := make(chan Empty, 1)
	stopSecond := make(chan Empty, 1)
	stops := []chan Empty{stopFirst, stopSecond}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		for i := range stops {
			stops[i] <- Empty{}
		}
	}()

	go customer("go-test", stopFirst)
	customer("go-test2", stopSecond)
}

运行结果:队列 go-test 能收到消息(绑定键 go-test 与发布时的路由键 go-test 匹配),队列 go-test2 收不到消息(绑定键 go-test2 与路由键不匹配)。

direct模式

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	// obj is the package-wide RabbitMQ handle; it is assigned in init.
	obj    *MQOBJ
	// logger writes to stdout with no prefix, using the Llongfile and LUTC flags.
	logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)

// Fataln logs its arguments through logger and then terminates the process
// with exit status 1, so a fatal error is reported as a failure.
//
// The message is emitted with call depth 2 so that the Llongfile location
// points at the caller of Fataln rather than at this helper.
func Fataln(a ...any) {
	// The process is about to exit; a logging failure cannot be handled here.
	_ = logger.Output(2, fmt.Sprintln(a...))
	os.Exit(1)
}

// MQOBJ bundles an AMQP connection and an AMQP channel.
// Both are embedded, so a method that exists on only one of them can be
// called directly on an MQOBJ value.
type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

// Close shuts down the channel first and then the connection, and returns the
// first error encountered (nil when both close cleanly).
//
// The channel must be closed before the connection: once the connection is
// closed, closing the channel can only fail. Nil fields are skipped so that a
// partially initialised MQOBJ can still be closed.
func (mq *MQOBJ) Close() error {
	var firstErr error
	if mq.Channel != nil {
		if err := mq.Channel.Close(); err != nil {
			firstErr = err
		}
	}
	if mq.Connection != nil {
		if err := mq.Connection.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// init dials the RabbitMQ broker, opens a channel on the new connection,
// stores both in the package-level obj and prints "init success".
// Any failure is handed to Fataln.
func init() {
	const brokerURL = "amqp://cho:123@192.168.101.7:5672"

	conn, dialErr := amqp.Dial(brokerURL)
	if dialErr != nil {
		Fataln(dialErr)
	}

	channel, chanErr := conn.Channel()
	if chanErr != nil {
		Fataln(chanErr)
	}

	obj = &MQOBJ{Connection: conn, Channel: channel}
	fmt.Println("init success")
}
// producer declares the durable direct exchange "go-test-exchange3" and then
// publishes a "hello <n>" text message with an empty routing key on every
// 300ms tick. Any AMQP error is handed to Fataln.
func producer() {
	const exchange = "go-test-exchange3"

	if declErr := obj.ExchangeDeclare(exchange, amqp.ExchangeDirect, true, false, false, false, nil); declErr != nil {
		Fataln(declErr)
	}

	tick := time.NewTicker(300 * time.Millisecond)
	for seq := 0; ; seq++ {
		<-tick.C
		payload := amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte("hello " + strconv.Itoa(seq)),
		}
		if pubErr := obj.Publish(exchange, "", false, false, payload); pubErr != nil {
			Fataln(pubErr)
		}
	}
}

// Empty is a zero-size type used as the element type of the stop-signal channels.
type Empty struct{}

// customer opens its own channel on the shared connection, declares the
// durable queue name, binds it to "go-test-exchange3" with an empty binding
// key, and prints every delivery from an auto-ack consumer.
//
// It returns when a value arrives on stopchan, when the delivery channel is
// closed, or when the queue bind fails (reported on stderr). Other AMQP
// errors are handed to Fataln.
func customer(name string, stopchan <-chan Empty) {
	ch, err := obj.Connection.Channel()
	if err != nil {
		Fataln(err)
	}
	defer ch.Close()

	if _, err = ch.QueueDeclare(name, true, false, false, false, nil); err != nil {
		Fataln("queue declare failed", err)
	}
	if err = ch.QueueBind(name, "", "go-test-exchange3", false, nil); err != nil {
		fmt.Fprintln(os.Stderr, "queue bind failed", err)
		return
	}
	msgch, err := ch.Consume(name, "", true, false, true, false, nil)
	if err != nil {
		Fataln("consume failed", err)
	}

	for {
		select {
		case msg, ok := <-msgch:
			if !ok {
				// A closed delivery channel yields zero values forever;
				// stop instead of spinning and printing empty messages.
				return
			}
			fmt.Println("accept msg " + name + " " + string(msg.Body))
		case <-stopchan:
			return
		}
	}
}
// TestAmqp starts the producer in the background, waits two seconds, and then
// runs two consumers: "go-test" in a goroutine and "go-test2" in the
// foreground. On SIGINT/SIGTERM a stop value is sent to each consumer's stop
// channel. obj is closed when the foreground consumer returns.
func TestAmqp(t *testing.T) {
	defer obj.Close()

	go producer()
	time.Sleep(2 * time.Second)

	stopFirst := make(chan Empty, 1)
	stopSecond := make(chan Empty, 1)
	stops := []chan Empty{stopFirst, stopSecond}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		for i := range stops {
			stops[i] <- Empty{}
		}
	}()

	go customer("go-test", stopFirst)
	customer("go-test2", stopSecond)
}

demo测试命令

go test -v amqp_test.go


http://www.kler.cn/a/272225.html

相关文章:

  • 第五篇 vue3 ref 与 reactive 对比
  • 美特CRM mcc_login.jsp存在SQL注入漏洞
  • kafka学习笔记7 性能测试 —— 筑梦之路
  • 力扣 739. 每日温度
  • 路由器旁挂三层网络实现SDWAN互联(爱快SD-WAN)
  • 每日一刷——1.20——准备蓝桥杯
  • 体系结构安全第二次作业:调研整理编译器优化引入的安全问题,形成调研报告提交
  • Docker学习之数据管理(超详解析)
  • 鸿蒙内核系统
  • IDEA : 已经有一个永久破解版的IDEA2019版本,现在又想安装最新版本的,俩版本共存,发现新版本打不开的解决方案
  • html5cssjs代码 022 表单输入类型示例
  • 高等代数复习:应试经验:求行列式
  • NFT数字藏品推广途径有哪些?CloudNEO免费个性定制方案,推广您的NFT
  • Selenium笔记
  • C语言 数据在内存中的存储
  • elasticsearch(RestHighLevelClient API操作)(黑马)
  • 从零自制docker-4-【PID Namespace MOUNT Namespace】
  • 深入了解Android垃圾回收机制
  • odoo17开发教程(6):用户界面UI的交互-创建Action
  • ffmpeg 切割音频文件,各种格式(wav, flac, mp3, m4a等)
  • lua gc垃圾回收知识记录
  • 如何在MATLAB中处理图像和视频?
  • AJAX-XMLHttpRequest
  • Pytorch NLP入门3:用嵌入表示单词
  • 接口测试及接口测试工具【Postman】相关的面试题
  • 微信小程序Skyline模式自定义tab组件胶囊与原生胶囊平齐,安卓和ios均自适应