go语言进阶之并发模式
并发模式
并发模式是指在程序设计中同时处理多个任务或进程的方式,以提高效率和响应性
for select循环模式
for select循环模式通常用于处理并发操作,尤其是在需要等待多个通道时。
select的执行过程主要是以下几步
- 阻塞等待,直到其中一个通道可用
- 执行case,当一个通道准备好了,select将会执行对应的case
- 随机选择,如果多个通道可用,go会随机选择一个case执行
- 循环执行,在for循环中,select可以持续运行,监听多个通道
比如:
package main
import (
"fmt"
"time")
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自ch1的消息"
}()
go func() {
time.Sleep(1 * time.Second)
ch2 <- "来自ch2的消息"
}()
for {
select {
case msg1 := <-ch1:
fmt.Println("接收到", msg1)
case msg2 := <-ch2:
fmt.Println("接收到", msg2)
case <-time.After(3 * time.Second):
fmt.Println("超时")
return
}
}
}
可以看到所有通道都可以输出。
select timeout模式
在go语言中使用数据库和网络请求时,一般都会设置查询超时,从而防止操作长时间挂起
package main
import (
"context"
"database/sql" "fmt" "log" "time"
_ "github.com/lib/pq" // PostgreSQL driver
)
func main() {
// 连接到数据库
db, err := sql.Open("postgres", "user=username dbname=mydb sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 设置查询超时为5秒
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // 确保在操作完成后调用cancel
// 执行查询
var result string
err = db.QueryRowContext(ctx, "SELECT name FROM users WHERE id = \$1", 1).Scan(&result)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Println("查询超时")
} else {
log.Fatal(err)
}
} else {
fmt.Println("查询结果:", result)
}
}
Pipeline模式(流水线模式)
流水线模式模拟的就是现实的流水线生产。主要就是通过一道道工序组装而成,每一道工序只负责自己的事情,这种模式就是流水线模式。
package main
import (
"fmt"
"sync"
)
// 生产者,生成数据
func producer(out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 10; i++ {
out <- i
}
close(out)
}
// 处理阶段,处理数据
func worker(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for n := range in {
out <- n * 2 // 示例处理:将数字乘以 2
}
close(out)
}
// 消费者,接收处理后的数据
func consumer(in <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for n := range in {
fmt.Println(n) // 输出处理后的结果
}
}
func main() {
var wg sync.WaitGroup
// 创建用于连接各个阶段的 channel
pipeline1 := make(chan int)
pipeline2 := make(chan int)
wg.Add(1)
go producer(pipeline1, &wg)
wg.Add(1)
go worker(pipeline1, pipeline2, &wg)
wg.Add(1)
go consumer(pipeline2, &wg)
// 等待所有 goroutine 完成
wg.Wait()
}
这一段代码主要通过流水线模式来实现了0到9的数据乘2并打印,其中每个函数都是独立完成一个步骤并拼接起来的。
可以通过代码发现流水线模式的特点
- 在使用流水线模式时,每道工序都通过channel将数据传递到下一个工序
- 每一个工序一般都会对应一个函数
- 最终要有个main函数类似函数将这些工序串起来,这样就可以形成完整的数据流
扇出和扇入模式
扇入模式和扇出模式是由于流水线模式的运行速度不佳而进行改造的模式。
其中原理为增加流水线某个速度低下的步骤,让其同时运行多个步骤,再汇总到下一步骤中。
扇出(Fan-out)
扇出是指将一个输入流的数据分发到多个处理单元(goroutines)。这种模式可以用来提高处理能力,允许多个并发执行的 goroutine 同时处理数据。
package main
import (
"fmt"
"sync"
)
// 处理函数
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
}
}
func main() {
const numWorkers = 3
jobs := make(chan int, 10)
var wg sync.WaitGroup
// 启动多个 worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, &wg)
}
// 发送任务
for j := 1; j <= 10; j++ {
jobs <- j
}
close(jobs) // 关闭 jobs channel 以结束 worker
wg.Wait() // 等待所有 worker 完成
}
扇入(Fan-in)
扇入是指将多个输入流的数据汇聚到一个处理单元。它可以用来合并多个 goroutine 的结果到一个 channel,通常在需要整合多个处理结果时使用。
package main
import (
"fmt"
"sync"
)
// 生成任务的函数
func generate(id int, jobs chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := 0; j < 3; j++ {
jobs <- j + id*3 // 生成不同的任务
}
}
// 执行任务的函数
func worker(jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Processing job %d\n", job)
}
}
func main() {
const numGenerators = 3
jobs := make(chan int, 10)
var wg sync.WaitGroup
// 启动生成器
for i := 0; i < numGenerators; i++ {
wg.Add(1)
go generate(i, jobs, &wg)
}
// 启动一个 worker 处理所有任务
wg.Add(1)
go worker(jobs, &wg)
// 等待所有生成器完成
wg.Wait()
close(jobs) // 关闭 jobs channel
// 等待 worker 完成
wg.Wait()
}
Future模式
Future模式是一个处理异步操作的编程模式,它与pipeline模式中工序必须要一个个运行不太一样,
Future模式允许在执行耗时操作是,不必等待操作完成,可以同时进行多个步骤,从而提高程序的效率和响应性。
其基本概念有:
- 异步执行:通过goroutines来异步执行任务
- 结果封装:使用通道来传递任务的结果
- 错误处理:同时处理异步操作可能出现的错误
总的来说,这个模式就是可以同时操作多个不同步骤,当所有操作结束时再进行返回。
package main
import (
"fmt"
"time"
)
type Future struct {
result interface{}
err error
}
func AsyncTask() Future {
ch := make(chan Future)
go func() {
time.Sleep(2 * time.Second)
ch <- Future{result: "Task completed", err: nil}
}()
return <-ch
}
func main() {
future := AsyncTask()
if future.err != nil {
fmt.Println("Error:", future.err)
} else {
fmt.Println(future.result)
}
}
其最大的特点就是返回结果,所以在未来获取这个结果的操作必须是一个阻塞的操作,要一直等待获取结果为止。