当前位置: 首页 > article >正文

golang并发编程—— 并发模式

并发模式

并发模式是指在并发编程中常用的设计模式和方法,用于有效地管理和协调多个并发任务。以下是一些常见的并发模式,结合 Go 语言的示例代码来介绍它们的应用。

1. 工作池(Worker Pool)

工作池模式通过一组固定数量的工作 Goroutine 来处理大量的任务,避免因为过多的 Goroutine 而导致资源的过度消耗。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Worker 函数,模拟处理任务
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, j)
        time.Sleep(time.Second) // 模拟工作
        fmt.Printf("Worker %d finished job %d\n", id, j)
        results <- j * 2 // 返回结果
    }
}

func main() {
    const numJobs = 5
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    var wg sync.WaitGroup

    // 启动 worker Goroutine
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 发送任务到 jobs 通道
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // 等待所有 worker 完成
    wg.Wait()
    close(results)

    // 打印结果
    for result := range results {
        fmt.Println("Result:", result)
    }
}

删除close(results)语句会出现fatal error: all goroutines are asleep - deadlock!错误

2. 扇入(Fan-in)

扇入模式将多个输入通道合并到一个通道,从而可以在单个 Goroutine 中处理来自多个来源的数据。

package main

import (
    "fmt"
    "time"
)

// generator 生成一系列数据并发送到通道
func generator(start int, end int, c chan<- int) {
    for i := start; i <= end; i++ {
        c <- i
        time.Sleep(time.Millisecond * 500) // 模拟延迟
    }
    close(c)
}

// fanIn 将多个通道合并为一个通道
func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }

    for _, c := range channels {
        wg.Add(1)
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    
    go generator(1, 5, c1)
    go generator(6, 10, c2)

    for n := range fanIn(c1, c2) {
        fmt.Println(n)
    }
}
  • 如何将close语句移到Wait前面,则提前关闭通道,没有数据。

  • 如果删除close语句,会出现all goroutines are asleep的错误。

    fatal error: all goroutines are asleep - deadlock!
    

3. 扇出(Fan-out)

扇出模式将一个输入通道的数据分发到多个 Goroutine 进行并行处理。

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker 函数处理来自 jobs 通道的任务
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, j)
        time.Sleep(time.Second) // 模拟工作
    }
}

func main() {
    jobs := make(chan int, 10)
    var wg sync.WaitGroup

    // 启动 worker Goroutine
    for w := 1; w <= 3; w++ {
        wg.Add(1)
        go worker(w, jobs, &wg)
    }

    // 发送任务到 jobs 通道
    for j := 1; j <= 10; j++ {
        jobs <- j
    }
    close(jobs)

    // 等待所有 worker 完成
    wg.Wait()
}

如果调换wait与close的顺序或者删除close语句,会出现all goroutines are asleep的错误

fatal error: all goroutines are asleep - deadlock!

4. 管道(Pipeline)

管道模式将任务分解为一系列的处理步骤,每个步骤由一个 Goroutine 处理,并将结果传递到下一个步骤的 Goroutine。

package main

import (
    "fmt"
    "time"
)

// 生成器生成一系列数字
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 阶段1:数字加倍
func doubler(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

// 阶段2:数字平方
func squarer(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 生成数据并通过管道传递
    nums := generator(1, 2, 3, 4, 5)
    doubled := doubler(nums)
    squared := squarer(doubled)

    // 打印结果
    for n := range squared {
        fmt.Println(n)
    }
}

5. 多路复用(Multiplexing)

多路复用模式通过 select 语句从多个通道中选择可用的通道进行处理。

package main

import (
    "fmt"
    "time"
)

// 生成器函数生成数据并发送到通道
func generator(name string, interval time.Duration) <-chan string {
    c := make(chan string)
    go func() {
        for i := 1; ; i++ {
            c <- fmt.Sprintf("%s: %d", name, i)
            time.Sleep(interval)
        }
    }()
    return c
}

func main() {
    c1 := generator("Channel 1", 2*time.Second)
    c2 := generator("Channel 2", 3*time.Second)

    for i := 0; i < 5; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println(msg1)
        case msg2 := <-c2:
            fmt.Println(msg2)
        }
    }
}

6. 生产者-消费者(Producer-Consumer)

生产者-消费者模式通过缓冲通道协调生产者和消费者的工作,确保数据的安全传递和处理。

以下代码有错误

package main

import (
    "fmt"
    "sync"
    "time"
)

// 生产者函数生成数据并发送到通道
func producer(id int, jobs chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := 1; j <= 5; j++ {
        fmt.Printf("Producer %d producing job %d\n", id, j)
        jobs <- j
        time.Sleep(time.Millisecond * 500)
    }
}

// 消费者函数从通道接收数据并处理
func consumer(id int, jobs <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Consumer %d processing job %d\n", id, j)
        time.Sleep(time.Second)
    }
}

func main() {
    jobs := make(chan int, 10)
    var wg sync.WaitGroup

    // 启动生产者 Goroutine
    for p := 1; p <= 2; p++ {
        wg.Add(1)
        go producer(p, jobs, &wg)
    }

    // 启动消费者 Goroutine
    for c := 1; c <= 2; c++ {
        wg.Add(1)
        go consumer(c, jobs, &wg)
    }

    // 等待所有生产者完成
    wg.Wait()
    close(jobs)
}

fatal error: all goroutines are asleep - deadlock

多生产者,多消费者正确代码如下

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Product struct {
	name  int
	value int
}

func producer(wg *sync.WaitGroup, products chan<- Product, name int, stop *bool) {
	for !*stop {
		product := Product{name: name, value: rand.Intn(100)}
		products <- product
		fmt.Printf("producer %v produce a product: %#v\n", name, product)
		time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)
	}
	wg.Done()
}

func consumer(wg *sync.WaitGroup, products <-chan Product, name int) {
	for product := range products {
		fmt.Printf("consumer %v consume a product: %#v\n", name, product)
		time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)
	}
	wg.Done()
}

func main() {
	var wgp sync.WaitGroup
	var wgc sync.WaitGroup
	stop := false
	products := make(chan Product, 10)

	// 创建 5 个生产者和 5 个消费者
	for i := 0; i < 5; i++ {
		go producer(&wgp, products, i, &stop)
		go consumer(&wgc, products, i)
		wgp.Add(1)
		wgc.Add(1)
	}

	time.Sleep(time.Duration(1) * time.Second)
	stop = true     // 设置生产者终止信号
	wgp.Wait()      // 等待生产者退出
	close(products) // 关闭通道
	wgc.Wait()      // 等待消费者退出
}

参考资料

golang如何优雅地关闭通道
并发模式
Go Channel 详解
总结了才知道,原来channel有这么多用法
生产者消费者模式 (Producer-Consumer Pattern)

补充资料

Channel 语法

ch <- x     // sends (or writes ) x through channel ch
x = <-ch   // x receives (or reads) data sent to the channel ch
<-ch      // receives data, but the result is discarded

分别是:

  • 通过通道ch发送(或写入)x
  • x接收(或读取)发送到通道ch的数据
  • 接收数据,但结果被丢弃

WaitGroup介绍

WatiGroup是sync包中的一个struct类型,用来收集需要等待执行完成的goroutine。下面是它的定义:

// WaitGroup用于等待一组线程的结束。
// 父线程调用Add方法来设定应等待的线程的数量。
// 每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。
type WaitGroup struct {
    // 包含隐藏或非导出字段
}

// Add方法向内部计数加上delta,delta可以是负数;
// 如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。
// 注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。
// 一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。
func (wg *WaitGroup) Add(delta int)

// Done方法减少WaitGroup计数器的值,应在线程的最后执行。
func (wg *WaitGroup) Done()

// Wait方法阻塞直到WaitGroup计数器减为0。
func (wg *WaitGroup) Wait()

http://www.kler.cn/a/287231.html

相关文章:

  • 【神经网络基础】
  • 【25】Word:林涵-科普文章❗
  • WPS数据分析000004
  • 基于Python的心电图报告解析与心电吸引子绘制
  • Spark常见面试题-部分待更新
  • springboot基于微信小程序的传统美食文化宣传平台小程序
  • 收纳程序 源码
  • 小程序中使用page-container来做弹窗
  • 数据库表的分类
  • Redis与SpringMVC的整合与最佳实践
  • LDR6023:革新手机转接器体验,快充与OTG并存的科技杰作
  • 【mysql】03通过命令行快速导出带字段名的csv格式数据
  • QT Quick QML 添加海康威视SDK云台控制模块
  • java操作日期时间类
  • v-bind,v-on与简写:和@有什么区别?
  • [Linux网络]TCP三次握手和四次挥手的连接建立和断开
  • win10环境下gvim离线配置插件的一些补充
  • 8.22
  • javascript指什么
  • blender4.2中安装插件的方式
  • 国密起步5:GmSSL3交叉编译arm64
  • Qt/QML学习-Dialog
  • 深入解析Go语言os/user包:用户和组管理实战指南
  • Apache Arrow简介
  • vscode Git代码版本回退
  • 【 html+css 绚丽Loading 】 000031 三元轮回盘