当前位置: 首页 > article >正文

golang实现生产者消费者模式

在Go语言中,生产者消费者模式可以通过使用Goroutines和Channels来实现。Goroutines允许并发执行,而Channels则用于在生产者和消费者之间安全地传递数据。
生产者消费者模式的基本思路
生产者:负责生成数据并将其放入一个共享的缓冲区(Channel)。
消费者:从共享的缓冲区中取出数据并进行处理。
同步:使用Channel来同步生产者和消费者之间的操作,确保数据的安全传递。

建立一个channel

package out

import "fmt"

type Out struct {
	data chan interface{}
}

var out *Out

func NewOut() *Out {
	if out == nil {
		out = &Out{
			data: make(chan interface{}, 65535),
		}
	}
	return out
}
func Println(i interface{}) {
	out.data <- i
}
func (o *Out) OutPut() {
	//for i := range o.data {
	//	fmt.Println(i)
	//	fmt.Println("out put")
	//}
	//fmt.Println("结束")
	for {
		select {
		case i := <-o.data:
			fmt.Println(i)
		}
	}
}

一对一

package one_one

import (
	"producer-consumer/out"
	"sync"
)

type Task struct {
	ID int64
}

func (t *Task) run() {
	out.Println(t.ID)
}

var taskCh = make(chan Task, 10)

const taskNum int64 = 10000

func producer(wo chan<- Task) {
	var i int64
	for i = 1; i <= taskNum; i++ {
		t := Task{
			ID: i,
		}
		wo <- t
	}
	close(wo)
}
func consumer(ro <-chan Task) {
	for t := range ro {
		if t.ID != 0 {
			t.run()
		}
	}
}
func Exec() {
	wg := &sync.WaitGroup{}
	wg.Add(2)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()
		producer(taskCh)
	}(wg)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()
		consumer(taskCh)
	}(wg)
	wg.Wait()

	out.Println("执行成功")
}

一对多

package one_many

import (
	"producer-consumer/out"
	"sync"
)

type Task struct {
	ID int64
}

func (t *Task) run() {
	out.Println(t.ID)
}

var taskCh = make(chan Task, 10)

const taskNum int64 = 10000

func producer(wo chan<- Task) {
	var i int64
	for i = 1; i <= taskNum; i++ {
		t := Task{
			ID: i,
		}
		wo <- t
	}
	close(wo)
}
func consumer(ro <-chan Task) {
	for t := range ro {
		if t.ID != 0 {
			t.run()
		}
	}
}
func Exec() {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()
		producer(taskCh)
	}(wg)
	var i int64
	for i = 0; i < taskNum; i++ {
		if i%100 == 0 {
			wg.Add(1)
			go func(wg *sync.WaitGroup) {
				defer wg.Done()
				consumer(taskCh)
			}(wg)
		}
	}
	wg.Wait()

	out.Println("执行成功")
}

多对一

package many_one

import (
	"producer-consumer/out"
	"sync"
)

type Task struct {
	ID int64
}

func (t *Task) run() {
	out.Println(t.ID)
}

var taskCh = make(chan Task, 10)

const taskNum int64 = 10000
const nums int64 = 100

func producer(wo chan<- Task, startNum int64, nums int64) {
	var i int64
	for i = startNum; i < taskNum+nums; i++ {
		t := Task{
			ID: i,
		}
		wo <- t
	}
}
func consumer(ro <-chan Task) {
	for t := range ro {
		if t.ID != 0 {
			t.run()
		}
	}
}
func Exec() {
	wg := &sync.WaitGroup{}
	pwg := &sync.WaitGroup{}
	var i int64
	for i = 0; i < taskNum; i += nums {
		if i >= taskNum {
			break
		}
		wg.Add(1)
		pwg.Add(1)
		go func(i int64) {
			defer wg.Done()
			defer pwg.Done()
			producer(taskCh, i, nums)
		}(i)
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		consumer(taskCh)
	}()
	pwg.Wait()
	go close(taskCh)
	wg.Wait()
	out.Println("执行成功")
}

多对多

package many_many

import (
	"fmt"
	"producer-consumer/out"
	"time"
)

type Task struct {
	ID int64
}

func (t *Task) run() {
	out.Println(t.ID)
}

var taskCh = make(chan Task, 10)
var done = make(chan struct{})

const taskNum int64 = 10000

func producer(wo chan<- Task, done chan struct{}) {
	var i int64
	for {
		if i >= taskNum {
			i = 0
		}
		i++
		t := Task{
			ID: i,
		}
		select {
		case wo <- t:
		case <-done:
			out.Println("生产者退出")
			return
		}
	}
}
func consumer(ro <-chan Task, done chan struct{}) {
	for {
		select {
		case t := <-ro:
			if t.ID != 0 {
				t.run()
			}
		case <-done:
			for t := range ro {
				if t.ID != 0 {
					t.run()
				}
			}
			out.Println("消费者退出")
			return
		}
	}
}
func Exec() {
	go producer(taskCh, done)
	go producer(taskCh, done)
	go producer(taskCh, done)
	go producer(taskCh, done)
	go producer(taskCh, done)
	go producer(taskCh, done)

	go consumer(taskCh, done)
	go consumer(taskCh, done)
	time.Sleep(time.Second * 5)
	close(done)
	close(taskCh)
	time.Sleep(time.Second * 5)
	fmt.Println(len(taskCh))
}

主程序入口

package main

func main() {
	//o := out.NewOut()
	//go o.OutPut()
	//one_one.Exec()
	//one_many.Exec()
	//many_one.Exec()
	//many_many.Exec()
	//sig := make(chan os.Signal)
	//signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	//<-sig
}


http://www.kler.cn/a/454553.html

相关文章:

  • 随机变量是一个函数-如何理解
  • 【MySQL】踩坑笔记——保存带有换行符等特殊字符的数据,需要进行转义保存
  • 算法题(17):删除有序数组中的重复项
  • k8s coredns
  • 简单发布一个npm包
  • Ubuntu 24.04.1 LTS 配置静态固定IP地址
  • 计算机专业文献检索期末论文
  • 计算机网络——期末复习(3)4-6章考试重点
  • 零基础微信小程序开发——页面导航之编程式导航(保姆级教程+超详细)
  • 爬虫数据存储:Redis、MySQL 与 MongoDB 的对比与实践
  • 007-利用切面计算方法耗时
  • vue中el-select选择框带搜索和输入,根据用户输入的值显示下拉列表
  • R语言的数据类型
  • 随手笔记【六】
  • TDesign:Tabs 选项卡
  • Boost之log日志使用
  • Elasticsearch安装和数据迁移
  • [微服务]elasticsearc客服端操作
  • 【从零开始入门unity游戏开发之——C#篇33】C#委托(`Delegate`)和事件(`event` )、事件与委托的区别、Invoke()的解释
  • Spring Boot的开发工具(DevTools)模块中的热更新特性导致的问题