当前位置: 首页 > article >正文

Go语言扩展包x/sync使用指南

singleflight

singleflight 是一个用于防止重复函数调用的机制,确保对于同一个键(key),在同一时间内只有一个函数执行,其他请求会等待该函数执行完成并共享结果。这可以大量减少对比如访问数据库操作次数,减轻数据库压力提高性能。

方法

  • Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
    执行一个函数 fn,如果当前有其他 goroutine 正在为相同的 key 执行该函数,则当前 goroutine 会等待其完成,并共享其结果。

    • 参数:
      • key:用于标识请求的唯一键,相同的 key只会调用一次,并共享结果。
      • fn: 需要要执行的函数,返回一个结果和一个错误
    • 返回值
      • v:函数 fn 的返回结果。
      • err:函数 fn 执行过程中产生的错误。
      • shared:表示结果是否是共享的,如果为 true 则表示结果是从其他正在执行相同 key 请求的 goroutine 处共享得到的。
  • DoChan(key string, fn func() (interface{}, error)) <-chan Result
    与Do方法类似,但是返回channel

  • Forget(key string)
    从请求组中移除指定 key 的记录,之后再次对该 key 发起请求时,会重新执行函数 fn

示例

package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "time"
)

var g singleflight.Group

// 模拟一个耗时的请求操作
func getData(key string) (interface{}, error) {
    fmt.Printf("Fetching data for key %s\n", key)
    time.Sleep(2 * time.Second) // 模拟耗时操作
    return "Data for " + key, nil
}

func main() {
    // 模拟多个并发请求相同的 key
    for i := 0; i < 5; i++ {
        go func() {
            data, err, shared := g.Do("key1", func() (interface{}, error) {
                return getData("key1")
            })
            if err != nil {
                fmt.Printf("Error: %v\n", err)
            } else {
                if shared {
                    fmt.Printf("Result for key1 was shared. Data: %s\n", data.(string))
                } else {
                    fmt.Printf("Result for key1 was fetched. Data: %s\n", data.(string))
                }
            }
        }()
    }

    // 等待所有 goroutine 完成
    time.Sleep(3 * time.Second)
}

errgroup

errgroup 用于并发执行多个任务,并在其中一个任务出错时快速返回错误,同时等待所有任务完成。

结构

type Group struct {
  // 取消函数,借助 context.WithCancel 或者 context.WithTimeout 等函数创建上下文时生成。当 Group 里的某个任务返回错误时,cancel 函数会被调用
	cancel func(error)
  // go 同步原语
	wg sync.WaitGroup
  // sem 是一个通道,用于控制并发任务的数量。
	sem chan token
  // 确保第一个出错的任务会被记录到err字段中
	errOnce sync.Once
  // 保存第一个出错信息
	err     error
}

方法

  • WithContext(ctx context.Context) (*Group, context.Context):返回一个 *Group 指针和一个新的上下文。新的上下文会在 Group 中的某个任务返回错误时被取消。
  • (g *Group) Wait() error :会阻塞直到所有goroutine方法完成,如果有其中一个goroutine返回error,Wait会返回该error。如果所有 goroutine 都成功完成,Wait 方法返回 nil
  • (g *Group) Go(f func() error) :用于启动一个新的 goroutine 来执行传入的函数。如果该函数返回非 nil 错误,Group 的 Wait 方法会立即返回该错误。
  • (g *Group) TryGo(f func() error) bool 与 Go 方法不同的是,TryGo 不会阻塞,如果当前没有可用的并发资源(受到 SetLimit 设置的限制),它会立即返回 false,而不是等待资源可用。下面为你详细介绍其用法:
    基本原型
  • (g *Group) SetLimit(n int) SetLimit 方法用于设置并发执行任务的最大数量,避免因同时运行过多的 goroutine 而耗尽系统资源

代码示例

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "net/http"
    "time"
)

// fetchURL 函数用于发起 HTTP 请求,并根据上下文进行取消控制
// 参数 ctx 是上下文,用于控制请求的生命周期
// 参数 url 是要请求的目标 URL
// 返回值是可能出现的错误
func fetchURL(ctx context.Context, url string) error {
    // 创建一个带有上下文的 HTTP 请求
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return err
    }

    // 创建一个 HTTP 客户端,并设置超时时间为 3 秒
    client := &http.Client{Timeout: 3 * time.Second}

    // 发起 HTTP 请求
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    // 确保响应体在使用完后关闭,避免资源泄漏
    defer resp.Body.Close()

    // 打印请求的 URL 以及响应的状态码
    fmt.Printf("Fetched %s, status: %s\n", url, resp.Status)
    return nil
}

func main() {
    // 创建一个带有 5 秒超时的上下文
    // context.Background() 返回一个空的上下文,作为基础上下文
    // context.WithTimeout 基于基础上下文创建一个带有 5 秒超时的新上下文
    // cancel 是一个取消函数,用于手动取消上下文
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    // 确保在 main 函数结束时调用 cancel 函数,避免资源泄漏
    defer cancel()

    // 使用 WithContext 创建一个 errgroup.Group 实例和一个新的上下文
    // 新的上下文会在 errgroup 中的某个任务返回错误时被取消
    g, ctx := errgroup.WithContext(ctx)

    // 定义要请求的 URL 列表
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.nonexistentwebsite.com", // 这个 URL 可能会请求失败
    }

    // 遍历 URL 列表,为每个 URL 启动一个 goroutine 进行请求
    for _, url := range urls {
        // 为了避免闭包问题,创建一个局部变量
        url := url
        // 使用 Go 方法启动一个新的 goroutine 来执行 fetchURL 函数
        // 每个 goroutine 都会并发执行 fetchURL 函数
        g.Go(func() error {
            return fetchURL(ctx, url)
        })
    }

    // 等待所有 goroutine 完成
    // Wait 方法会阻塞,直到所有通过 Go 方法启动的 goroutine 完成
    // 如果其中有任何一个 goroutine 返回错误,Wait 方法会立即返回该错误
    if err := g.Wait(); err != nil {
        fmt.Printf("Encountered an error: %v\n", err)
    } else {
        fmt.Println("All requests completed successfully.")
    }
}

semaphore

信号量(Semaphore),用于控制对有限资源的并发访问。信号量维护了一组许可,线程(goroutine)在访问资源前需要先获取许可,使用完资源后释放许可。

结构

type waiter struct {
	n     int64           //请求的资源数量
	ready chan<- struct{} // 当资源可用时关闭该通道,通知等待着
}

type Weighted struct {
	size    int64         //最大资源数量
	cur     int64         //当前已使用的资源数量
	mu      sync.Mutex    //互斥锁,保护共享资源
	waiters list.List     //等待者队列,存储等待获取资源的 waiter
}

方法

  • NewWeighted(n int64) *Weighted 创建一个运行并发数为n的信号量
  • (s *Weighted) Acquire(ctx context.Context, n int64) 申请n个信号量,如果没有足够的许可,程序会阻塞直到获取到足够的许可或者ctx 被取消或者超时。如果成功返回nil,失败返回err
  • (s *Weighted) TryAcquire(n int64) bool 尝试立即获取 n 个权重的许可,如果有足够的许可则获取并返回 true,否则返回 false。
  • (s *Weighted) Release(n int64) 释放n个许可

代码示例

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/semaphore"
    "time"
)

func main() {
    // 创建一个信号量,限制同时可以访问共享资源的 goroutine 数量为 2
    sem := semaphore.NewWeighted(2)

    // 创建一个上下文,用于控制并发任务的生命周期
    ctx := context.Background()

    // 启动 5 个 goroutine 来并发地访问共享资源
    for i := 0; i < 5; i++ {
        i := i // 捕获循环变量
        go func() {
            // 获取信号量
            if err := sem.Acquire(ctx, 1); err != nil {
                fmt.Printf("Goroutine %d failed to acquire semaphore: %v\n", i, err)
                return
            }
            defer sem.Release(1) // 确保在函数退出时释放信号量

            // 模拟访问共享资源
            fmt.Printf("Goroutine %d is accessing the shared resource.\n", i)
            time.Sleep(time.Second)
            fmt.Printf("Goroutine %d has finished accessing the shared resource.\n", i)
        }()
    }

    // 等待所有 goroutine 完成
    time.Sleep(6 * time.Second)
}


http://www.kler.cn/a/546532.html

相关文章:

  • 详解Cookie和Session
  • Windchill 成套的解决方案
  • DeepSeek接入网络安全领域,AI高效驱动,重新定义网络防御边界!
  • Vim 退出编辑模式
  • 2月14日情人节,致挚爱
  • Linux进阶——selinux
  • 【Linux网络编程】华为云开放端口号
  • Django ORM:外键字段的命名与查询机制解析
  • 推荐的、好用的线性稳压器
  • SQL联合查询
  • CRMEB 多商户版v3.0.1源码全开源+PC端+Uniapp前端+搭建教程
  • VoIP之音视频会议中的混音技术
  • 2025-2-14算法打卡
  • Java--IO流详解(中)--字节流
  • C++ Primer 函数基础
  • 网络编程(tcp线程池)
  • Baumer工业相机堡盟工业相机如何通过BGAPI SDK实现一次触发控制三个光源开关分别采集三张图像(C#)
  • 信息安全管理(3):网络安全
  • TCP可靠传输的ARQ协议
  • C++ 中的继承与派生