【Go | 从0实现简单分布式缓存】-4:使用singleflight防止缓存击穿
本文目录
- 一、缓存问题
- 二、singleflight
- 三、geecache.go
一、缓存问题
先回顾一下缓存问题:
缓存雪崩
:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。
缓存击穿
:一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时DB请求量大、压力骤增。
缓存穿透
:查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB,如果瞬间流量过大,穿透到 DB,导致宕机。
二、singleflight
本质上来说,就是其核心目的是实现一个去重机制,用于避免对相同请求的重复处理。这种机制在并发编程中非常有用,尤其是在需要减少重复计算或避免对后端服务(如数据库或远程API)的重复调用时。
package singleflight
import "sync"
type call struct { //
wg sync.WaitGroup
val interface{}
err error
}
type Group struct {
mu sync.Mutex // protects m
m map[string]*call
}
call
代表正在进行中,或已经结束的请求。使用 sync.WaitGroup 锁避免重入。
wg sync.WaitGroup
:一个等待组,用于同步多个并发请求。当请求处理完成时,它会通知所有等待的协程。
val interface{}
:用于存储请求的返回值。interface{} 表示它可以是任意类型。
Group
是 singleflight 的主数据结构,管理不同 key 的请求(call),也就是用于管理多个并发请求。
mu sync.Mutex
:一个互斥锁,用于保护 m 的并发访问。
m map[string]*call
:一个映射,键是请求的标识符(通常是字符串,也就是key),值是指向 call 结构体的指针。这个映射用于存储正在进行的请求,以便去重。
接下来是主函数Do
,第一个参数是对应的key,第二个参数是回调函数方法,也就是无论Do被调用多少次,函数fn
只会被调用一次,然后等待fn调用结束了,就会返回对应的返回值。
也就是确保对于相同的键(key),即使有多个协程同时请求,也只执行一次计算逻辑(由 fn 提供),并共享结果。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
g.mu.Lock()
:锁定互斥锁,确保对 g.m 的访问是线程安全的,也就是延迟初始化,提高内存使用效率。(这是一种设计模式,也是惰性初始化,可以显著减少程序的启动时间,并且如果某些对象或资源在整个程序运行期间从未被使用,那么延迟初始化可以避免不必要的资源分配。)
g.m
是当其调用的时候,再去做初始化,提升了效率。
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
if c, ok := g.m[key]; ok
:检查是否已经有一个相同的请求在处理中,如果 ok 为 true,说明已经有一个请求在处理相同的 key。
g.mu.Unlock()
:释放锁,因为后续操作不需要再修改 g.m。
c.wg.Wait()
:等待正在进行的请求完成。c.wg
是一个 sync.WaitGroup
,用于同步多个协程。等待完成之后,直接返回已经完成的请求的结果即可。
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c := new(call)
:创建一个新的 call 结构体实例,用于存储当前请求的结果。
c.wg.Add(1)
:调用 WaitGroup 的 Add 方法,表示有一个任务正在处理。
g.m[key] = c
:将当前请求存储到映射中,以便其他协程可以等待它的结果。
g.mu.Unlock()
:释放锁,允许其他协程访问 g.m。
c.val, c.err = fn()
c.wg.Done()
c.val, c.err = fn():调用传入的函数 fn,执行实际的计算逻辑,并将结果存储到 c.val 和 c.err 中。
c.wg.Done()
:调用 WaitGroup 的 Done 方法,表示当前任务已经完成。这将通知所有等待的协程。
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
g.mu.Lock()
:再次锁定互斥锁,确保对 g.m 的访问是线程安全的。
delete(g.m, key)
:从映射中删除当前请求的条目。这一步是为了避免内存泄漏,因为请求已经完成,不再需要存储。(比如8003请求8001
,也就是当这一时刻的请求完成之后,比如说请求了,但是下一时刻8001数据变了,但是8003还是请求的原来的自己的存储的旧数据,也就是缓存在8003的数据。)
我们只是针对10w个请求在1s内的请求情况,而不是说10w个请求在1天内分散去取,如果是后者,那么还是大家轮流去取对应数据即可,这个Do只是为了防止在极短的时间内,造成这个结果。
在上边我们用了wg.WaitGroup
,并发协程之间不需要消息传递,非常适合 sync.WaitGroup。
wg.Add(1) 锁加1。
wg.Wait() 阻塞,直到锁被释放。
wg.Done() 锁减1。
三、geecache.go
定义好singleflight
之后,就可以在主流程中添加对应的代码了。
首先在Group结构体中添加。
然后在NewGroup
的时候需要加载对应的初始化函数。
最开始我们的load()
函数中是直接获取peer,如下图所示。
所以现在需要加上loader的Do()函数做控制了。
{ }
中的代码是一个 匿名函数,作为 Do 方法的第二个参数传递。
这个匿名函数的签名是 func() (interface{}, error)
,与 singleflight.Do 方法的参数类型一致。
也就是下面{}
圈起来的部分,是匿名函数,也就是作为Do的回调函数。
然后返回对应的viewvi
的值。
把光标放在func()
上面,就可以看到对应的函数了。
这里有点绕,需要好好体会下。
当第一个请求调用 Do 时,singleflight.Group 会检查是否已经有一个相同的 key 在处理中。如果没有,Do 会执行匿名函数来加载数据,并将结果缓存起来。
如果有,后续的并发请求会等待第一个请求的结果,而不是重复执行匿名函数。
总的来说就是,在 load 方法中,Do 被调用时传入了一个匿名函数,这个匿名函数定义了如何加载数据。匿名函数的作用是尝试从远程节点加载数据(如果配置了远程节点),如果失败,则从本地加载。这种设计允许 Do 方法内部根据键的唯一性来控制加载逻辑的执行,避免重复加载。
回顾一下匿名函数的作用:在 Go 中,函数是一等公民,可以作为变量传递。匿名函数是一种没有名字的函数,可以直接在代码中定义并传递。在 Do 方法中,匿名函数被用作参数,定义了如何加载数据。这种设计允许 Do 方法内部根据键的唯一性来控制加载逻辑的执行,避免重复加载。匿名函数的作用是封装具体的加载逻辑,而 Do 方法的作用是确保这个逻辑只执行一次。通过将匿名函数传递给 Do,Do 方法可以在内部根据键的唯一性来控制加载逻辑的执行,避免重复加载。这种设计不仅提高了性能,还减少了资源浪费。