当前位置: 首页 > article >正文

go语言并发的最佳实践

Go 语言的并发模型是其最强大的特性之一,基于 CSP(Communicating Sequential Processes)理论,通过 goroutinechannel 实现轻量级并发.

一、并发核心概念

1. Goroutine

在 Go 语言中,Goroutine 是实现并发编程的核心特性之一,它本质上是一种轻量级线程。与传统的操作系统线程相比,Goroutine 有着显著的优势。

  • 轻量级线程,由 Go 运行时管理(非操作系统线程)。
  • 启动成本低(KB 级栈内存,动态扩缩容),可轻松创建数千个。

在 Go 语言中,通过 go 关键字可以非常方便地启动一个 Goroutine。以下是一个简单的示例:

   func main(){
      go func() {
       fmt.Println("Hello from goroutine!")
	   }()
	   
    // 这里需要注意,如果主函数直接结束,goroutine 可能来不及执行
    // 因为主函数结束会导致整个程序终止
    fmt.Println("Main function continues...")

   }

2. Channel

Channel 是 Go 语言中用于在不同 Goroutine 之间进行通信和同步的重要工具,它遵循 “通过通信共享内存” 的设计理念,避免了传统并发编程中使用共享内存带来的竞态问题。

  • 是一种类型安全的管道,它只能传输特定类型的数据。例如,一个 chan int 类型的 Channel 只能用于传输整数类型的数据
  • 避免共享内存的竞态问题,提倡“通过通信共享内存”。
  • 分两种类型:
    • 无缓冲通道(同步):发送和接收操作会阻塞,直到另一端准备好。
    • 有缓冲通道(异步):缓冲区未满或非空时不会阻塞。

二、原理:GMP 调度模型

1. GMP 调度模型

基本概念
在 Go 语言的并发体系中,GMP 调度模型是其核心机制,它主要由三个关键组件构成:

  • G(Goroutine):Goroutine 是 Go 语言中轻量级的执行单元,类似于传统操作系统中的线程,但它的开销要小得多。
  • M(Machine):Machine 代表操作系统线程,它是真正在操作系统内核层面上执行的线程。每个 M 都与一个底层的操作系统线程绑定,负责执行 Goroutine。M 是与操作系统交互的桥梁,它负责处理系统调用、线程上下文切换等底层操作。
  • P(Processor):Processor 是调度上下文,P 可以看作是 M 执行 Goroutine 的“许可证”,每个 M 要执行 Goroutine 必须先绑定一个 P。P 维护着一个本地的 Goroutine 队列,用于存储待执行的 Goroutine。同时,P 还负责调度和管理这些 Goroutine 的执行顺序。

工作流程

  • P 维护本地 Goroutine 队列:每个 P 都有一个自己的本地 Goroutine 队列,当创建一个新的 Goroutine 时,它会被放入某个 P 的本地队列中。这个队列是一个先进先出(FIFO)的队列,P 会按照队列中的顺序依次调度 Goroutine 执行。
  • M 绑定 P 后获取 G 执行:一个 M 要执行 Goroutine,必须先绑定一个空闲的 P。绑定成功后,M 会从 P 的本地队列中取出一个 Goroutine 并执行。当 M 执行完一个 Goroutine 后,会继续从 P 的队列中获取下一个 Goroutine,直到队列为空。
  • G 阻塞时 M 释放 P:当一个 Goroutine 在执行过程中发生阻塞(例如进行 IO 操作)时,M 会释放当前绑定的 P,以便其他 M 可以使用这个 P 继续执行其他 Goroutine。被释放的 P 会被放入全局 P 列表中,等待其他 M 来绑定。而阻塞的 Goroutine 会被挂起,当阻塞操作完成后,它会被重新放入某个 P 的本地队列或全局队列中,等待再次被调度执行。

2. 抢占式调度

Go 1.14+ 支持基于信号的抢占,避免长时间占用 CPU 的 Goroutine 导致调度延迟。

三、Goroutine 基础

1. 简单示例

下面是一个简单的 Goroutine 示例,展示了如何启动一个 Goroutine 并执行一个简单的任务:

package main

import (
    "fmt"
    "time"
)

func main() {
    go sayHello() // 启动 goroutine
	// 这里使用 time.Sleep 是为了让主函数等待一段时间,以便 goroutine 有机会执行
 	// 但这种方式并不是一个好的做法,实际开发中应该使用 sync.WaitGroup 进行同步
    time.Sleep(100 * time.Millisecond)
}

func sayHello() {
    fmt.Println("Hello!")
}

2. 使用 sync.WaitGroup 并发组

在实际开发中,为了确保所有的 Goroutine 都执行完毕后再继续执行后续的代码,我们可以使用 sync.WaitGroup 来进行同步。sync.WaitGroup 提供了三个主要的方法:AddDoneWait

  • Add 方法用于设置需要等待的 Goroutine 的数量。
  • Done 方法用于标记一个 Goroutine 已经执行完毕,相当于将等待的数量减 1。
  • Wait 方法用于阻塞当前的 Goroutine,直到所有标记的 Goroutine 都执行完毕。
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2) // 表示有两个任务需要等待

    go func() {
        defer wg.Done() // 在函数结束时调用 Done 方法,表示该任务完成
        fmt.Println("Goroutine 1")
    }()

    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 2")
    }()

    wg.Wait() // 阻塞直到所有任务完成
    fmt.Println("All done!")
}

四、Channel 操作

1.无缓冲通道

创建和使用无缓冲通道的基本步骤如下:

package main

import "fmt"

func main() {
    ch := make(chan int) // 创建一个无缓冲的整数类型通道
    go func() {
        num := 42
        ch <- num // 向通道发送数据
        fmt.Println("Data sent to channel")
    }()

    value := <-ch // 从通道接收数据
    fmt.Println("Received value:", value)
}

2. 有缓冲通道

有缓冲通道的使用可以提高程序的并发性能,避免不必要的阻塞。以下是一个有缓冲通道的示例:

package main

import "fmt"

func main() {
    ch := make(chan string, 2) // 创建一个缓冲区容量为 2 的字符串类型通道
    ch <- "A"
    ch <- "B"
    fmt.Println("Data sent to channel")

    fmt.Println(<-ch) // 从通道接收第一个数据
    fmt.Println(<-ch) // 从通道接收第二个数据
}

3. 关闭通道

在使用通道时,有时需要通知接收方不再有数据发送,这时可以使用 close 函数关闭通道。关闭通道后,接收方仍然可以从通道中接收数据,直到通道中的数据被全部接收完,之后再接收数据会得到该类型的零值。以下是一个关闭通道的示例:

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        for i := 0; i < 3; i++ {
            ch <- i
        }
        close(ch) // 关闭通道
        fmt.Println("Channel closed")
    }()

    for num := range ch { // 使用 for...range 循环从通道接收数据,直到通道关闭
        fmt.Println(num)
    }
    fmt.Println("All data received")
}

4. Select 多路复

select 语句用于在多个通道操作中进行选择,它类似于 switch 语句,但 select 语句专门用于处理通道操作。select 语句会随机选择一个可以执行的通道操作并执行,如果所有通道操作都无法执行,则会阻塞,直到有一个通道操作可以执行。如果指定了 default 分支,则在所有通道操作都无法执行时会执行 default 分支,不会阻塞。以下是一个使用 select 语句进行多路复用的示例:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(200 * time.Millisecond)
        ch1 <- "one"
    }()

    go func() {
        time.Sleep(100 * time.Millisecond)
        ch2 <- "two"
    }()

    select {
    case msg1 := <-ch1:
        fmt.Println(msg1)
    case msg2 := <-ch2:
        fmt.Println(msg2)
    case <-time.After(1 * time.Second): // 超时控制
        fmt.Println("timeout")
    }
}

五、同步原语

1. 互斥锁 sync.Mutex

在并发编程中,多个 Goroutine 同时访问和修改共享资源可能会导致数据不一致的问题,这时可以使用互斥锁 sync.Mutex 来保证同一时间只有一个 Goroutine 可以访问共享资源。互斥锁提供了两个主要的方法:LockUnlock

  • Lock 方法用于获取锁,如果锁已经被其他 Goroutine 持有,则当前 Goroutine 会阻塞,直到锁被释放。
  • Unlock 方法用于释放锁,允许其他 Goroutine 获取锁。

以下是一个使用互斥锁的示例:

package main

import (
    "fmt"
    "sync"
)

var counter int
var mu sync.Mutex

func increment() {
    mu.Lock()         // 获取锁
    defer mu.Unlock() // 确保在函数结束时释放锁
    counter++
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increment()
        }()
    }
    wg.Wait() // 等待所有 Goroutine 执行完毕
    fmt.Println(counter) // 输出 1000
}

2. 读写锁 sync.RWMutex

读写锁 sync.RWMutex 是一种特殊的锁,它允许多个 Goroutine 同时进行读操作,但在进行写操作时会独占锁,不允许其他 Goroutine 进行读或写操作。读写锁适用于读多写少的场景,可以提高程序的并发性能。读写锁提供了四个主要的方法:RLockRUnlockLockUnlock

  • RLock 方法用于获取读锁,允许多个 Goroutine 同时持有读锁。
  • RUnlock 方法用于释放读锁。
  • Lock 方法用于获取写锁,在持有写锁期间,不允许其他 Goroutine 进行读或写操作。
  • Unlock 方法用于释放写锁。

以下是一个使用读写锁的示例:

package main

import (
    "fmt"
    "sync"
)

var cache = make(map[string]string)
var rwMu sync.RWMutex

func read(key string) string {
    rwMu.RLock()         // 获取读锁
    defer rwMu.RUnlock() // 确保在函数结束时释放读锁
    return cache[key]
}

func write(key, value string) {
    rwMu.Lock()          // 获取写锁
    defer rwMu.Unlock()  // 确保在函数结束时释放写锁
    cache[key] = value
}

func main() {
    var wg sync.WaitGroup

    // 启动多个读操作
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", index)
            value := read(key)
            fmt.Printf("Read key %s: %s\n", key, value)
        }(i)
    }

    // 启动一个写操作
    wg.Add(1)
    go func() {
        defer wg.Done()
        key := "key1"
        value := "value1"
        write(key, value)
        fmt.Printf("Written key %s: %s\n", key, value)
    }()

    wg.Wait() // 等待所有 Goroutine 执行完毕
}

六、并发模式

1. Worker Pool(工作池)

工作池模式是一种常见的并发模式,它可以帮助我们管理一组工作线程(在 Go 中就是 Goroutine),并将任务分配给这些工作线程进行处理。这种模式在处理大量独立任务时非常有用,可以避免创建过多的 Goroutine 导致系统资源耗尽。
收起


package main

import (
    "fmt"
)

// worker 函数表示一个工作线程,它从 jobs 通道接收任务,处理后将结果发送到 results 通道
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        // 模拟任务处理,这里简单地将任务值乘以 2
        results <- job * 2
    }
}

func main() {
    // 创建两个有缓冲的通道,分别用于存储任务和结果
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动 3 个工作线程
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 发送 9 个任务到 jobs 通道
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    // 关闭 jobs 通道,表示不再有新的任务发送
    close(jobs)

    // 收集结果
    for a := 1; a <= 9; a++ {
        <-results
    }
    // 关闭 results 通道
    close(results)
}

2. Fan-out/Fan-in(扇出/扇入)

扇出 / 扇入模式是一种用于并发处理数据的模式。扇出指的是将一个输入源的数据分发到多个 Goroutine 中进行处理,扇入则是将多个 Goroutine 的处理结果合并到一个通道中。

package main

import (
	"fmt"
	"sync"
)

// merge 函数用于将多个输入通道的数据合并到一个输出通道中
func merge(chs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// 从每个输入通道读取数据
	for _, ch := range chs {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(ch)
	}

	// 关闭输出通道
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// 创建 3 个输入通道
	ch1 := make(chan int)
	ch2 := make(chan int)
	ch3 := make(chan int)

	// 启动 3 个 Goroutine 向输入通道发送数据
	go func() {
		ch1 <- 1
		ch1 <- 2
		close(ch1)
	}()
	go func() {
		ch2 <- 3
		ch2 <- 4
		close(ch2)
	}()
	go func() {
		ch3 <- 5
		ch3 <- 6
		close(ch3)
	}()

	// 合并 3 个输入通道的数据到一个输出通道
	resultCh := merge(ch1, ch2, ch3)

	// 从输出通道读取数据
	for num := range resultCh {
		fmt.Println(num)
	}
}

3. 管道模式(Pipeline)

管道模式是 Go 语言并发编程中一种非常强大且实用的模式,它通过串联多个 Goroutine 来处理数据流,形成一个处理链。这种模式特别适合那些需要分阶段处理任务的场景,例如数据清洗、数据转换、数据筛选等。在管道模式中,每个阶段的 Goroutine 负责完成一个特定的任务,将处理后的数据传递给下一个阶段,就像工厂里的流水线一样,每个工人负责一道工序,最终完成整个产品的生产。

package main

import "fmt"

// generate 函数用于生成一个整数数据流
// 它接收多个整数作为输入,将这些整数依次发送到一个通道中
// 发送完成后关闭通道
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// square 函数用于对输入通道中的每个整数进行平方操作
// 它从输入通道中接收整数,计算其平方值,并将结果发送到一个新的通道中
// 当输入通道关闭且所有数据都处理完后,关闭输出通道
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// filterEven 函数用于过滤掉输入通道中的奇数,只保留偶数
// 它从输入通道中接收整数,判断是否为偶数,如果是则发送到输出通道
// 当输入通道关闭且所有数据都处理完后,关闭输出通道
func filterEven(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			if n%2 == 0 {
				out <- n
			}
		}
		close(out)
	}()
	return out
}

func main() {
	// 定义一个整数切片作为初始数据
	nums := []int{1, 2, 3, 4}
	// 管道串联:生成 → 平方 → 过滤 → 输出
	// 首先调用 generate 函数生成整数数据流
	// 然后将生成的数据流传递给 square 函数进行平方操作
	// 接着将平方后的数据流传递给 filterEven 函数进行偶数过滤
	// 最后使用 for...range 循环从最终的通道中接收数据并输出
	for n := range filterEven(square(generate(nums...))) {
		fmt.Println(n) // 输出结果:4, 16
	}
}

管道模式的优点 :

  • 可维护性高:每个阶段的任务都封装在独立的函数中,代码结构清晰,易于理解和维护。
  • 可扩展性强:可以方便地添加、删除或修改处理阶段,以适应不同的业务需求。
  • 并发性能好:每个阶段的处理可以并行进行,充分利用多核 CPU 的资源,提高程序的处理效率。

通过管道模式,我们可以将复杂的任务分解为多个简单的子任务,并通过通道将这些子任务连接起来,实现高效、灵活的数据流处理。

4. 使用 Context 控制生命周期

在 Go 中,context 包提供了一种机制来控制 Goroutine 的生命周期,例如取消任务、设置超时等。

package main

import (
	"context"
	"fmt"
	"time"
)

// longRunningTask 函数表示一个长时间运行的任务,它会监听 context 的取消信号
func longRunningTask(ctx context.Context) {
	for {
		select {
		case <-ctx.Done(): // 监听取消信号
			fmt.Println("Task canceled:", ctx.Err())
			return
		default:
			// 模拟工作
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	// 创建一个带有超时的 context,超时时间为 3 秒
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// 启动一个 Goroutine 执行长时间运行的任务
	go longRunningTask(ctx)

	// 等待 4 秒,确保任务会被取消
	time.Sleep(4 * time.Second)
}

七、常见问题与调试

1. 竞态条件(Race Condition)

竞态条件是指多个 Goroutine 同时访问和修改共享资源,导致程序的行为变得不可预测。可以使用 Go 语言提供的 -race 标志来检测竞态条件。

package main

import (
	"fmt"
	"sync"
)

var counter int
var wg sync.WaitGroup

func increment() {
	counter++  //不用互斥锁
}

func main() {
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment()
		}()
	}
	wg.Wait()
	fmt.Println(counter)
}

要检测上述代码中的竞态条件,可以使用以下命令:

go run -race main.go

如果存在竞态条件,Go 编译器会输出详细的错误信息,帮助我们定位问题。

2. 死锁

死锁是指程序中的 Goroutine 相互等待对方释放资源,导致所有 Goroutine 都无法继续执行的情况。要避免死锁,需要确保通道操作成对出现,避免永久阻塞。

func main() {
	ch := make(chan int)
	ch <- 1 // 这里会导致死锁,因为没有接收方
	<-ch
}
//fatal error: all goroutines are asleep - deadlock!

在上面代码中,由于没有接收方,ch <- 1 会导致永久阻塞,从而引发死锁。要避免这种情况,需要确保在发送数据之前有接收方准备好接收数据。

3.在 Goroutine 中传递错误

func doSomething() error {
    return errors.New("something went wrong")
}

func main() {
    errCh := make(chan error)
    go func() {
        errCh <- doSomething()
    }()

    if err := <-errCh; err != nil {
        fmt.Println("Error:", err)
    }
}

4. Goroutine 泄漏

Goroutine 泄漏是指 Goroutine 由于某种原因无法正常退出,导致系统资源不断被占用。要避免 Goroutine 泄漏,始终确保通道被关闭,或使用 context 控制退出。

检测方法

使用 runtime.NumGoroutine() 监控 goroutine 数量

runtime.NumGoroutine() 函数可以返回当前程序中正在运行的 Goroutine 的数量。通过定期检查这个数量,我们可以判断是否存在 Goroutine 泄漏。如果 Goroutine 的数量持续增加,而没有相应的减少,那么很可能存在泄漏。

以下是一个示例代码,展示了如何使用 runtime.NumGoroutine() 监控 Goroutine 数量:

package main

import (
	"fmt"
	"runtime"
	"time"
)

func worker(ch chan int) {
	for {
		select {
		case num := <-ch:
			fmt.Println("Received:", num)
		default:
			time.Sleep(100 * time.Millisecond)
		}
	}
}

func main() {
	ch := make(chan int)
	go worker(ch)

	// 定期检查 Goroutine 数量
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		numGoroutines := runtime.NumGoroutine()
		fmt.Printf("Number of goroutines: %d\n", numGoroutines)
	}
}

使用Go 的 pprof 分析 goroutine 堆栈。

pprof 是 Go 语言自带的一个性能分析工具,它可以帮助我们分析程序的 CPU 使用情况、内存分配情况以及 Goroutine 堆栈信息。通过 pprof,我们可以查看每个 Goroutine 的状态和调用栈,从而找出可能存在泄漏的 Goroutine。

以下是一个使用 pprof 分析 Goroutine 堆栈的示例代码:

package main

import (
	"net/http"
	_ "net/http/pprof"
	"time"
)

func worker() {
	for {
		time.Sleep(1 * time.Second)
	}
}

func main() {
	go worker()

	// 启动 pprof 服务
	go func() {
		http.ListenAndServe("localhost:6060", nil)
	}()

	// 让程序一直运行
	select {}
}

代码执行后,使用以下命令:

go tool pprof http://localhost:6060/debug/pprof/goroutine

上述命令后,会进入 pprof 的交互式界面。在这个界面中,可以使用各种命令来查看 Goroutine 的堆栈信息,例如 top 命令可以查看占用资源最多的 Goroutine,list 命令可以查看指定函数的调用栈等。通过分析这些信息,我们可以找出可能存在泄漏的 Goroutine,并对代码进行修复。

八、最佳实践

1.优先使用 channel 而非共享内存

在 Go 语言中,推荐使用 channel 来进行 Goroutine 之间的通信和同步,而不是直接使用共享内存。channel 可以避免竞态条件和死锁等问题,使代码更加简洁和安全。

2.避免在 goroutine 中使用全局变量

全局变量在多个 Goroutine 中共享时容易引发竞态条件。尽量将变量的作用域限制在需要使用的 Goroutine 内部,或者使用 channel 来传递数据。

3.使用 select 实现超时和取消

select 语句可以用于同时监听多个通道的操作,结合 time.Aftercontext 可以实现超时和取消功能,提高程序的健壮性。

4.小任务用 goroutine,大任务考虑线程池

对于小的、独立的任务,可以直接使用 Goroutine 来执行,因为 Goroutine 的创建和销毁成本较低。对于大的、复杂的任务,可以考虑使用工作池模式来管理一组 Goroutine,避免创建过多的 Goroutine 导致系统资源耗尽。
通过掌握这些概念和模式,你可以高效地编写并发 Go 程序,充分利用多核 CPU 资源。同时,遵循最佳实践和注意常见问题的处理,可以使你的代码更加健壮和可靠。


http://www.kler.cn/a/551154.html

相关文章:

  • 2025最新Java面试题大全(整理版)2000+ 面试题附答案详解
  • Python实战进阶 No1: RESTful API - 基于Flask的实例说明
  • Golang GORM系列:GORM分页和排序
  • SpringBoot分布式应用程序和数据库在物理位置分配上、路由上和数量上的最佳实践是什么?
  • Spring Boot最新技术特性深度解析与实战应用
  • SpringBoot 核心总结图
  • HarmonyNext上传用户相册图片到服务器
  • Spreadjs与GcExcel
  • 迅为RK3568开发板篇Openharmony配置HDF控制UART-什么是串口
  • Docker+DockerCompose+Harbor安装
  • DeepSeek R1本地部署 DeepSeek Api接口调用 java go版本
  • DFS算法篇:理解递归,熟悉递归,成为递归
  • 腿足机器人之二- 运动控制概览
  • SSH 登录到 Linux 服务器为什么没有要求输入密码
  • 详解Redis在Centos上的安装
  • MySQL索引和其底层数据结构介绍
  • 国产编辑器EverEdit - 如虎添翼的功能:快速选择
  • AutoGPT:突破性人工智能工具,赋能自动化写作与任务执行的未来
  • 用于可靠工业通信的5G-TSN集成原型:基于帧复制与消除可靠性的研究
  • 基于Springboot+Vue前后端分离的农场投入品运营线上管理系统设计与实现+万字文档+指导搭建视频