当前位置: 首页 > article >正文

40分钟学 Go 语言高并发:sync包详解(下)

sync包详解(下)

学习目标

知识点掌握程度应用场景
WaitGroup使用熟练使用和理解原理并发任务的同步等待
Once实现原理理解底层实现和使用场景单例模式、一次性初始化
Pool性能优化掌握对象池的使用和调优高并发下的内存优化
Cond应用场景了解条件变量的使用方法多线程协调和通信

1. WaitGroup的使用

1.1 WaitGroup基本原理

WaitGroup用于等待一组goroutine完成。它提供三个方法:

  • Add(): 增加等待的goroutine数量
  • Done(): 标记一个goroutine完成
  • Wait(): 等待所有goroutine完成

让我们通过一个完整的例子来理解WaitGroup的使用:

package main

import (
    "fmt"
    "sync"
    "time"
)

// 任务执行器
type TaskExecutor struct {
    tasks    []func() error
    wg       sync.WaitGroup
    errChan  chan error
    timeout  time.Duration
}

func NewTaskExecutor(timeout time.Duration) *TaskExecutor {
    return &TaskExecutor{
        tasks:    make([]func() error, 0),
        errChan:  make(chan error, 100),
        timeout:  timeout,
    }
}

// 添加任务
func (te *TaskExecutor) AddTask(task func() error) {
    te.tasks = append(te.tasks, task)
}

// 执行所有任务
func (te *TaskExecutor) Execute() []error {
    te.wg.Add(len(te.tasks))
    
    // 启动所有任务
    for i := range te.tasks {
        go func(taskID int, task func() error) {
            defer te.wg.Done()
            
            // 执行任务并捕获错误
            if err := task(); err != nil {
                select {
                case te.errChan <- fmt.Errorf("task %d failed: %w", taskID, err):
                default:
                    // 错误channel满了就打印错误
                    fmt.Printf("Error channel full, task %d error: %v\n", taskID, err)
                }
            }
        }(i, te.tasks[i])
    }
    
    // 等待任务完成或超时
    done := make(chan struct{})
    go func() {
        te.wg.Wait()
        close(done)
    }()
    
    // 等待完成或超时
    select {
    case <-done:
        // 所有任务完成
    case <-time.After(te.timeout):
        fmt.Println("Execution timed out")
    }
    
    // 收集所有错误
    close(te.errChan)
    var errors []error
    for err := range te.errChan {
        errors = append(errors, err)
    }
    
    return errors
}

// 模拟任务
func createTask(id int, duration time.Duration, shouldFail bool) func() error {
    return func() error {
        fmt.Printf("Task %d started\n", id)
        time.Sleep(duration)
        if shouldFail {
            return fmt.Errorf("task %d failed", id)
        }
        fmt.Printf("Task %d completed\n", id)
        return nil
    }
}

func main() {
    executor := NewTaskExecutor(5 * time.Second)
    
    // 添加一些测试任务
    executor.AddTask(createTask(1, 2*time.Second, false))
    executor.AddTask(createTask(2, 1*time.Second, true))
    executor.AddTask(createTask(3, 3*time.Second, false))
    executor.AddTask(createTask(4, 2*time.Second, true))
    executor.AddTask(createTask(5, 1*time.Second, false))
    
    // 执行任务并收集错误
    errors := executor.Execute()
    
    // 打印执行结果
    if len(errors) > 0 {
        fmt.Println("\nExecution completed with errors:")
        for _, err := range errors {
            fmt.Printf("- %v\n", err)
        }
    } else {
        fmt.Println("\nAll tasks completed successfully")
    }
}

1.2 WaitGroup执行流程

在这里插入图片描述

2. Once的实现原理

2.1 Once基本概念

sync.Once保证一个函数只执行一次,常用于单例模式或一次性初始化。

让我们实现一个使用Once的配置加载器:

package main

import (
    "encoding/json"
    "fmt"
    "sync"
    "time"
)

// 配置结构
type Config struct {
    DatabaseURL string `json:"database_url"`
    RedisURL    string `json:"redis_url"`
    APIKey      string `json:"api_key"`
    MaxWorkers  int    `json:"max_workers"`
}

// 配置管理器
type ConfigManager struct {
    config *Config
    once   sync.Once
}

var (
    configManager *ConfigManager
    instanceOnce  sync.Once
)

// 获取ConfigManager单例
func GetConfigManager() *ConfigManager {
    instanceOnce.Do(func() {
        configManager = &ConfigManager{
            config: &Config{},
        }
    })
    return configManager
}

// 加载配置
func (cm *ConfigManager) LoadConfig() (*Config, error) {
    var err error
    
    cm.once.Do(func() {
        // 模拟从文件或远程加载配置
        time.Sleep(time.Second) // 模拟IO操作
        
        // 模拟JSON配置
        jsonConfig := `{
            "database_url": "postgres://user:pass@localhost:5432/db",
            "redis_url": "redis://localhost:6379",
            "api_key": "secret-key-123",
            "max_workers": 10
        }`
        
        err = json.Unmarshal([]byte(jsonConfig), cm.config)
        if err == nil {
            fmt.Println("Configuration loaded successfully")
        }
    })
    
    if err != nil {
        return nil, fmt.Errorf("failed to load config: %w", err)
    }
    
    return cm.config, nil
}

// 模拟配置使用
func useConfig(id int) {
    cm := GetConfigManager()
    config, err := cm.LoadConfig()
    if err != nil {
        fmt.Printf("Goroutine %d failed to get config: %v\n", id, err)
        return
    }
    
    fmt.Printf("Goroutine %d using config: DB=%s, Redis=%s, Workers=%d\n",
        id, config.DatabaseURL, config.RedisURL, config.MaxWorkers)
}

func main() {
    // 模拟多个goroutine同时访问配置
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(time.Duration(id*100) * time.Millisecond) // 错开启动时间
            useConfig(id)
        }(i)
    }
    
    wg.Wait()
}

2.2 Once实现流程图

在这里插入图片描述

3. Pool的性能优化

3.1 Pool的作用与原理

sync.Pool用于存储和复用临时对象,减少内存分配和GC压力。

让我们实现一个使用Pool优化的数据处理系统:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "sync"
    "time"
)

// 数据处理器
type DataProcessor struct {
    bufferPool *sync.Pool
    jsonPool   *sync.Pool
}

// 示例数据结构
type DataItem struct {
    ID        int       `json:"id"`
    Name      string    `json:"name"`
    Value     float64   `json:"value"`
    Timestamp time.Time `json:"timestamp"`
}

// 创建新的数据处理器
func NewDataProcessor() *DataProcessor {
    return &DataProcessor{
        // 创建buffer对象池
        bufferPool: &sync.Pool{
            New: func() interface{} {
                return new(bytes.Buffer)
            },
        },
        // 创建数据对象池
        jsonPool: &sync.Pool{
            New: func() interface{} {
                return new(DataItem)
            },
        },
    }
}

// 处理数据
func (dp *DataProcessor) ProcessData(data []byte) error {
    // 从对象池获取buffer
    buf := dp.bufferPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()
        dp.bufferPool.Put(buf)
    }()
    
    // 从对象池获取数据对象
    item := dp.jsonPool.Get().(*DataItem)
    defer dp.jsonPool.Put(item)
    
    // 解析JSON数据
    if err := json.Unmarshal(data, item); err != nil {
        return fmt.Errorf("failed to unmarshal data: %w", err)
    }
    
    // 处理数据
    item.Value = item.Value * 1.1 // 示例处理:增加10%
    
    // 使用buffer进行JSON编码
    if err := json.NewEncoder(buf).Encode(item); err != nil {
        return fmt.Errorf("failed to encode result: %w", err)
    }
    
    // 在实际应用中,这里可能会将处理结果发送到某个目的地
    fmt.Printf("Processed: %s", buf.String())
    
    return nil
}

func main() {
    processor := NewDataProcessor()
    var wg sync.WaitGroup
    
    // 模拟大量并发请求
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            // 创建测试数据
            testData := DataItem{
                ID:        id,
                Name:      fmt.Sprintf("Item-%d", id),
                Value:     float64(id * 100),
                Timestamp: time.Now(),
            }
            
            // 序列化测试数据
            data, err := json.Marshal(testData)
            if err != nil {
                fmt.Printf("Failed to marshal test data: %v\n", err)
                return
            }
            
            // 处理数据
            if err := processor.ProcessData(data); err != nil {
                fmt.Printf("Failed to process data: %v\n", err)
            }
        }(i)
    }
    
    wg.Wait()
}

3.2 Pool使用注意事项

  1. 对象的生命周期管理
  2. 内存泄漏风险
  3. 性能监控和调优

让我们继续完成Cond的应用场景部分。

4. Cond的应用场景

4.1 Cond基本概念

sync.Cond是一个条件变量,它在共享资源的状态变化时对多个goroutine进行协调。

让我们通过一个生产者-消费者模型来详细理解Cond的使用:

package main

import (
    "fmt"
    "sync"
    "time"
)

// 任务队列
type TaskQueue struct {
    mutex     sync.Mutex
    cond      *sync.Cond
    tasks     []string
    maxSize   int
    closed    bool
}

// 创建新的任务队列
func NewTaskQueue(size int) *TaskQueue {
    tq := &TaskQueue{
        tasks:   make([]string, 0, size),
        maxSize: size,
    }
    tq.cond = sync.NewCond(&tq.mutex)
    return tq
}

// 添加任务
func (tq *TaskQueue) Push(task string) error {
    tq.mutex.Lock()
    defer tq.mutex.Unlock()

    // 检查队列是否已关闭
    if tq.closed {
        return fmt.Errorf("task queue is closed")
    }

    // 等待队列有空余位置
    for len(tq.tasks) >= tq.maxSize {
        tq.cond.Wait()
        // 再次检查队列状态
        if tq.closed {
            return fmt.Errorf("task queue is closed")
        }
    }

    // 添加任务
    tq.tasks = append(tq.tasks, task)
    fmt.Printf("Task added: %s, queue size: %d\n", task, len(tq.tasks))
    
    // 通知等待的消费者
    tq.cond.Signal()
    return nil
}

// 获取任务
func (tq *TaskQueue) Pop() (string, error) {
    tq.mutex.Lock()
    defer tq.mutex.Unlock()

    // 等待任务可用
    for len(tq.tasks) == 0 && !tq.closed {
        tq.cond.Wait()
    }

    // 检查队列是否已关闭且为空
    if len(tq.tasks) == 0 && tq.closed {
        return "", fmt.Errorf("task queue is closed and empty")
    }

    // 取出任务
    task := tq.tasks[0]
    tq.tasks = tq.tasks[1:]
    fmt.Printf("Task removed: %s, queue size: %d\n", task, len(tq.tasks))
    
    // 通知等待的生产者
    tq.cond.Signal()
    return task, nil
}

// 关闭队列
func (tq *TaskQueue) Close() {
    tq.mutex.Lock()
    defer tq.mutex.Unlock()
    
    tq.closed = true
    // 通知所有等待的goroutine
    tq.cond.Broadcast()
}

func main() {
    // 创建一个容量为3的任务队列
    queue := NewTaskQueue(3)
    var wg sync.WaitGroup

    // 启动3个生产者
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                task := fmt.Sprintf("Task-P%d-%d", id, j)
                err := queue.Push(task)
                if err != nil {
                    fmt.Printf("Producer %d failed: %v\n", id, err)
                    return
                }
                time.Sleep(time.Millisecond * 100) // 模拟生产耗时
            }
        }(i)
    }

    // 启动2个消费者
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                task, err := queue.Pop()
                if err != nil {
                    fmt.Printf("Consumer %d exit: %v\n", id, err)
                    return
                }
                fmt.Printf("Consumer %d processed %s\n", id, task)
                time.Sleep(time.Millisecond * 200) // 模拟消费耗时
            }
        }(i)
    }

    // 等待生产者完成后关闭队列
    go func() {
        wg.Wait()
        fmt.Println("All producers finished, closing queue...")
        queue.Close()
    }()

    // 等待所有goroutine完成
    time.Sleep(time.Second * 5)
    fmt.Println("Main: exit")
}

4.2 Cond的工作流程

在这里插入图片描述

4.3 Cond的常见应用场景

  1. 生产者-消费者模式

    • 队列满/空的等待条件
    • 批量处理的条件同步
  2. 资源池管理

    • 连接池的可用连接等待
    • 工作池的任务分发
  3. 线程协调

    • 启动信号的统一等待
    • 任务完成的同步通知
  4. 状态变更通知

    • 配置更新的广播
    • 系统状态转换的通知

4.4 性能优化建议

  1. 条件检查
// 推荐做法
for !condition() {
    cond.Wait()
}

// 不推荐做法
if !condition() {
    cond.Wait()
}
  1. 信号通知
// 单个等待者使用Signal
cond.Signal()

// 多个等待者使用Broadcast
cond.Broadcast()
  1. 锁的范围
// 推荐做法
mu.Lock()
// 最小化临界区
mu.Unlock()

// 不推荐做法
mu.Lock()
// 大量非临界区操作
mu.Unlock()

总结

知识要点回顾

  1. WaitGroup
  • Add/Done/Wait的正确使用
  • 并发任务的同步控制
  • 错误处理和超时机制
  1. Once
  • 单例模式实现
  • 初始化控制
  • 并发安全保证
  1. Pool
  • 对象复用机制
  • 内存优化策略
  • 性能监控方法
  1. Cond
  • 条件变量的使用
  • 生产者-消费者模式
  • 多goroutine协调

实践建议

  1. 代码质量
  • 始终使用defer确保解锁
  • 检查并处理所有错误情况
  • 添加适当的超时机制
  1. 性能优化
  • 最小化锁的范围
  • 合理使用对象池
  • 避免过度同步
  1. 调试技巧
  • 使用race detector
  • 添加详细的日志
  • 监控关键指标

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.kler.cn/a/406370.html

相关文章:

  • PDF电子发票信息转excel信息汇总
  • Git 提交的相对引用
  • flink学习(1)——standalone模式的安装
  • Spring Web入门练习
  • 实际开发中的协变与逆变案例:数据处理流水线
  • 用java和redis实现考试成绩排行榜
  • 如何用通义灵码助力项目开发 | OceanBase obdiag 项目共建实践
  • 【大数据学习 | Spark-Core】Spark的分区器(HashPartitioner和RangePartitioner)
  • 大数据新视界 -- 大数据大厂之 Hive 数据导入:多源数据集成的策略与实战(上)(3/ 30)
  • xiaolin coding 图解网络笔记——HTTP篇
  • Antd中的布局组件
  • RecyclerView详解——(四)缓存复用机制
  • 论文阅读——Intrusion detection systems using longshort‑term memory (LSTM)
  • 儿童玩具安全检测GB6675标准详细介绍
  • PHP 8.4 重磅发布了
  • 如何创建你的第一个 Telegram 机器人:一步步教程
  • 【Python TensorFlow】进阶指南(续篇三)
  • STM32(hal库)中,为什么DMA没有MSP函数?
  • C# 中Timer的三种用法
  • 代码随想录1016-Day17
  • 【bug】python常见的错误以及解决办法
  • 大数据环境下的高效数据清洗策略
  • 【信息系统项目管理师】第2章:信息技术发展 考点梳理
  • 泥石流灾害风险评估与模拟丨AI与R语言、ArcGIS、HECRAS融合,提升泥石流灾害风险预测的精度和准确性
  • CSS遮罩:mask
  • 使用minio cllient(mc)完成不同服务器的minio的数据迁移和mc基本操作