golang实现生产者消费者模式
在Go语言中,生产者消费者模式可以通过使用Goroutines和Channels来实现。Goroutines允许并发执行,而Channels则用于在生产者和消费者之间安全地传递数据。
生产者消费者模式的基本思路
生产者:负责生成数据并将其放入一个共享的缓冲区(Channel)。
消费者:从共享的缓冲区中取出数据并进行处理。
同步:使用Channel来同步生产者和消费者之间的操作,确保数据的安全传递。
建立一个channel
package out
import "fmt"
type Out struct {
data chan interface{}
}
var out *Out
func NewOut() *Out {
if out == nil {
out = &Out{
data: make(chan interface{}, 65535),
}
}
return out
}
func Println(i interface{}) {
out.data <- i
}
func (o *Out) OutPut() {
//for i := range o.data {
// fmt.Println(i)
// fmt.Println("out put")
//}
//fmt.Println("结束")
for {
select {
case i := <-o.data:
fmt.Println(i)
}
}
}
一对一
package one_one
import (
"producer-consumer/out"
"sync"
)
type Task struct {
ID int64
}
func (t *Task) run() {
out.Println(t.ID)
}
var taskCh = make(chan Task, 10)
const taskNum int64 = 10000
func producer(wo chan<- Task) {
var i int64
for i = 1; i <= taskNum; i++ {
t := Task{
ID: i,
}
wo <- t
}
close(wo)
}
func consumer(ro <-chan Task) {
for t := range ro {
if t.ID != 0 {
t.run()
}
}
}
func Exec() {
wg := &sync.WaitGroup{}
wg.Add(2)
go func(wg *sync.WaitGroup) {
defer wg.Done()
producer(taskCh)
}(wg)
go func(wg *sync.WaitGroup) {
defer wg.Done()
consumer(taskCh)
}(wg)
wg.Wait()
out.Println("执行成功")
}
一对多
package one_many
import (
"producer-consumer/out"
"sync"
)
type Task struct {
ID int64
}
func (t *Task) run() {
out.Println(t.ID)
}
var taskCh = make(chan Task, 10)
const taskNum int64 = 10000
func producer(wo chan<- Task) {
var i int64
for i = 1; i <= taskNum; i++ {
t := Task{
ID: i,
}
wo <- t
}
close(wo)
}
func consumer(ro <-chan Task) {
for t := range ro {
if t.ID != 0 {
t.run()
}
}
}
func Exec() {
wg := &sync.WaitGroup{}
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
producer(taskCh)
}(wg)
var i int64
for i = 0; i < taskNum; i++ {
if i%100 == 0 {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
consumer(taskCh)
}(wg)
}
}
wg.Wait()
out.Println("执行成功")
}
多对一
package many_one
import (
"producer-consumer/out"
"sync"
)
type Task struct {
ID int64
}
func (t *Task) run() {
out.Println(t.ID)
}
var taskCh = make(chan Task, 10)
const taskNum int64 = 10000
const nums int64 = 100
func producer(wo chan<- Task, startNum int64, nums int64) {
var i int64
for i = startNum; i < taskNum+nums; i++ {
t := Task{
ID: i,
}
wo <- t
}
}
func consumer(ro <-chan Task) {
for t := range ro {
if t.ID != 0 {
t.run()
}
}
}
func Exec() {
wg := &sync.WaitGroup{}
pwg := &sync.WaitGroup{}
var i int64
for i = 0; i < taskNum; i += nums {
if i >= taskNum {
break
}
wg.Add(1)
pwg.Add(1)
go func(i int64) {
defer wg.Done()
defer pwg.Done()
producer(taskCh, i, nums)
}(i)
}
wg.Add(1)
go func() {
defer wg.Done()
consumer(taskCh)
}()
pwg.Wait()
go close(taskCh)
wg.Wait()
out.Println("执行成功")
}
多对多
package many_many
import (
"fmt"
"producer-consumer/out"
"time"
)
type Task struct {
ID int64
}
func (t *Task) run() {
out.Println(t.ID)
}
var taskCh = make(chan Task, 10)
var done = make(chan struct{})
const taskNum int64 = 10000
func producer(wo chan<- Task, done chan struct{}) {
var i int64
for {
if i >= taskNum {
i = 0
}
i++
t := Task{
ID: i,
}
select {
case wo <- t:
case <-done:
out.Println("生产者退出")
return
}
}
}
func consumer(ro <-chan Task, done chan struct{}) {
for {
select {
case t := <-ro:
if t.ID != 0 {
t.run()
}
case <-done:
for t := range ro {
if t.ID != 0 {
t.run()
}
}
out.Println("消费者退出")
return
}
}
}
func Exec() {
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go producer(taskCh, done)
go consumer(taskCh, done)
go consumer(taskCh, done)
time.Sleep(time.Second * 5)
close(done)
close(taskCh)
time.Sleep(time.Second * 5)
fmt.Println(len(taskCh))
}
主程序入口
package main
func main() {
//o := out.NewOut()
//go o.OutPut()
//one_one.Exec()
//one_many.Exec()
//many_one.Exec()
//many_many.Exec()
//sig := make(chan os.Signal)
//signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
//<-sig
}