go rabbitmq 操作
go rabbitmq 操作
go 依赖包github.com/streadway/amqp
docker快速部署
docker pull rabbitmq:management
docker run -d rabbitmq:management # 先跑一个看看监听了哪些端口
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq #5672 go 程序连接,15672是管理页面
写个最基本生产者消费者demo(headers 模式)
package test
import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"testing"
"time"
"github.com/streadway/amqp"
)
var (
obj *MQOBJ
)
type MQOBJ struct {
*amqp.Connection
*amqp.Channel
}
func (mq *MQOBJ) Close() error {
mq.Connection.Close()
mq.Channel.Close()
return nil
}
func init() {
var mqurl = "amqp://cho:123@192.168.101.7:5672"
con, err := amqp.Dial(mqurl)
if err != nil {
log.Fatalln(err)
}
ch, err := con.Channel()
if err != nil {
log.Fatalln(err)
}
obj = &MQOBJ{Connection: con, Channel: ch}
}
func producer() {
_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)
if err != nil {
return
}
err = obj.ExchangeDeclare("go-test-exchange2", amqp.ExchangeHeaders, true, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
//这个queue绑定,你也可以放消费者那边绑定,更灵活
err = obj.Channel.QueueBind("go-test2", "go-test2", "go-test-exchange2", false, amqp.Table{"name": "jesko"})
if err != nil {
log.Fatalln(err)
}
ticker := time.NewTicker(time.Millisecond * 300)
var i int
for {
select {
case <-ticker.C:
err = obj.Publish("", "go-test2", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain", Headers: amqp.Table{"x-match": "any", "name": "jesko", "age": 22}})
if err != nil {
log.Fatalln(err)
}
i++
}
}
}
func customer() {
_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
msgch, err := obj.Channel.Consume("go-test2", "", true, false, true, false, nil)
if err != nil {
log.Fatalln(err)
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case msg := <-msgch:
fmt.Println("accept msg " + string(msg.Body))
case <-ch:
return
}
}
}
func TestAmqp(t *testing.T) {
defer obj.Close()
go func() {
producer()
}()
time.Sleep(2 * time.Second)
customer()
}
这里可以看到我们创建的queue
topic模式
topic模式不用绑定headers去匹配
package test
import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"testing"
"time"
"github.com/streadway/amqp"
)
var (
obj *MQOBJ
logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)
func Fataln(a ...any) {
logger.Println(a...)
os.Exit(0)
}
type MQOBJ struct {
*amqp.Connection
*amqp.Channel
}
func (mq *MQOBJ) Close() error {
mq.Connection.Close()
mq.Channel.Close()
return nil
}
func init() {
var mqurl = "amqp://cho:123@192.168.101.7:5672"
con, err := amqp.Dial(mqurl)
if err != nil {
Fataln(err)
}
ch, err := con.Channel()
if err != nil {
Fataln(err)
}
obj = &MQOBJ{Connection: con, Channel: ch}
fmt.Println("init success")
}
func producer() {
err := obj.ExchangeDeclare("go-test-exchange", amqp.ExchangeTopic, true, false, false, false, nil)
if err != nil {
Fataln(err)
}
ticker := time.NewTicker(time.Millisecond * 300)
var i int
for {
select {
case <-ticker.C:
err = obj.Publish("go-test-exchange", "go-test", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})
if err != nil {
Fataln(err)
}
i++
}
}
}
type Empty struct{}
func customer(name string, stopchan <-chan Empty) {
ch, err := obj.Connection.Channel()
if err != nil {
Fataln(err)
}
defer ch.Close()
_, err = ch.QueueDeclare(name, true, false, false, false, nil)
if err != nil {
Fataln("queue declare failed", err)
}
err = ch.QueueBind(name, name, "go-test-exchange", false, nil)
if err != nil {
fmt.Fprintln(os.Stderr, "queue bind failed", err)
return
}
msgch, err := ch.Consume(name, "", true, false, true, false, nil)
if err != nil {
Fataln("consume failed", err)
}
for {
select {
case msg := <-msgch:
fmt.Println("accept msg " + name + " " + string(msg.Body))
case <-stopchan:
return
}
}
}
func TestAmqp(t *testing.T) {
defer obj.Close()
go func() {
producer()
}()
time.Sleep(2 * time.Second)
stopchanlist := make([]chan Empty, 2)
stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
for _, c := range stopchanlist {
c <- Empty{}
}
}()
go customer("go-test", stopchanlist[0])
customer("go-test2", stopchanlist[1])
}
go-test 有信息(topic 匹配),go-test2没信息(topic未匹配)。
direct模式
package test
import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"testing"
"time"
"github.com/streadway/amqp"
)
var (
obj *MQOBJ
logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)
func Fataln(a ...any) {
logger.Println(a...)
os.Exit(0)
}
type MQOBJ struct {
*amqp.Connection
*amqp.Channel
}
func (mq *MQOBJ) Close() error {
mq.Connection.Close()
mq.Channel.Close()
return nil
}
func init() {
var mqurl = "amqp://cho:123@192.168.101.7:5672"
con, err := amqp.Dial(mqurl)
if err != nil {
Fataln(err)
}
ch, err := con.Channel()
if err != nil {
Fataln(err)
}
obj = &MQOBJ{Connection: con, Channel: ch}
fmt.Println("init success")
}
func producer() {
err := obj.ExchangeDeclare("go-test-exchange3", amqp.ExchangeDirect, true, false, false, false, nil)
if err != nil {
Fataln(err)
}
ticker := time.NewTicker(time.Millisecond * 300)
var i int
for {
select {
case <-ticker.C:
err = obj.Publish("go-test-exchange3", "", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})
if err != nil {
Fataln(err)
}
i++
}
}
}
type Empty struct{}
func customer(name string, stopchan <-chan Empty) {
ch, err := obj.Connection.Channel()
if err != nil {
Fataln(err)
}
defer ch.Close()
_, err = ch.QueueDeclare(name, true, false, false, false, nil)
if err != nil {
Fataln("queue declare failed", err)
}
err = ch.QueueBind(name, "", "go-test-exchange3", false, nil)
if err != nil {
fmt.Fprintln(os.Stderr, "queue bind failed", err)
return
}
msgch, err := ch.Consume(name, "", true, false, true, false, nil)
if err != nil {
Fataln("consume failed", err)
}
for {
select {
case msg := <-msgch:
fmt.Println("accept msg " + name + " " + string(msg.Body))
case <-stopchan:
return
}
}
}
func TestAmqp(t *testing.T) {
defer obj.Close()
go func() {
producer()
}()
time.Sleep(2 * time.Second)
stopchanlist := make([]chan Empty, 2)
stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
for _, c := range stopchanlist {
c <- Empty{}
}
}()
go customer("go-test", stopchanlist[0])
customer("go-test2", stopchanlist[1])
}
demo测试命令
go test -v amqp_test.go