当前位置: 首页 > article >正文

Go singleflight库源码分析

设计思想

singleflight是Go语言中的一个开源库;主要作用的是避免对同一个任务的重复请求。

它的设计思想主要是通过一个map和一个互斥锁来实现对任务请求的管理。

// Group 一个工作单元,可以管理多个singleFlight任务
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

map中的每一个key对应一个call,call中保存了请求的结果和一个channel,用于阻塞和唤醒请求;

当一个新的请求来时,首先会通过map查找是否有相同的请求正在进行,如果有则等待,如果没有则新建一个请求并保存到map中。

// call 任务单次执行时,会提前创建call结构,用于任务执行、阻塞相同key的请求
type call struct {
	wg sync.WaitGroup
	// 任务返回结果,在任务执行完成后写入
	val interface{}
	err error

	// 重复的任务数
	dups  int
    // 异步任务执行结束后,会遍历chans通知结果
	chans []chan<- Result
}

调用流程

当调用singleflight的Do方法时,首先会检查是否有相同key的请求正在进行,如果有则等待,如果没有则新建一个请求。

请求点执行方式有两种,根据调用方式的不同分为 同步、异步请求。

Do:结果保存在call.val中,调用后会阻塞,直到单次任务执行完成;

DoChan:调用后返回一个ch,任务执行完成后将结果通知到ch中。

解决了哪些问题

singleFlight主要解决了对同一个任务的重复执行问题;如

  • 缓存击穿,redis缓存过期,大量请求同时到达服务器,导致数据库访问量增高;
  • 配置加载,并发情况下拉取资源,本地缓存还没有生效,多个请求都会从远程服务获取资源;

通过singleflight,可以保证对同一个任务的请求在同一时间只有一个,避免了一些耗时较长任务的重复执行。

什么场景下不适合使用

虽然singleflight可以有效避免对同一个任务的重复请求,但是在一些场景下可并不适合,如

  • 任务查询的效率很高,而且调用频率不高,那使用singleFlight会带来额外的加锁、解锁开销;
  • 请求要求实时返回,使用singleflight会导致一些请求阻塞,然后集中返回,导致部分请求响应时间长,而且返回的瞬间会有瞬时的高峰。

使用示例

var group = singleflight.Group{}

func GetSceneConfig(name string) *SceneConfig {
    uniqKey := fmt.Sprintf("uniq_key:%s", name)
    group.Do(uniqKey, func() (interface{}, error) {
        return sceneMapper.GetConfig(name)
    })
}

源码展示

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"

import (
	"bytes"
	"errors"
	"fmt"
	"runtime"
	"runtime/debug"
	"sync"
)

// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")

// panicError 在执行给定任务过程中发生panic异常时,抛出的err类型
type panicError struct {
	value interface{}
	stack []byte
}

// Error implements error interface.
func (p *panicError) Error() string {
	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}

func (p *panicError) Unwrap() error {
	err, ok := p.value.(error)
	if !ok {
		return nil
	}

	return err
}

func newPanicError(v interface{}) error {
	stack := debug.Stack()

	// The first line of the stack trace is of the form "goroutine N [status]:"
	// but by the time the panic reaches Do the goroutine may no longer exist
	// and its status will have changed. Trim out the misleading line.
	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
		stack = stack[line+1:]
	}
	return &panicError{value: v, stack: stack}
}

// call 任务单次执行时,会提前创建call结构,用户任务执行与阻塞
type call struct {
	wg sync.WaitGroup

	// 任务返回结果,在任务执行完成后写入
	val interface{}
	err error

	// 重复的任务数
	dups  int
	chans []chan<- Result
}

// Group 一个工作单元,可以管理多个singleFlight任务
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

// Do singleFlight任务执行入口,在任务没有执行过程中多次重复key的调用会被阻塞等待第一次任务执行完成才返回。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

// DoChan 异步执行单次任务,执行完成后,把结果通过ch的方式返回。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)

	return ch
}

// doCall 任务在这里真正执行,结果会写入到c.val中,如果[]chan有值,也会依次发送。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		g.mu.Lock()
		defer g.mu.Unlock()
		c.wg.Done()
		if g.m[key] == c {
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				go panic(e)
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
			// Normal return
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn {
				// Ideally, we would wait to take a stack trace until we've determined
				// whether this is a panic or a runtime.Goexit.
				//
				// Unfortunately, the only way we can distinguish the two is to see
				// whether the recover stopped the goroutine from terminating, and by
				// the time we know that, the part of the stack trace relevant to the
				// panic has been discarded.
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

// 忽略key对应的之前到达的key,调用Forget之后,下次传入key,对应的任务会重新执行。
func (g *Group) Forget(key string) {
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
}

http://www.kler.cn/a/459477.html

相关文章:

  • AWS re:Invent 2024 - Dr. Werner Vogels 主题演讲
  • HTML5实现好看的二十四节气网页源码
  • Java - 日志体系_Apache Commons Logging(JCL)日志接口库_桥接Logback 及 源码分析
  • 2024国城杯 Web
  • SpringBoot使用TraceId日志链路追踪
  • typescript中的interface理解
  • 2.阿里云flinkselectdb-jar作业
  • 【React】- 跨域PDF预览、下载(改文件名)、打印
  • Flink如何处理迟到数据?
  • Python毕业设计选题:基于Hadoop 的国产电影数据分析与可视化_django+spider
  • C++ 函数式编程Lambda表达式
  • 磁编码器(Magnetic Encoder)
  • 【每日学点鸿蒙知识】Web嵌套滚动体验、拷贝传递 ArrayBuffer异常问题、ObjectLink 的属性传递、构建读取参数
  • 【高阶数据结构】红黑树封装map、set
  • leetcode hot100 tire前缀树
  • go语言中zero框架项目日志收集与配置
  • 【2024年-7月-6日-开源社区openEuler实践记录】探秘 Qingzhou:开启高效开发与运维新旅程
  • 012-spring的注解开发、bean的属性、IOC实现原理
  • 【服务器】上传文件到服务器并训练深度学习模型下载服务器文件到本地
  • 基于GA遗传优化TCN-LSTM时间卷积神经网络时间序列预测算法matlab仿真
  • EL表达式与JSTL
  • Quo Vadis, Anomaly Detection? LLMs and VLMs in the Spotlight 论文阅读
  • Java基础(三):桌球案例
  • Qt https请求报错SSL handshake failed 解决思路方法
  • AI大模型-提示工程学习笔记0
  • 进程通信(8)读写锁