使用Golang实现开发中常用的【并发设计模式】
使用Golang实现开发中常用的【并发设计模式】
设计模式是解决常见问题的模板,可以帮助我们提升思维能力,编写更高效、可维护性更强的代码
屏障模式
未来模式
管道模式
协程池模式
发布订阅模式
下面是使用 Go 语言实现屏障模式、未来模式、管道模式、协程池模式和发布订阅模式的示例代码。
1. 屏障模式
屏障模式(Barrier Pattern)用于同步多个 goroutine,在所有 goroutine 都到达某个点之前,任何 goroutine 都不能继续执行。
package main
import (
"fmt"
"sync"
)
func main() {
const numWorkers = 5
var wg sync.WaitGroup
wg.Add(numWorkers)
// 创建一个 Barrier
barrier := make(chan struct{}, numWorkers)
for i := 0; i < numWorkers; i++ {
go func(id int) {
defer wg.Done()
// 模拟工作
fmt.Printf("Worker %d is working...\n", id)
// 到达屏障点
barrier <- struct{}{}
// 等待所有 goroutine 到达屏障点
<-barrier
// 继续工作
fmt.Printf("Worker %d continues after barrier.\n", id)
}(i)
}
wg.Wait()
close(barrier)
}
2. 未来模式
未来模式(Future Pattern)用于异步获取结果,通常用于长时间运行的任务。
package main
import (
"fmt"
"time"
)
type Future interface {
Get() (string, error)
}
type futureImpl struct {
resultChan chan string
errChan chan error
}
func NewFuture() Future {
f := &futureImpl{
resultChan: make(chan string, 1),
errChan: make(chan error, 1),
}
go f.run()
return f
}
func (f *futureImpl) run() {
// 模拟长时间运行的任务
time.Sleep(2 * time.Second)
select {
case f.resultChan <- "Result":
case f.errChan <- nil:
}
}
func (f *futureImpl) Get() (string, error) {
select {
case result := <-f.resultChan:
return result, nil
case err := <-f.errChan:
return "", err
}
}
func main() {
future := NewFuture()
fmt.Println("Doing other work...")
result, err := future.Get()
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Result:", result)
}
}
3. 管道模式
管道模式(Pipeline Pattern)用于将多个 goroutine 串联起来,形成一个数据处理流水线。
package main
import (
"fmt"
"time"
)
func produce(in chan<- int) {
for i := 0; i < 10; i++ {
in <- i
time.Sleep(100 * time.Millisecond)
}
close(in)
}
func process(out <-chan int, in chan<- int) {
for v := range out {
v *= 2
in <- v
}
close(in)
}
func consume(in <-chan int) {
for v := range in {
fmt.Println("Consumed:", v)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go produce(ch1)
go process(ch1, ch2)
go consume(ch2)
time.Sleep(2 * time.Second)
}
4. 协程池模式
协程池模式(Coroutine Pool Pattern)用于管理和重用一组固定数量的 goroutine。
package main
import (
"fmt"
"sync"
"time"
)
type Task func(int)
type GoroutinePool struct {
size int
tasks chan Task
wg sync.WaitGroup
stop bool
stopCh chan struct{}
doneCh chan struct{}
}
func NewGoroutinePool(size int) *GoroutinePool {
return &GoroutinePool{
size: size,
tasks: make(chan Task),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}
func (p *GoroutinePool) Start() {
for i := 0; i < p.size; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
func (p *GoroutinePool) worker(id int) {
defer p.wg.Done()
for {
select {
case task := <-p.tasks:
task(id)
case <-p.stopCh:
p.doneCh <- struct{}{}
return
}
}
}
func (p *GoroutinePool) Submit(task Task) {
p.tasks <- task
}
func (p *GoroutinePool) Stop() {
p.stop = true
close(p.stopCh)
p.wg.Wait()
close(p.doneCh)
}
func main() {
pool := NewGoroutinePool(5)
pool.Start()
for i := 0; i < 20; i++ {
task := func(id int) {
fmt.Printf("Task %d handled by worker %d\n", i, id)
time.Sleep(100 * time.Millisecond)
}
pool.Submit(task)
}
time.Sleep(1 * time.Second)
pool.Stop()
}
5. 发布订阅模式
发布订阅模式(Publish-Subscribe Pattern)用于解耦消息的发送者和接收者。
package main
import (
"fmt"
"sync"
)
type Event string
type Subscriber func(Event)
type Publisher struct {
mu sync.Mutex
subscribers map[Subscriber]struct{}
}
func NewPublisher() *Publisher {
return &Publisher{
subscribers: make(map[Subscriber]struct{}),
}
}
func (p *Publisher) Subscribe(subscriber Subscriber) {
p.mu.Lock()
defer p.mu.Unlock()
p.subscribers[subscriber] = struct{}{}
}
func (p *Publisher) Unsubscribe(subscriber Subscriber) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.subscribers, subscriber)
}
func (p *Publisher) Publish(event Event) {
p.mu.Lock()
defer p.mu.Unlock()
for sub := range p.subscribers {
go sub(event)
}
}
func main() {
pub := NewPublisher()
subscriber1 := func(event Event) {
fmt.Printf("Subscriber 1 received: %s\n", event)
}
subscriber2 := func(event Event) {
fmt.Printf("Subscriber 2 received: %s\n", event)
}
pub.Subscribe(subscriber1)
pub.Subscribe(subscriber2)
pub.Publish("Event 1")
pub.Publish("Event 2")
pub.Unsubscribe(subscriber1)
pub.Publish("Event 3")
}
总结
- 屏障模式:使用通道和
sync.WaitGroup
同步多个 goroutine。 - 未来模式:使用通道异步获取任务结果。
- 管道模式:使用通道连接多个 goroutine 形成数据处理流水线。
- 协程池模式:管理一组固定数量的 goroutine,重用这些 goroutine 处理任务。
- 发布订阅模式:解耦消息的发送者和接收者,使用通道广播消息。