当前位置: 首页 > article >正文

go语言进阶之并发模式

并发模式

并发模式是指在程序设计中同时处理多个任务或进程的方式,以提高效率和响应性

for select循环模式

for select循环模式通常用于处理并发操作,尤其是在需要等待多个通道时。

select的执行过程主要是以下几步

  1. 阻塞等待,直到其中一个通道可用
  2. 执行case,当一个通道准备好了,select将会执行对应的case
  3. 随机选择,如果多个通道可用,go会随机选择一个case执行
  4. 循环执行,在for循环中,select可以持续运行,监听多个通道

比如:

package main  
  
import (  
    "fmt"  
    "time")  
  
func main() {  
    ch1 := make(chan string)  
    ch2 := make(chan string)  
  
    go func() {  
       time.Sleep(1 * time.Second)  
       ch1 <- "来自ch1的消息"  
    }()  
    go func() {  
       time.Sleep(1 * time.Second)  
       ch2 <- "来自ch2的消息"  
    }()  
    for {  
       select {  
       case msg1 := <-ch1:  
          fmt.Println("接收到", msg1)  
       case msg2 := <-ch2:  
          fmt.Println("接收到", msg2)  
       case <-time.After(3 * time.Second):  
          fmt.Println("超时")  
          return  
       }  
  
    }  
}

可以看到所有通道都可以输出。

select timeout模式

在go语言中使用数据库和网络请求时,一般都会设置查询超时,从而防止操作长时间挂起

package main  
  
import (  
    "context"  
    "database/sql"    "fmt"    "log"    "time"  
    _ "github.com/lib/pq" // PostgreSQL driver  
)  
  
func main() {  
    // 连接到数据库  
    db, err := sql.Open("postgres", "user=username dbname=mydb sslmode=disable")  
    if err != nil {  
       log.Fatal(err)  
    }  
    defer db.Close()  
  
    // 设置查询超时为5秒  
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)  
    defer cancel() // 确保在操作完成后调用cancel  
  
    // 执行查询  
    var result string  
    err = db.QueryRowContext(ctx, "SELECT name FROM users WHERE id = \$1", 1).Scan(&result)  
    if err != nil {  
       if err == context.DeadlineExceeded {  
          fmt.Println("查询超时")  
       } else {  
          log.Fatal(err)  
       }  
    } else {  
       fmt.Println("查询结果:", result)  
    }  
}

Pipeline模式(流水线模式)

流水线模式模拟的就是现实的流水线生产。主要就是通过一道道工序组装而成,每一道工序只负责自己的事情,这种模式就是流水线模式。

package main

import (
	"fmt"
	"sync"
)

// 生产者,生成数据
func producer(out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 10; i++ {
		out <- i
	}
	close(out)
}

// 处理阶段,处理数据
func worker(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for n := range in {
		out <- n * 2 // 示例处理:将数字乘以 2
	}
	close(out)
}

// 消费者,接收处理后的数据
func consumer(in <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for n := range in {
		fmt.Println(n) // 输出处理后的结果
	}
}

func main() {
	var wg sync.WaitGroup

	// 创建用于连接各个阶段的 channel
	pipeline1 := make(chan int)
	pipeline2 := make(chan int)

	wg.Add(1)
	go producer(pipeline1, &wg)

	wg.Add(1)
	go worker(pipeline1, pipeline2, &wg)

	wg.Add(1)
	go consumer(pipeline2, &wg)

	// 等待所有 goroutine 完成
	wg.Wait()
}

这一段代码主要通过流水线模式来实现了0到9的数据乘2并打印,其中每个函数都是独立完成一个步骤并拼接起来的。

可以通过代码发现流水线模式的特点

  1. 在使用流水线模式时,每道工序都通过channel将数据传递到下一个工序
  2. 每一个工序一般都会对应一个函数
  3. 最终要有个main函数类似函数将这些工序串起来,这样就可以形成完整的数据流

扇出和扇入模式

扇入模式和扇出模式是由于流水线模式的运行速度不佳而进行改造的模式。

其中原理为增加流水线某个速度低下的步骤,让其同时运行多个步骤,再汇总到下一步骤中。

扇出(Fan-out)

扇出是指将一个输入流的数据分发到多个处理单元(goroutines)。这种模式可以用来提高处理能力,允许多个并发执行的 goroutine 同时处理数据。

package main

import (
	"fmt"
	"sync"
)

// 处理函数
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
	}
}

func main() {
	const numWorkers = 3
	jobs := make(chan int, 10)
	var wg sync.WaitGroup

	// 启动多个 worker
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, &wg)
	}

	// 发送任务
	for j := 1; j <= 10; j++ {
		jobs <- j
	}
	close(jobs) // 关闭 jobs channel 以结束 worker

	wg.Wait() // 等待所有 worker 完成
}

扇入(Fan-in)

扇入是指将多个输入流的数据汇聚到一个处理单元。它可以用来合并多个 goroutine 的结果到一个 channel,通常在需要整合多个处理结果时使用。

package main

import (
	"fmt"
	"sync"
)

// 生成任务的函数
func generate(id int, jobs chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := 0; j < 3; j++ {
		jobs <- j + id*3 // 生成不同的任务
	}
}

// 执行任务的函数
func worker(jobs <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Processing job %d\n", job)
	}
}

func main() {
	const numGenerators = 3
	jobs := make(chan int, 10)
	var wg sync.WaitGroup

	// 启动生成器
	for i := 0; i < numGenerators; i++ {
		wg.Add(1)
		go generate(i, jobs, &wg)
	}

	// 启动一个 worker 处理所有任务
	wg.Add(1)
	go worker(jobs, &wg)

	// 等待所有生成器完成
	wg.Wait()
	close(jobs) // 关闭 jobs channel

	// 等待 worker 完成
	wg.Wait()
}

Future模式

Future模式是一个处理异步操作的编程模式,它与pipeline模式中工序必须要一个个运行不太一样,

Future模式允许在执行耗时操作是,不必等待操作完成,可以同时进行多个步骤,从而提高程序的效率和响应性。

其基本概念有:

  1. 异步执行:通过goroutines来异步执行任务
  2. 结果封装:使用通道来传递任务的结果
  3. 错误处理:同时处理异步操作可能出现的错误

总的来说,这个模式就是可以同时操作多个不同步骤,当所有操作结束时再进行返回。

package main  
  
import (  
    "fmt"  
    "time"
    )  
  
type Future struct {  
    result interface{}  
    err    error  
}  
  
func AsyncTask() Future {  
    ch := make(chan Future)  
  
    go func() {  
       time.Sleep(2 * time.Second)  
       ch <- Future{result: "Task completed", err: nil}  
    }()  
    return <-ch  
}  
  
func main() {  
    future := AsyncTask()  
    if future.err != nil {  
       fmt.Println("Error:", future.err)  
    } else {  
       fmt.Println(future.result)  
    }  
}

其最大的特点就是返回结果,所以在未来获取这个结果的操作必须是一个阻塞的操作,要一直等待获取结果为止。


http://www.kler.cn/a/391451.html

相关文章:

  • C语言:-三子棋游戏代码:分支-循环-数组-函数集合
  • 基于光偏振与光学调制实现白光干涉相移
  • Java 面试题 - ArrayList 和 LinkedList 的区别,哪个集合是线程安全的?
  • 浅谈云计算07 | 云安全机制
  • SpringBoot + Websocket实现系统用户消息通知
  • Oracle EBS GL定期盘存WIP日记账无法过账数据修复
  • 产品经理如何优化项目管理流程
  • 哇喔!20种单例模式的实现与变异总结
  • 【LeetCode】【算法】55. 跳跃游戏
  • PyQt入门指南五十四 依赖管理与打包发布
  • 基于标签相关性的多标签学习
  • Ubuntu24.04安装搜狗输入法详细教程
  • Python的Web请求:requests库入门与应用
  • uniapp h5实现录音
  • 鸿蒙与团结引擎c#与ts简单交互
  • 【Linux】基础IO及文件描述符相关内容详细梳理
  • 深入剖析 Web HTTP 请求:从浏览器到服务器的完整流程
  • python:用 sklearn 构建 K-Means 聚类模型
  • 【Vue3】知识汇总,附详细定义和源码详解,后续出微信小程序项目(4)
  • Python爬虫:国家代码(ISO 3166-1)国家货币代码(ISO 4217)
  • 前端学习八股资料CSS(二)
  • requests库如何处理 - POST请求常见的两种请求体格式:表单格式JSON格式
  • 【H3C华三 】VRRP与BFD、Track联动配置案例
  • WebRTC视频 01 - 视频采集整体架构
  • 【C++课程学习】:string的模拟实现
  • 两化融合评估流程