当前位置: 首页 > article >正文

使用Golang实现开发中常用的【并发设计模式】

使用Golang实现开发中常用的【并发设计模式】

设计模式是解决常见问题的模板,可以帮助我们提升思维能力,编写更高效、可维护性更强的代码

屏障模式

未来模式

管道模式

协程池模式

发布订阅模式

下面是使用 Go 语言实现屏障模式、未来模式、管道模式、协程池模式和发布订阅模式的示例代码。

1. 屏障模式

屏障模式(Barrier Pattern)用于同步多个 goroutine,在所有 goroutine 都到达某个点之前,任何 goroutine 都不能继续执行。

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numWorkers = 5
	var wg sync.WaitGroup
	wg.Add(numWorkers)

	// 创建一个 Barrier
	barrier := make(chan struct{}, numWorkers)

	for i := 0; i < numWorkers; i++ {
		go func(id int) {
			defer wg.Done()

			// 模拟工作
			fmt.Printf("Worker %d is working...\n", id)
			// 到达屏障点
			barrier <- struct{}{}

			// 等待所有 goroutine 到达屏障点
			<-barrier

			// 继续工作
			fmt.Printf("Worker %d continues after barrier.\n", id)
		}(i)
	}

	wg.Wait()
	close(barrier)
}

2. 未来模式

未来模式(Future Pattern)用于异步获取结果,通常用于长时间运行的任务。

package main

import (
	"fmt"
	"time"
)

type Future interface {
	Get() (string, error)
}

type futureImpl struct {
	resultChan chan string
	errChan    chan error
}

func NewFuture() Future {
	f := &futureImpl{
		resultChan: make(chan string, 1),
		errChan:    make(chan error, 1),
	}
	go f.run()
	return f
}

func (f *futureImpl) run() {
	// 模拟长时间运行的任务
	time.Sleep(2 * time.Second)
	select {
	case f.resultChan <- "Result":
	case f.errChan <- nil:
	}
}

func (f *futureImpl) Get() (string, error) {
	select {
	case result := <-f.resultChan:
		return result, nil
	case err := <-f.errChan:
		return "", err
	}
}

func main() {
	future := NewFuture()
	fmt.Println("Doing other work...")
	result, err := future.Get()
	if err != nil {
		fmt.Println("Error:", err)
	} else {
		fmt.Println("Result:", result)
	}
}

3. 管道模式

管道模式(Pipeline Pattern)用于将多个 goroutine 串联起来,形成一个数据处理流水线。

package main

import (
	"fmt"
	"time"
)

func produce(in chan<- int) {
	for i := 0; i < 10; i++ {
		in <- i
		time.Sleep(100 * time.Millisecond)
	}
	close(in)
}

func process(out <-chan int, in chan<- int) {
	for v := range out {
		v *= 2
		in <- v
	}
	close(in)
}

func consume(in <-chan int) {
	for v := range in {
		fmt.Println("Consumed:", v)
	}
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go produce(ch1)
	go process(ch1, ch2)
	go consume(ch2)

	time.Sleep(2 * time.Second)
}

4. 协程池模式

协程池模式(Coroutine Pool Pattern)用于管理和重用一组固定数量的 goroutine。

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task func(int)

type GoroutinePool struct {
	size    int
	tasks   chan Task
	wg      sync.WaitGroup
	stop    bool
	stopCh  chan struct{}
	doneCh  chan struct{}
}

func NewGoroutinePool(size int) *GoroutinePool {
	return &GoroutinePool{
		size:   size,
		tasks:  make(chan Task),
		stopCh: make(chan struct{}),
		doneCh: make(chan struct{}),
	}
}

func (p *GoroutinePool) Start() {
	for i := 0; i < p.size; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}
}

func (p *GoroutinePool) worker(id int) {
	defer p.wg.Done()
	for {
		select {
		case task := <-p.tasks:
			task(id)
		case <-p.stopCh:
			p.doneCh <- struct{}{}
			return
		}
	}
}

func (p *GoroutinePool) Submit(task Task) {
	p.tasks <- task
}

func (p *GoroutinePool) Stop() {
	p.stop = true
	close(p.stopCh)
	p.wg.Wait()
	close(p.doneCh)
}

func main() {
	pool := NewGoroutinePool(5)
	pool.Start()

	for i := 0; i < 20; i++ {
		task := func(id int) {
			fmt.Printf("Task %d handled by worker %d\n", i, id)
			time.Sleep(100 * time.Millisecond)
		}
		pool.Submit(task)
	}

	time.Sleep(1 * time.Second)
	pool.Stop()
}

5. 发布订阅模式

发布订阅模式(Publish-Subscribe Pattern)用于解耦消息的发送者和接收者。

package main

import (
	"fmt"
	"sync"
)

type Event string

type Subscriber func(Event)

type Publisher struct {
	mu         sync.Mutex
	subscribers map[Subscriber]struct{}
}

func NewPublisher() *Publisher {
	return &Publisher{
		subscribers: make(map[Subscriber]struct{}),
	}
}

func (p *Publisher) Subscribe(subscriber Subscriber) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.subscribers[subscriber] = struct{}{}
}

func (p *Publisher) Unsubscribe(subscriber Subscriber) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.subscribers, subscriber)
}

func (p *Publisher) Publish(event Event) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for sub := range p.subscribers {
		go sub(event)
	}
}

func main() {
	pub := NewPublisher()

	subscriber1 := func(event Event) {
		fmt.Printf("Subscriber 1 received: %s\n", event)
	}
	subscriber2 := func(event Event) {
		fmt.Printf("Subscriber 2 received: %s\n", event)
	}

	pub.Subscribe(subscriber1)
	pub.Subscribe(subscriber2)

	pub.Publish("Event 1")
	pub.Publish("Event 2")

	pub.Unsubscribe(subscriber1)
	pub.Publish("Event 3")
}

总结

  1. 屏障模式:使用通道和 sync.WaitGroup 同步多个 goroutine。
  2. 未来模式:使用通道异步获取任务结果。
  3. 管道模式:使用通道连接多个 goroutine 形成数据处理流水线。
  4. 协程池模式:管理一组固定数量的 goroutine,重用这些 goroutine 处理任务。
  5. 发布订阅模式:解耦消息的发送者和接收者,使用通道广播消息。

http://www.kler.cn/a/382834.html

相关文章:

  • MITRE ATTCK 简介:初学者指南
  • 51单片机(二)中断系统与外部中断实验
  • 【FlutterDart】 listView.builder例子二(14 /100)
  • IDE和IDEA详解和具体差异
  • shell脚本总结2
  • 企业网络性能监控
  • 【系统集成项目管理工程师教程】第12章 执行过程组
  • 关于基于AGI和大模型技术下养老服务高质量发展解决方案项目,以及实现代码过程实战
  • OBOO鸥柏丨传媒广告行业的创新应用解决数字技术短板
  • 软件对象粒度控制与设计模式在其中作用的例子
  • ubuntu 22.04 server 格式化 磁盘 为 ext4 并 自动挂载 LTS
  • 计算网络信号
  • git 工具原理
  • PN结特性及反向饱和电流与反向漏电流详解
  • Halcon OCR 字体训练
  • DevOps业务价值流:需求设计最佳实践
  • 【命令操作】Linux三剑客之awk详解 _ 统信 _ 麒麟 _ 方德
  • C/C++」C++类型转换 之 dynamic_cast 操作符
  • C#枚举实战:定义、使用及高级特性解析
  • [ DOS 命令基础 2 ] DOS 命令详解-网络相关命令
  • Qt(openCV的应用)
  • liunx系统介绍
  • 蓝禾,汤臣倍健,三七互娱,得物,顺丰,快手,途游游戏25秋招内推
  • Linux云计算 |【第五阶段】CLOUD-DAY9
  • C#中的集合类及其使用
  • Kafka 之消息并发消费