当前位置: 首页 > article >正文

Go语言高并发实战案例分析

目录

  1. 基础案例:简单的并发下载器
  2. 进阶案例:高并发网站访问统计
  3. 实战案例:分布式任务调度系统

基础案例:简单的并发下载器

问题描述

需要同时下载多个文件,使用并发方式提高下载效率。

实现代码

package main

import (
    "fmt"
    "io"
    "net/http"
    "os"
    "sync"
)

func downloadFile(url string, filename string, wg *sync.WaitGroup) {
    defer wg.Done()
    
    // 创建HTTP请求
    resp, err := http.Get(url)
    if err != nil {
        fmt.Printf("下载 %s 失败: %v\n", filename, err)
        return
    }
    defer resp.Body.Close()
    
    // 创建文件
    file, err := os.Create(filename)
    if err != nil {
        fmt.Printf("创建文件 %s 失败: %v\n", filename, err)
        return
    }
    defer file.Close()
    
    // 写入文件
    _, err = io.Copy(file, resp.Body)
    if err != nil {
        fmt.Printf("写入文件 %s 失败: %v\n", filename, err)
        return
    }
    
    fmt.Printf("文件 %s 下载完成\n", filename)
}

func main() {
    urls := []string{
        "https://example.com/file1.zip",
        "https://example.com/file2.zip",
        "https://example.com/file3.zip",
    }
    
    var wg sync.WaitGroup
    
    for i, url := range urls {
        wg.Add(1)
        filename := fmt.Sprintf("file%d.zip", i+1)
        go downloadFile(url, filename, &wg)
    }
    
    wg.Wait()
    fmt.Println("所有文件下载完成")
}

关键点解析

  1. 使用sync.WaitGroup管理并发下载任务
  2. 每个下载任务在独立的goroutine中执行
  3. 使用defer确保资源正确释放
  4. 基本的错误处理机制

进阶案例:高并发网站访问统计

问题描述

需要统计网站的实时访问量,包括总访问次数、独立IP数等指标。

实现代码

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

type VisitStats struct {
    mutex       sync.RWMutex
    totalVisits int64
    uniqueIPs   map[string]bool
    lastMinute  map[int64]int64 // 按秒记录最近一分钟的访问量
}

func NewVisitStats() *VisitStats {
    return &VisitStats{
        uniqueIPs:  make(map[string]bool),
        lastMinute: make(map[int64]int64),
    }
}

func (vs *VisitStats) recordVisit(ip string) {
    vs.mutex.Lock()
    defer vs.mutex.Unlock()
    
    // 更新总访问量
    vs.totalVisits++
    
    // 记录唯一IP
    vs.uniqueIPs[ip] = true
    
    // 记录当前秒的访问量
    now := time.Now().Unix()
    vs.lastMinute[now]++
    
    // 清理一分钟前的数据
    vs.cleanOldData(now)
}

func (vs *VisitStats) cleanOldData(now int64) {
    for timestamp := range vs.lastMinute {
        if now-timestamp > 60 {
            delete(vs.lastMinute, timestamp)
        }
    }
}

func (vs *VisitStats) getStats() (int64, int, int64) {
    vs.mutex.RLock()
    defer vs.mutex.RUnlock()
    
    // 计算最近一分钟的访问量
    var lastMinuteVisits int64
    now := time.Now().Unix()
    for timestamp, count := range vs.lastMinute {
        if now-timestamp <= 60 {
            lastMinuteVisits += count
        }
    }
    
    return vs.totalVisits, len(vs.uniqueIPs), lastMinuteVisits
}

func main() {
    stats := NewVisitStats()
    
    // 处理访问请求
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        ip := r.RemoteAddr
        stats.recordVisit(ip)
        fmt.Fprintf(w, "Welcome!")
    })
    
    // 定期打印统计信息
    go func() {
        for {
            total, unique, lastMin := stats.getStats()
            fmt.Printf("总访问量: %d, 唯一IP数: %d, 最近一分钟访问量: %d\n",
                total, unique, lastMin)
            time.Sleep(5 * time.Second)
        }
    }()
    
    http.ListenAndServe(":8080", nil)
}

关键点解析

  1. 使用读写锁sync.RWMutex提高并发性能
  2. 通过map记录唯一IP和时间戳数据
  3. 实现了滑动窗口统计最近一分钟的访问量
  4. 定期清理过期数据

实战案例:分布式任务调度系统

问题描述

实现一个支持高并发的分布式任务调度系统,具备任务分发、执行和监控功能。

实现代码

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 任务定义
type Task struct {
    ID       string
    Payload  interface{}
    Priority int
}

// 工作节点
type Worker struct {
    ID     string
    Status string
    Tasks  chan Task
}

// 调度器
type Scheduler struct {
    workers    map[string]*Worker
    taskQueue  chan Task
    workerPool chan *Worker
    mutex      sync.RWMutex
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewScheduler(workerCount int) *Scheduler {
    ctx, cancel := context.WithCancel(context.Background())
    s := &Scheduler{
        workers:    make(map[string]*Worker),
        taskQueue:  make(chan Task, 1000),
        workerPool: make(chan *Worker, workerCount),
        ctx:        ctx,
        cancel:     cancel,
    }
    
    // 初始化工作节点
    for i := 0; i < workerCount; i++ {
        worker := &Worker{
            ID:     fmt.Sprintf("worker-%d", i),
            Status: "idle",
            Tasks:  make(chan Task, 10),
        }
        s.workers[worker.ID] = worker
        s.workerPool <- worker
    }
    
    return s
}

func (s *Scheduler) Start() {
    // 任务分发
    go func() {
        for {
            select {
            case <-s.ctx.Done():
                return
            case task := <-s.taskQueue:
                worker := <-s.workerPool
                s.assignTask(worker, task)
            }
        }
    }()
    
    // 监控工作节点状态
    go s.monitorWorkers()
}

func (s *Scheduler) assignTask(worker *Worker, task Task) {
    s.mutex.Lock()
    worker.Status = "busy"
    s.mutex.Unlock()
    
    go func() {
        worker.Tasks <- task
        // 模拟任务执行
        time.Sleep(time.Second * time.Duration(task.Priority))
        
        s.mutex.Lock()
        worker.Status = "idle"
        s.mutex.Unlock()
        
        s.workerPool <- worker
    }()
}

func (s *Scheduler) monitorWorkers() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-s.ctx.Done():
            return
        case <-ticker.C:
            s.mutex.RLock()
            for id, worker := range s.workers {
                fmt.Printf("Worker %s status: %s\n", id, worker.Status)
            }
            s.mutex.RUnlock()
        }
    }
}

func main() {
    scheduler := NewScheduler(5)
    scheduler.Start()
    
    // 模拟提交任务
    go func() {
        for i := 0; i < 20; i++ {
            task := Task{
                ID:       fmt.Sprintf("task-%d", i),
                Payload:  fmt.Sprintf("payload-%d", i),
                Priority: i % 3 + 1,
            }
            scheduler.taskQueue <- task
            time.Sleep(time.Millisecond * 500)
        }
    }()
    
    // 运行一段时间后退出
    time.Sleep(time.Second * 30)
    scheduler.cancel()
}

关键点解析

  1. 使用context管理goroutine生命周期
  2. 实现了工作池模式提高资源利用率
  3. 使用channel实现任务队列和工作节点池
  4. 采用读写锁保护共享资源
  5. 实现了基本的监控功能
  6. 支持任务优先级

总结

通过这三个案例,我们循序渐进地展示了Go语言在并发编程中的应用:

  1. 基础案例展示了goroutine和WaitGroup的基本用法
  2. 进阶案例引入了更复杂的并发控制和数据结构
  3. 实战案例整合了多个并发特性,实现了一个完整的系统

在实际开发中,需要注意:

  • 正确使用锁机制避免竞态条件
  • 合理设计channel缓冲区大小
  • 注意goroutine的生命周期管理
  • 实现适当的错误处理和资源清理
  • 考虑系统的可扩展性和维护性

http://www.kler.cn/a/457770.html

相关文章:

  • 每天40分玩转Django:Django Celery
  • 硬件-射频-PCB-常见天线分类-ESP32实例
  • SweetAlert2 - 漂亮可定制的 JavaScript 弹窗
  • 多文件比对
  • pip下载包出现SSLError
  • 数据库入门级SQL优化
  • 【LeetCode】547、省份数量
  • springboot496基于java手机销售网站设计和实现(论文+源码)_kaic
  • 基于 Python 大数据的电脑硬件推荐系统研究
  • ChatGPT是如何生成长文的
  • 微服务篇-深入了解 XA 模式与 AT 模式、分布式事务(部署 TC 服务、微服务集成 Seata )
  • ACDC中AC前级EMS【EMC】
  • 自学记录HarmonyOS Next DRM API 13:构建安全的数字内容保护系统
  • RealityCapture导入视频序列失败
  • 【视觉SLAM:二、三维空间刚体运动】
  • Java 堆排序原理 图文详解 代码逻辑
  • 『VUE』vue-quill-editor设置内容不可编辑(详细图文注释)
  • 一份关于 Ubuntu 系统下代理配置的故障排查笔记
  • C# OpenCvSharp DNN 卡证检测矫正
  • brupsuite的基础用法常用模块(1)
  • .net core 的数据类型
  • 【探花交友】用户登录总结
  • 输入输出(I/O):熟悉 Java 的 I/O 类库,尤其是 NIO 和文件操作
  • LVGL——基础对象篇
  • SpringCloudAlibaba实战入门之路由网关Gateway初体验(十一)
  • YOLOv8模型改进 第二十五讲 添加基于卷积调制(Convolution based Attention) 替换自注意力机制