40分钟学 Go 语言高并发:sync包详解(下)
sync包详解(下)
学习目标
知识点 | 掌握程度 | 应用场景 |
---|---|---|
WaitGroup使用 | 熟练使用和理解原理 | 并发任务的同步等待 |
Once实现原理 | 理解底层实现和使用场景 | 单例模式、一次性初始化 |
Pool性能优化 | 掌握对象池的使用和调优 | 高并发下的内存优化 |
Cond应用场景 | 了解条件变量的使用方法 | 多线程协调和通信 |
1. WaitGroup的使用
1.1 WaitGroup基本原理
WaitGroup用于等待一组goroutine完成。它提供三个方法:
- Add(): 增加等待的goroutine数量
- Done(): 标记一个goroutine完成
- Wait(): 等待所有goroutine完成
让我们通过一个完整的例子来理解WaitGroup的使用:
package main
import (
"fmt"
"sync"
"time"
)
// 任务执行器
type TaskExecutor struct {
tasks []func() error
wg sync.WaitGroup
errChan chan error
timeout time.Duration
}
func NewTaskExecutor(timeout time.Duration) *TaskExecutor {
return &TaskExecutor{
tasks: make([]func() error, 0),
errChan: make(chan error, 100),
timeout: timeout,
}
}
// 添加任务
func (te *TaskExecutor) AddTask(task func() error) {
te.tasks = append(te.tasks, task)
}
// 执行所有任务
func (te *TaskExecutor) Execute() []error {
te.wg.Add(len(te.tasks))
// 启动所有任务
for i := range te.tasks {
go func(taskID int, task func() error) {
defer te.wg.Done()
// 执行任务并捕获错误
if err := task(); err != nil {
select {
case te.errChan <- fmt.Errorf("task %d failed: %w", taskID, err):
default:
// 错误channel满了就打印错误
fmt.Printf("Error channel full, task %d error: %v\n", taskID, err)
}
}
}(i, te.tasks[i])
}
// 等待任务完成或超时
done := make(chan struct{})
go func() {
te.wg.Wait()
close(done)
}()
// 等待完成或超时
select {
case <-done:
// 所有任务完成
case <-time.After(te.timeout):
fmt.Println("Execution timed out")
}
// 收集所有错误
close(te.errChan)
var errors []error
for err := range te.errChan {
errors = append(errors, err)
}
return errors
}
// 模拟任务
func createTask(id int, duration time.Duration, shouldFail bool) func() error {
return func() error {
fmt.Printf("Task %d started\n", id)
time.Sleep(duration)
if shouldFail {
return fmt.Errorf("task %d failed", id)
}
fmt.Printf("Task %d completed\n", id)
return nil
}
}
func main() {
executor := NewTaskExecutor(5 * time.Second)
// 添加一些测试任务
executor.AddTask(createTask(1, 2*time.Second, false))
executor.AddTask(createTask(2, 1*time.Second, true))
executor.AddTask(createTask(3, 3*time.Second, false))
executor.AddTask(createTask(4, 2*time.Second, true))
executor.AddTask(createTask(5, 1*time.Second, false))
// 执行任务并收集错误
errors := executor.Execute()
// 打印执行结果
if len(errors) > 0 {
fmt.Println("\nExecution completed with errors:")
for _, err := range errors {
fmt.Printf("- %v\n", err)
}
} else {
fmt.Println("\nAll tasks completed successfully")
}
}
1.2 WaitGroup执行流程
2. Once的实现原理
2.1 Once基本概念
sync.Once保证一个函数只执行一次,常用于单例模式或一次性初始化。
让我们实现一个使用Once的配置加载器:
package main
import (
"encoding/json"
"fmt"
"sync"
"time"
)
// 配置结构
type Config struct {
DatabaseURL string `json:"database_url"`
RedisURL string `json:"redis_url"`
APIKey string `json:"api_key"`
MaxWorkers int `json:"max_workers"`
}
// 配置管理器
type ConfigManager struct {
config *Config
once sync.Once
}
var (
configManager *ConfigManager
instanceOnce sync.Once
)
// 获取ConfigManager单例
func GetConfigManager() *ConfigManager {
instanceOnce.Do(func() {
configManager = &ConfigManager{
config: &Config{},
}
})
return configManager
}
// 加载配置
func (cm *ConfigManager) LoadConfig() (*Config, error) {
var err error
cm.once.Do(func() {
// 模拟从文件或远程加载配置
time.Sleep(time.Second) // 模拟IO操作
// 模拟JSON配置
jsonConfig := `{
"database_url": "postgres://user:pass@localhost:5432/db",
"redis_url": "redis://localhost:6379",
"api_key": "secret-key-123",
"max_workers": 10
}`
err = json.Unmarshal([]byte(jsonConfig), cm.config)
if err == nil {
fmt.Println("Configuration loaded successfully")
}
})
if err != nil {
return nil, fmt.Errorf("failed to load config: %w", err)
}
return cm.config, nil
}
// 模拟配置使用
func useConfig(id int) {
cm := GetConfigManager()
config, err := cm.LoadConfig()
if err != nil {
fmt.Printf("Goroutine %d failed to get config: %v\n", id, err)
return
}
fmt.Printf("Goroutine %d using config: DB=%s, Redis=%s, Workers=%d\n",
id, config.DatabaseURL, config.RedisURL, config.MaxWorkers)
}
func main() {
// 模拟多个goroutine同时访问配置
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Duration(id*100) * time.Millisecond) // 错开启动时间
useConfig(id)
}(i)
}
wg.Wait()
}
2.2 Once实现流程图
3. Pool的性能优化
3.1 Pool的作用与原理
sync.Pool用于存储和复用临时对象,减少内存分配和GC压力。
让我们实现一个使用Pool优化的数据处理系统:
package main
import (
"bytes"
"encoding/json"
"fmt"
"sync"
"time"
)
// 数据处理器
type DataProcessor struct {
bufferPool *sync.Pool
jsonPool *sync.Pool
}
// 示例数据结构
type DataItem struct {
ID int `json:"id"`
Name string `json:"name"`
Value float64 `json:"value"`
Timestamp time.Time `json:"timestamp"`
}
// 创建新的数据处理器
func NewDataProcessor() *DataProcessor {
return &DataProcessor{
// 创建buffer对象池
bufferPool: &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
// 创建数据对象池
jsonPool: &sync.Pool{
New: func() interface{} {
return new(DataItem)
},
},
}
}
// 处理数据
func (dp *DataProcessor) ProcessData(data []byte) error {
// 从对象池获取buffer
buf := dp.bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
dp.bufferPool.Put(buf)
}()
// 从对象池获取数据对象
item := dp.jsonPool.Get().(*DataItem)
defer dp.jsonPool.Put(item)
// 解析JSON数据
if err := json.Unmarshal(data, item); err != nil {
return fmt.Errorf("failed to unmarshal data: %w", err)
}
// 处理数据
item.Value = item.Value * 1.1 // 示例处理:增加10%
// 使用buffer进行JSON编码
if err := json.NewEncoder(buf).Encode(item); err != nil {
return fmt.Errorf("failed to encode result: %w", err)
}
// 在实际应用中,这里可能会将处理结果发送到某个目的地
fmt.Printf("Processed: %s", buf.String())
return nil
}
func main() {
processor := NewDataProcessor()
var wg sync.WaitGroup
// 模拟大量并发请求
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 创建测试数据
testData := DataItem{
ID: id,
Name: fmt.Sprintf("Item-%d", id),
Value: float64(id * 100),
Timestamp: time.Now(),
}
// 序列化测试数据
data, err := json.Marshal(testData)
if err != nil {
fmt.Printf("Failed to marshal test data: %v\n", err)
return
}
// 处理数据
if err := processor.ProcessData(data); err != nil {
fmt.Printf("Failed to process data: %v\n", err)
}
}(i)
}
wg.Wait()
}
3.2 Pool使用注意事项
- 对象的生命周期管理
- 内存泄漏风险
- 性能监控和调优
让我们继续完成Cond的应用场景部分。
4. Cond的应用场景
4.1 Cond基本概念
sync.Cond是一个条件变量,它在共享资源的状态变化时对多个goroutine进行协调。
让我们通过一个生产者-消费者模型来详细理解Cond的使用:
package main
import (
"fmt"
"sync"
"time"
)
// 任务队列
type TaskQueue struct {
mutex sync.Mutex
cond *sync.Cond
tasks []string
maxSize int
closed bool
}
// 创建新的任务队列
func NewTaskQueue(size int) *TaskQueue {
tq := &TaskQueue{
tasks: make([]string, 0, size),
maxSize: size,
}
tq.cond = sync.NewCond(&tq.mutex)
return tq
}
// 添加任务
func (tq *TaskQueue) Push(task string) error {
tq.mutex.Lock()
defer tq.mutex.Unlock()
// 检查队列是否已关闭
if tq.closed {
return fmt.Errorf("task queue is closed")
}
// 等待队列有空余位置
for len(tq.tasks) >= tq.maxSize {
tq.cond.Wait()
// 再次检查队列状态
if tq.closed {
return fmt.Errorf("task queue is closed")
}
}
// 添加任务
tq.tasks = append(tq.tasks, task)
fmt.Printf("Task added: %s, queue size: %d\n", task, len(tq.tasks))
// 通知等待的消费者
tq.cond.Signal()
return nil
}
// 获取任务
func (tq *TaskQueue) Pop() (string, error) {
tq.mutex.Lock()
defer tq.mutex.Unlock()
// 等待任务可用
for len(tq.tasks) == 0 && !tq.closed {
tq.cond.Wait()
}
// 检查队列是否已关闭且为空
if len(tq.tasks) == 0 && tq.closed {
return "", fmt.Errorf("task queue is closed and empty")
}
// 取出任务
task := tq.tasks[0]
tq.tasks = tq.tasks[1:]
fmt.Printf("Task removed: %s, queue size: %d\n", task, len(tq.tasks))
// 通知等待的生产者
tq.cond.Signal()
return task, nil
}
// 关闭队列
func (tq *TaskQueue) Close() {
tq.mutex.Lock()
defer tq.mutex.Unlock()
tq.closed = true
// 通知所有等待的goroutine
tq.cond.Broadcast()
}
func main() {
// 创建一个容量为3的任务队列
queue := NewTaskQueue(3)
var wg sync.WaitGroup
// 启动3个生产者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
task := fmt.Sprintf("Task-P%d-%d", id, j)
err := queue.Push(task)
if err != nil {
fmt.Printf("Producer %d failed: %v\n", id, err)
return
}
time.Sleep(time.Millisecond * 100) // 模拟生产耗时
}
}(i)
}
// 启动2个消费者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
task, err := queue.Pop()
if err != nil {
fmt.Printf("Consumer %d exit: %v\n", id, err)
return
}
fmt.Printf("Consumer %d processed %s\n", id, task)
time.Sleep(time.Millisecond * 200) // 模拟消费耗时
}
}(i)
}
// 等待生产者完成后关闭队列
go func() {
wg.Wait()
fmt.Println("All producers finished, closing queue...")
queue.Close()
}()
// 等待所有goroutine完成
time.Sleep(time.Second * 5)
fmt.Println("Main: exit")
}
4.2 Cond的工作流程
4.3 Cond的常见应用场景
-
生产者-消费者模式
- 队列满/空的等待条件
- 批量处理的条件同步
-
资源池管理
- 连接池的可用连接等待
- 工作池的任务分发
-
线程协调
- 启动信号的统一等待
- 任务完成的同步通知
-
状态变更通知
- 配置更新的广播
- 系统状态转换的通知
4.4 性能优化建议
- 条件检查
// 推荐做法
for !condition() {
cond.Wait()
}
// 不推荐做法
if !condition() {
cond.Wait()
}
- 信号通知
// 单个等待者使用Signal
cond.Signal()
// 多个等待者使用Broadcast
cond.Broadcast()
- 锁的范围
// 推荐做法
mu.Lock()
// 最小化临界区
mu.Unlock()
// 不推荐做法
mu.Lock()
// 大量非临界区操作
mu.Unlock()
总结
知识要点回顾
- WaitGroup
- Add/Done/Wait的正确使用
- 并发任务的同步控制
- 错误处理和超时机制
- Once
- 单例模式实现
- 初始化控制
- 并发安全保证
- Pool
- 对象复用机制
- 内存优化策略
- 性能监控方法
- Cond
- 条件变量的使用
- 生产者-消费者模式
- 多goroutine协调
实践建议
- 代码质量
- 始终使用defer确保解锁
- 检查并处理所有错误情况
- 添加适当的超时机制
- 性能优化
- 最小化锁的范围
- 合理使用对象池
- 避免过度同步
- 调试技巧
- 使用race detector
- 添加详细的日志
- 监控关键指标
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!