当前位置: 首页 > article >正文

40分钟学 Go 语言高并发:超时控制与取消机制

超时控制与取消机制

一、知识要点概述

知识模块重要程度掌握要求
超时策略⭐⭐⭐⭐⭐深入理解context包的超时机制,掌握不同场景下的超时控制方法
取消传播⭐⭐⭐⭐⭐熟练运用context的取消传播机制,实现优雅的任务取消
资源清理⭐⭐⭐⭐掌握goroutine退出时的资源清理方法,避免资源泄露
级联取消⭐⭐⭐⭐理解并实现多层级的取消控制,确保子任务能正确响应取消信号

二、详细内容

1. 超时策略

超时控制是高并发系统中非常重要的一环,它能够防止系统资源被长时间占用,提高系统的可用性。

package main

import (
    "context"
    "fmt"
    "time"
)

// 模拟一个耗时操作
func longRunningOperation(ctx context.Context) error {
    // 创建一个channel用于接收操作结果
    done := make(chan bool)
    
    go func() {
        // 模拟耗时操作
        time.Sleep(2 * time.Second)
        done <- true
    }()
    
    // 使用select实现超时控制
    select {
    case <-done:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// 使用context.WithTimeout实现超时控制
func withTimeoutExample() {
    // 创建一个1秒超时的context
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    if err := longRunningOperation(ctx); err != nil {
        fmt.Printf("操作超时: %v\n", err)
        return
    }
    fmt.Println("操作成功完成")
}

// 使用time.After实现超时控制
func withTimeAfterExample() {
    done := make(chan bool)
    
    go func() {
        time.Sleep(2 * time.Second)
        done <- true
    }()
    
    select {
    case <-done:
        fmt.Println("操作成功完成")
    case <-time.After(1 * time.Second):
        fmt.Println("操作超时")
    }
}

// 带有重试机制的超时控制
func withRetryTimeout(maxRetries int, timeout time.Duration) error {
    for i := 0; i < maxRetries; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        err := longRunningOperation(ctx)
        cancel()
        
        if err == nil {
            return nil
        }
        
        fmt.Printf("第%d次重试失败: %v\n", i+1, err)
        time.Sleep(time.Second) // 重试间隔
    }
    return fmt.Errorf("达到最大重试次数")
}

func main() {
    fmt.Println("=== 使用context.WithTimeout示例 ===")
    withTimeoutExample()
    
    fmt.Println("\n=== 使用time.After示例 ===")
    withTimeAfterExample()
    
    fmt.Println("\n=== 带重试机制的超时控制示例 ===")
    err := withRetryTimeout(3, time.Second)
    if err != nil {
        fmt.Printf("最终结果: %v\n", err)
    }
}
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 模拟数据处理任务
type Task struct {
    ID int
    ProcessTime time.Duration
}

// 工作协程
func worker(ctx context.Context, id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                fmt.Printf("Worker %d: 通道已关闭,退出\n", id)
                return
            }
            
            // 检查是否已取消
            select {
            case <-ctx.Done():
                fmt.Printf("Worker %d: 收到取消信号,停止处理任务\n", id)
                return
            default:
                // 继续处理任务
                fmt.Printf("Worker %d: 开始处理任务 %d\n", id, task.ID)
                time.Sleep(task.ProcessTime)
                fmt.Printf("Worker %d: 完成任务 %d\n", id, task.ID)
            }
            
        case <-ctx.Done():
            fmt.Printf("Worker %d: 收到取消信号,退出\n", id)
            return
        }
    }
}

// 任务分发器
func dispatcher(ctx context.Context, numWorkers int) {
    var wg sync.WaitGroup
    tasks := make(chan Task, numWorkers)
    
    // 启动工作协程
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(ctx, i, tasks, &wg)
    }
    
    // 模拟任务生成
    taskID := 0
    for {
        select {
        case <-ctx.Done():
            close(tasks)
            wg.Wait()
            fmt.Println("调度器: 所有工作协程已退出")
            return
        default:
            // 生成新任务
            taskID++
            task := Task{
                ID: taskID,
                ProcessTime: time.Duration(taskID*100) * time.Millisecond,
            }
            tasks <- task
            time.Sleep(200 * time.Millisecond) // 控制任务生成速率
        }
    }
}

func main() {
    // 创建可取消的context
    ctx, cancel := context.WithCancel(context.Background())
    
    // 启动调度器
    go dispatcher(ctx, 3)
    
    // 运行一段时间后取消
    time.Sleep(2 * time.Second)
    fmt.Println("主程序: 发送取消信号")
    cancel()
    
    // 等待一段时间确保清理完成
    time.Sleep(time.Second)
    fmt.Println("主程序: 退出")
}
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 模拟数据库连接
type DBConnection struct {
    id     int
    closed bool
    mu     sync.Mutex
}

func (db *DBConnection) Query(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(500 * time.Millisecond):
        return nil
    }
}

func (db *DBConnection) Close() error {
    db.mu.Lock()
    defer db.mu.Unlock()
    
    if !db.closed {
        fmt.Printf("关闭数据库连接 %d\n", db.id)
        db.closed = true
    }
    return nil
}

// 资源管理器
type ResourceManager struct {
    connections []*DBConnection
    mu          sync.Mutex
}

func NewResourceManager() *ResourceManager {
    return &ResourceManager{
        connections: make([]*DBConnection, 0),
    }
}

func (rm *ResourceManager) AcquireConnection() *DBConnection {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    conn := &DBConnection{
        id: len(rm.connections),
    }
    rm.connections = append(rm.connections, conn)
    fmt.Printf("创建新的数据库连接 %d\n", conn.id)
    return conn
}

func (rm *ResourceManager) ReleaseAll() {
    rm.mu.Lock()
    defer rm.mu.Unlock()
    
    for _, conn := range rm.connections {
        conn.Close()
    }
    rm.connections = nil
}

// 工作任务
func doWork(ctx context.Context, rm *ResourceManager) error {
    // 获取数据库连接
    conn := rm.AcquireConnection()
    
    // 确保在函数退出时释放资源
    defer func() {
        if err := conn.Close(); err != nil {
            fmt.Printf("关闭连接失败: %v\n", err)
        }
    }()
    
    // 执行查询
    return conn.Query(ctx)
}

func main() {
    rm := NewResourceManager()
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    var wg sync.WaitGroup
    
    // 启动多个工作协程
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            for j := 0; j < 2; j++ {
                select {
                case <-ctx.Done():
                    fmt.Printf("工作协程 %d: 收到取消信号\n", id)
                    return
                default:
                    if err := doWork(ctx, rm); err != nil {
                        fmt.Printf("工作协程 %d: 执行失败 - %v\n", id, err)
                        return
                    }
                    fmt.Printf("工作协程 %d: 完成一次查询\n", id)
                }
            }
        }(i)
    }
    
    // 等待所有工作完成
    wg.Wait()
    
    // 清理所有资源
    fmt.Println("清理所有资源")
    rm.ReleaseAll()
}
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Pipeline 代表一个处理管道
type Pipeline struct {
    name     string
    process  func(context.Context) error
    children []*Pipeline
    wg       sync.WaitGroup
}

// NewPipeline 创建新的处理管道
func NewPipeline(name string, process func(context.Context) error) *Pipeline {
    return &Pipeline{
        name:     name,
        process:  process,
        children: make([]*Pipeline, 0),
    }
}

// AddChild 添加子管道
func (p *Pipeline) AddChild(child *Pipeline) {
    p.children = append(p.children, child)
}

// Execute 执行当前管道及其所有子管道
func (p *Pipeline) Execute(ctx context.Context) error {
    // 创建一个子context,用于控制子管道
    childCtx, cancel := context.WithCancel(ctx)
    defer cancel() // 确保所有子管道都会被取消

    // 启动所有子管道
    errChan := make(chan error, len(p.children))
    for _, child := range p.children {
        p.wg.Add(1)
        go func(pipeline *Pipeline) {
            defer p.wg.Done()
            if err := pipeline.Execute(childCtx); err != nil {
                errChan <- err
                cancel() // 如果有错误发生,取消所有其他子管道
            }
        }(child)
    }

    // 执行当前管道的处理函数
    processDone := make(chan error, 1)
    go func() {
        processDone <- p.process(childCtx)
    }()

    // 等待处理完成或者context取消
    select {
    case err := <-processDone:
        if err != nil {
            cancel() // 如果处理出错,取消所有子管道
            fmt.Printf("[%s] 处理错误: %v\n", p.name, err)
            return err
        }
    case err := <-errChan:
        fmt.Printf("[%s] 子管道错误: %v\n", p.name, err)
        return err
    case <-ctx.Done():
        fmt.Printf("[%s] 被取消\n", p.name)
        return ctx.Err()
    }

    // 等待所有子管道完成
    waitCh := make(chan struct{})
    go func() {
        p.wg.Wait()
        close(waitCh)
    }()

    select {
    case <-waitCh:
        fmt.Printf("[%s] 及其所有子管道已完成\n", p.name)
        return nil
    case <-ctx.Done():
        fmt.Printf("[%s] 等待子管道完成时被取消\n", p.name)
        return ctx.Err()
    }
}

// 模拟处理函数,带有随机延时
func createProcessor(name string, duration time.Duration) func(context.Context) error {
    return func(ctx context.Context) error {
        fmt.Printf("[%s] 开始处理\n", name)
        select {
        case <-time.After(duration):
            fmt.Printf("[%s] 处理完成\n", name)
            return nil
        case <-ctx.Done():
            fmt.Printf("[%s] 处理被取消\n", name)
            return ctx.Err()
        }
    }
}

func main() {
    // 创建根管道和子管道
    root := NewPipeline("ROOT", createProcessor("ROOT", 1*time.Second))
    
    // 第一层子管道
    stage1A := NewPipeline("1A", createProcessor("1A", 2*time.Second))
    stage1B := NewPipeline("1B", createProcessor("1B", 1500*time.Millisecond))
    root.AddChild(stage1A)
    root.AddChild(stage1B)
    
    // 第二层子管道
    stage2A := NewPipeline("2A", createProcessor("2A", 1*time.Second))
    stage2B := NewPipeline("2B", createProcessor("2B", 800*time.Millisecond))
    stage1A.AddChild(stage2A)
    stage1B.AddChild(stage2B)
    
    // 第三层子管道
    stage3A := NewPipeline("3A", createProcessor("3A", 500*time.Millisecond))
    stage2A.AddChild(stage3A)

    // 创建根context,设置3秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // 启动处理管道
    fmt.Println("开始执行管道...")
    startTime := time.Now()
    
    if err := root.Execute(ctx); err != nil {
        fmt.Printf("管道执行失败: %v\n", err)
    }
    
    fmt.Printf("总执行时间: %v\n", time.Since(startTime))
}

继续补充级联取消示例代码和详细的知识讲解。

让我们详细解析一下上面的代码示例和相关知识点:

2. 超时控制详解

超时控制主要有以下几种实现方式:

  1. 使用context.WithTimeout

    • 优点:可以传播取消信号,适合多层级的调用链
    • 用法:创建带超时的context,传递给下层函数
  2. 使用time.After

    • 优点:实现简单,适合单个操作的超时控制
    • 缺点:不能传播取消信号
  3. 带重试机制的超时控制

    • 结合重试策略,提高操作的成功率
    • 可以设置最大重试次数和重试间隔

3. 取消传播机制

取消传播的核心概念:

  1. Context链

    • 父context取消时,所有子context都会被取消
    • 子context取消不会影响父context
  2. 取消信号的传递

    • 通过Done()通道接收取消信号
    • 使用select监听多个通道
  3. 错误处理

    • context.Canceled:手动调用cancel
    • context.DeadlineExceeded:超时导致的取消

4. 资源清理

资源清理的关键点:

  1. defer的使用

    • 确保资源在函数退出时被释放
    • 处理panic情况下的资源清理
  2. 并发安全

    • 使用互斥锁保护共享资源
    • 避免重复释放
  3. 清理顺序

    • 按照依赖关系进行清理
    • 避免死锁和资源泄露

5. 级联取消

级联取消的实现要点:

  1. 树形结构

    • 构建父子关系的处理链
    • 自上而下传播取消信号
  2. 并发控制

    • 使用WaitGroup等待子任务完成
    • 处理取消时的优雅退出

让我们用Mermaid图来展示这些概念:
在这里插入图片描述

6. 最佳实践建议

  1. 超时控制

    • 为所有网络操作设置合理的超时时间
    • 在必要时实现重试机制
    • 避免使用过长的超时时间
  2. 取消传播

    • 合理使用context传递取消信号
    • 及时检查取消状态
    • 正确处理取消后的清理工作
  3. 资源管理

    • 使用defer确保资源释放
    • 实现优雅的关闭流程
    • 处理好并发安全问题
  4. 错误处理

    • 区分不同类型的错误
    • 提供有意义的错误信息
    • 实现合适的重试策略

7. 性能优化技巧

  1. 避免资源泄露

    • 使用工具检测泄露
    • 实现完整的清理流程
    • 定期进行资源回收
  2. 控制并发数量

    • 使用协程池
    • 实现背压机制
    • 监控系统资源使用
  3. 超时优化

    • 设置合适的超时阈值
    • 实现自适应超时
    • 监控超时情况

8. 调试和监控

  1. 日志记录

    • 记录关键操作的耗时
    • 记录取消和超时事件
    • 记录资源使用情况
  2. 监控指标

    • 超时次数和比率
    • 取消操作的统计
    • 资源使用情况
  3. 调试工具

    • 使用race detector
    • 使用pprof分析
    • 使用trace工具

9. 总结

通过本节课的学习,我们掌握了:

  1. 如何实现可靠的超时控制机制
  2. 如何正确使用context进行取消传播
  3. 如何安全地进行资源清理
  4. 如何实现级联取消功能

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.kler.cn/a/413840.html

相关文章:

  • 深度学习模型:卷积神经网络(CNN)
  • Linux服务器安装mongodb
  • Postman定义公共函数
  • javaweb-day02-JS(javascript)
  • mac下Gpt Chrome升级成GptBrowser书签和保存的密码恢复
  • 10、PyTorch autograd使用教程
  • 【多线程-第一天-多线程的技术方案-pthread带参数-桥接-bridge Objective-C语言】
  • OODA循环在网络安全运营平台建设中的应用
  • 【ESP32CAM+Android+C#上位机】ESP32-CAM在STA或AP模式下基于UDP与手机APP或C#上位机进行视频流/图像传输
  • QT5+OpenCV+libdmtx识别datamatrx ECC200二维码
  • 论文概览 |《Cities》2024.11 Vol.154(上)
  • 【tiler】一个数据可视化和地图处理切片的 Python 库
  • Rook入门:打造云原生Ceph存储的全面学习路径(上)
  • DAMODEL丹摩|部署FLUX.1+ComfyUI实战教程
  • MyBatis基本操作
  • 前端页面或弹窗在线预览文件的N种方式
  • python爬虫案例——猫眼电影数据抓取之字体解密,多套字体文件解密方法(20)
  • YOLOv10改进,YOLOv10添加TransNeXt中的ConvolutionalGLU模块,CVPR2024,二次创新C2f结构
  • TypeScript 字面量类型与类型别名
  • Sqoop的安装和配置,Sqoop的数据导入导出,MySQL对hdfs数据的操作
  • AWS EC2设置用户名密码登录
  • 通过 SSH 进行WordPress网站的高级服务器管理
  • Android 16 开发者预览版抢先使用
  • 字节跳动青训营刷题笔记19
  • TDengine在debian安装
  • 【C++】C++新增特性解析:Lambda表达式、包装器与绑定的应用