Go语言高并发实战案例分析
目录
- 基础案例:简单的并发下载器
- 进阶案例:高并发网站访问统计
- 实战案例:分布式任务调度系统
基础案例:简单的并发下载器
问题描述
需要同时下载多个文件,使用并发方式提高下载效率。
实现代码
package main
import (
"fmt"
"io"
"net/http"
"os"
"sync"
)
func downloadFile(url string, filename string, wg *sync.WaitGroup) {
defer wg.Done()
// 创建HTTP请求
resp, err := http.Get(url)
if err != nil {
fmt.Printf("下载 %s 失败: %v\n", filename, err)
return
}
defer resp.Body.Close()
// 创建文件
file, err := os.Create(filename)
if err != nil {
fmt.Printf("创建文件 %s 失败: %v\n", filename, err)
return
}
defer file.Close()
// 写入文件
_, err = io.Copy(file, resp.Body)
if err != nil {
fmt.Printf("写入文件 %s 失败: %v\n", filename, err)
return
}
fmt.Printf("文件 %s 下载完成\n", filename)
}
func main() {
urls := []string{
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
}
var wg sync.WaitGroup
for i, url := range urls {
wg.Add(1)
filename := fmt.Sprintf("file%d.zip", i+1)
go downloadFile(url, filename, &wg)
}
wg.Wait()
fmt.Println("所有文件下载完成")
}
关键点解析
- 使用
sync.WaitGroup
管理并发下载任务 - 每个下载任务在独立的goroutine中执行
- 使用defer确保资源正确释放
- 基本的错误处理机制
进阶案例:高并发网站访问统计
问题描述
需要统计网站的实时访问量,包括总访问次数、独立IP数等指标。
实现代码
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
type VisitStats struct {
mutex sync.RWMutex
totalVisits int64
uniqueIPs map[string]bool
lastMinute map[int64]int64 // 按秒记录最近一分钟的访问量
}
func NewVisitStats() *VisitStats {
return &VisitStats{
uniqueIPs: make(map[string]bool),
lastMinute: make(map[int64]int64),
}
}
func (vs *VisitStats) recordVisit(ip string) {
vs.mutex.Lock()
defer vs.mutex.Unlock()
// 更新总访问量
vs.totalVisits++
// 记录唯一IP
vs.uniqueIPs[ip] = true
// 记录当前秒的访问量
now := time.Now().Unix()
vs.lastMinute[now]++
// 清理一分钟前的数据
vs.cleanOldData(now)
}
func (vs *VisitStats) cleanOldData(now int64) {
for timestamp := range vs.lastMinute {
if now-timestamp > 60 {
delete(vs.lastMinute, timestamp)
}
}
}
func (vs *VisitStats) getStats() (int64, int, int64) {
vs.mutex.RLock()
defer vs.mutex.RUnlock()
// 计算最近一分钟的访问量
var lastMinuteVisits int64
now := time.Now().Unix()
for timestamp, count := range vs.lastMinute {
if now-timestamp <= 60 {
lastMinuteVisits += count
}
}
return vs.totalVisits, len(vs.uniqueIPs), lastMinuteVisits
}
func main() {
stats := NewVisitStats()
// 处理访问请求
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
ip := r.RemoteAddr
stats.recordVisit(ip)
fmt.Fprintf(w, "Welcome!")
})
// 定期打印统计信息
go func() {
for {
total, unique, lastMin := stats.getStats()
fmt.Printf("总访问量: %d, 唯一IP数: %d, 最近一分钟访问量: %d\n",
total, unique, lastMin)
time.Sleep(5 * time.Second)
}
}()
http.ListenAndServe(":8080", nil)
}
关键点解析
- 使用读写锁
sync.RWMutex
提高并发性能 - 通过map记录唯一IP和时间戳数据
- 实现了滑动窗口统计最近一分钟的访问量
- 定期清理过期数据
实战案例:分布式任务调度系统
问题描述
实现一个支持高并发的分布式任务调度系统,具备任务分发、执行和监控功能。
实现代码
package main
import (
"context"
"fmt"
"sync"
"time"
)
// 任务定义
type Task struct {
ID string
Payload interface{}
Priority int
}
// 工作节点
type Worker struct {
ID string
Status string
Tasks chan Task
}
// 调度器
type Scheduler struct {
workers map[string]*Worker
taskQueue chan Task
workerPool chan *Worker
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
func NewScheduler(workerCount int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
s := &Scheduler{
workers: make(map[string]*Worker),
taskQueue: make(chan Task, 1000),
workerPool: make(chan *Worker, workerCount),
ctx: ctx,
cancel: cancel,
}
// 初始化工作节点
for i := 0; i < workerCount; i++ {
worker := &Worker{
ID: fmt.Sprintf("worker-%d", i),
Status: "idle",
Tasks: make(chan Task, 10),
}
s.workers[worker.ID] = worker
s.workerPool <- worker
}
return s
}
func (s *Scheduler) Start() {
// 任务分发
go func() {
for {
select {
case <-s.ctx.Done():
return
case task := <-s.taskQueue:
worker := <-s.workerPool
s.assignTask(worker, task)
}
}
}()
// 监控工作节点状态
go s.monitorWorkers()
}
func (s *Scheduler) assignTask(worker *Worker, task Task) {
s.mutex.Lock()
worker.Status = "busy"
s.mutex.Unlock()
go func() {
worker.Tasks <- task
// 模拟任务执行
time.Sleep(time.Second * time.Duration(task.Priority))
s.mutex.Lock()
worker.Status = "idle"
s.mutex.Unlock()
s.workerPool <- worker
}()
}
func (s *Scheduler) monitorWorkers() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.mutex.RLock()
for id, worker := range s.workers {
fmt.Printf("Worker %s status: %s\n", id, worker.Status)
}
s.mutex.RUnlock()
}
}
}
func main() {
scheduler := NewScheduler(5)
scheduler.Start()
// 模拟提交任务
go func() {
for i := 0; i < 20; i++ {
task := Task{
ID: fmt.Sprintf("task-%d", i),
Payload: fmt.Sprintf("payload-%d", i),
Priority: i % 3 + 1,
}
scheduler.taskQueue <- task
time.Sleep(time.Millisecond * 500)
}
}()
// 运行一段时间后退出
time.Sleep(time.Second * 30)
scheduler.cancel()
}
关键点解析
- 使用context管理goroutine生命周期
- 实现了工作池模式提高资源利用率
- 使用channel实现任务队列和工作节点池
- 采用读写锁保护共享资源
- 实现了基本的监控功能
- 支持任务优先级
总结
通过这三个案例,我们循序渐进地展示了Go语言在并发编程中的应用:
- 基础案例展示了goroutine和WaitGroup的基本用法
- 进阶案例引入了更复杂的并发控制和数据结构
- 实战案例整合了多个并发特性,实现了一个完整的系统
在实际开发中,需要注意:
- 正确使用锁机制避免竞态条件
- 合理设计channel缓冲区大小
- 注意goroutine的生命周期管理
- 实现适当的错误处理和资源清理
- 考虑系统的可扩展性和维护性