7天用Go从零实现分布式缓存GeeCache(学习)(2)
参考:https://geektutu.com/post/geecache-day2.html
// Cache 是一个 LRU 缓存(最近最少使用缓存),它不是并发安全的。
type Cache struct {
maxBytes int64 // 缓存的最大字节数
nbytes int64 // 当前缓存使用的字节数
ll *list.List // 双向链表,用于记录访问顺序
cache map[string]*list.Element // 键值映射,快速查找缓存项
OnEvicted func(key string, value Value) // 当缓存条目被移除时的回调函数,可选
}
// entry 是存储在链表中的缓存条目
type entry struct {
key string // 缓存条目的键
value Value // 缓存条目的值,必须实现 Value 接口
}
// Value 接口定义了缓存条目需要实现的 Len 方法,用于获取条目的字节大小
type Value interface {
Len() int
}
// cache 是一个封装了 LRU 缓存的结构体
type cache struct {
mu sync.Mutex // 互斥锁,用于保证并发安全
// 组合,类似于继承
lru *lru.Cache // LRU 缓存实例
cacheBytes int64 // 缓存的最大容量
}
// Group 是缓存的命名空间,包含缓存数据和数据加载方法
type Group struct {
name string // 缓存的名称
getter Getter // 获取数据的回调接口
mainCache cache // 核心缓存
}
1.单机并发缓存前置条件
1.1 线程安全
- 在并发场景中,多个 goroutine 可能会同时访问和修改缓存,因此需要确保缓存操作的线程安全。
- 可以使用同步机制(如
sync.Mutex
或sync.RWMutex
)来保护共享数据,以避免数据竞争。 - 对于读取频繁、写入较少的缓存,使用读写锁 (
sync.RWMutex
) 可以提高性能,因为它允许多个读取操作同时进行,但写入时会锁定所有操作。
1.2 数据一致性
- 缓存系统需要保证数据的一致性,即数据在多次读取时应保持稳定。
- 如果缓存需要在多个 goroutine 之间共享数据,必须确保在写入操作完成之前不会被其他操作读取,以免读取到不完整或过期的数据。
- 缓存失效或更新策略需要在多线程环境中正确应用,以防止不一致的数据影响系统的正常功能。
1.3 缓存策略
- 常见的缓存策略(如 LRU、LFU 等)在单机并发环境中也需要进行线程安全的实现。例如在 LRU 缓存中,节点的访问会更新其在缓存中的位置,这些操作需要考虑并发情况。
- 缓存的清理策略(如定期清理过期数据)也需要处理并发,确保不会在清理过程中影响正常的读写操作。
1.4 高效的并发支持
- 使用高效的数据结构和算法是并发缓存的关键。常见的实现包括:
- 使用
sync.Map
实现并发读写缓存,以避免频繁加锁开销。 - 将缓存的读写分离,或使用分片(sharding)技术,以减少锁的粒度,从而提高并发性能。
- 使用
- 结合缓存的特性,合理地设计缓存的大小和生命周期,以便在高并发下提供更好的性能。
1.5 避免缓存穿透和击穿
- 缓存穿透和缓存击穿会在并发场景中放大问题,使得系统资源耗尽或崩溃。
- 可以通过双重检查锁机制(
Double-Check Locking
)在并发访问缓存时,避免多个线程同时请求同一资源。 - 对于穿透(请求缓存中不存在的数据),可以在缓存中添加空值占位;对于击穿(热门数据失效),可设计“过期重置”或“请求排队”机制来保护缓存。
1.6 有效的错误处理和重试机制
- 在并发环境下,可能会出现资源访问失败、锁竞争失败等情况,因此需要设置合理的错误处理和重试策略,保证缓存操作的可靠性和稳定性。
2. 并发写操作
- 抽象一个只读数据结构ByteView用来表示缓存值,并使用互斥锁分装LRU方法
type ByteView struct{
b []byte
}
func (v ByteView) Len() int{
return len(v.b)
}
func (v ByteView) ByteSlice() []byte{
return cloneBytes(v.b)
}
func (v ByteView) String() string{
return string(v.b)
}
func cloneBytes(b []byte) []byte{
c := make([]byte,len(b))
copy(c,b)
return c
}
- ByteView 只有一个数据成员,
b []byte
,b 将会存储真实的缓存值。选择 byte 类型是为了能够支持任意的数据类型的存储,例如字符串、图片等。 - 实现
Len() int
方法,我们在 lru.Cache 的实现中,要求被缓存对象必须实现 Value 接口,即Len() int
方法,返回其所占的内存大小。 b
是只读的,使用ByteSlice()
方法返回一个拷贝,防止缓存值被外部程序修改。
ByteView
实现了 Len()
方法,因此它满足 Value
接口,可以作为 Cache
中的 entry.value
使用。不过,ByteView
不能直接调用 Cache
的成员,因为 ByteView
和 Cache
是两个独立的类型,只有 Cache
结构体内部才有权限访问其成员变量。
1. ByteView
和 Cache
的关系
Cache
使用了Value
接口,并要求每个缓存的entry.value
实现Len()
方法。ByteView
实现了Len()
方法,因此符合Value
接口要求,可以作为Cache
中的缓存值。- 当
Cache
的onEvicted
回调函数需要触发时,或在计算缓存总字节数时,Cache
可以调用Value.Len()
来获取值的大小。 - 这意味着,
ByteView
可以作为Cache
的一个缓存值,但是它无法直接访问或调用Cache
的成员。
2. ByteView
如何被 Cache
使用
假设我们在 Cache
中插入一个 ByteView
实例作为缓存项的值。Cache
可以通过调用 entry.value.Len()
来获得缓存项的大小,而 ByteView.Len()
返回 ByteView
数据的字节数。
例如:
package main
import (
"fmt"
"geecache/lru"
)
func main() {
// 假设 ByteView 是实现了 Len() 的结构体
view := ByteView{b: []byte("Hello, World!")}
// 创建一个 Cache 实例
cache := lru.Cache{
mxBytes: 1024,
cache: make(map[string]*list.Element),
}
// 假设有某种方式添加 ByteView 到 cache
cache.Add("greeting", view)
}
在 Cache.Add
方法中,可以如下调用 Len()
方法来检查当前缓存大小是否超过 mxBytes
:
func (c *Cache) Add(key string, value Value) {
// 将值包装为 entry 并插入
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.nbytes += int64(len(key)) + int64(value.Len()) // 这里调用 value.Len()
}
3. 访问权限的限制
- 由于
ByteView
和Cache
是独立的类型,ByteView
不能直接调用或访问Cache
的成员。 Cache
可以使用Value
接口,但只能通过接口方法来操作存储在其中的ByteView
实例。
为lru.Cache添加并发特性
package geecache
import (
"geecache/lru"
"sync"
)
type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64
}
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}
c.lru.Add(key, value)
}
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}
return
}
cache.go
的实现非常简单,实例化 lru,封装 get 和 add 方法,并添加互斥锁 mu。- 在
add
方法中,判断了c.lru
是否为 nil,如果等于 nil 再创建实例。这种方法称之为延迟初始化(Lazy Initialization),一个对象的延迟初始化意味着该对象的创建将会延迟至第一次使用该对象时。主要用于提高性能,并减少程序内存要求。
主体结构Group
flowchart TD
A[接收 key] --> B{检查是否被缓存}
B -->|是| C[返回缓存值 ⑴]
B -->|否| D{是否应当从远程节点获取}
D -->|是| E[与远程节点交互] --> F[返回缓存值 ⑵]
D -->|否| G[调用`回调函数`] --> H[获取值并添加到缓存] --> I[返回缓存值 ⑶]
回调函数Getter
我们思考一下,如果缓存不存在,应从数据源(文件,数据库等)获取数据并添加到缓存中。GeeCache 是否应该支持多种数据源的配置呢?不应该,一是数据源的种类太多,没办法一一实现;二是扩展性不好。如何从源头获取数据,应该是用户决定的事情,我们就把这件事交给用户好了。因此,我们设计了一个回调函数(callback),在缓存不存在时,调用这个函数,得到源数据。
type Getter interface{
Get(key string) ([]bytr,error)
}
type GetterFunc func(key string) ([]byte,error)
func (f GetterFunc) Get(key string) ([]byte,error){
return f(key)
}
- 定义接口 Getter 和 回调函数
Get(key string)([]byte, error)
,参数是 key,返回值是 []byte。 - 定义函数类型 GetterFunc,并实现 Getter 接口的
Get
方法。 - 函数类型实现某一个接口,称之为接口型函数,方便使用者在调用时既能够传入函数作为参数,也能够传入实现了该接口的结构体作为参数。
[!go接口技巧]
定义一个函数类型 F,并且实现接口 A 的方法,然后在这个方法中调用自己。这是 Go 语言中将其他函数(参数返回值定义与 F 一致)转换为接口 A 的常用技巧。
Group的定义
Group
在这个缓存系统中的核心意义是管理特定类别的缓存数据,每个 Group
代表一个逻辑上的缓存分组。它负责提供数据的获取逻辑,包括缓存查询和缓存未命中时的数据加载。通过分组的概念,可以更细粒度地控制缓存的逻辑和数据范围,实现高效的数据存储和读取。以下是 Group
的核心意义和作用:
1. 分组缓存数据,提供命名空间
- 每个
Group
都有一个唯一的name
,相当于一个命名空间。通过name
,可以在同一个应用中创建多个缓存组,每个组管理各自的数据。 - 这样设计可以有效地将不同类别的数据分开管理,防止数据冲突,并能在不同业务逻辑中有针对性地管理缓存内容。
2. 提供统一的数据获取逻辑
Group
封装了从缓存中读取数据的逻辑,以及在缓存未命中时加载数据的逻辑。通过getter
回调接口,Group
可以在缓存不存在目标数据时,直接从数据源(如数据库或文件)加载数据。- 这种设计使得数据获取过程对调用者透明,即使数据未缓存,也会通过
getter
加载数据后返回给调用者。
3. 缓存数据的集中管理
Group
通过mainCache
存储和管理实际的缓存数据,并维护缓存容量(cacheBytes
)的控制。- 可以根据业务需求,为每个
Group
设置不同的缓存容量大小和管理策略(如不同的过期或清理策略),实现灵活的缓存控制。
// cache 是一个封装了 LRU 缓存的结构体
type cache struct {
mu sync.Mutex // 互斥锁,用于保证并发安全
// 组合,类似于继承
lru *lru.Cache // LRU 缓存实例
cacheBytes int64 // 缓存的最大容量
}
type Group struct {
name string // 缓存组的名称,标识该缓存组的唯一性
getter Getter // 缓存未命中时,从数据源获取数据的回调接口
mainCache cache // 缓存存储的主要数据结构
}
// 定义一个全局变量区域,用于管理多个 Group 实例
var (
mu sync.RWMutex // 读写锁,确保在并发环境下对 groups 的安全访问
groups = make(map[string]*Group) // 存储所有的缓存组,以名称为键
)
// NewGroup 函数用于创建一个新的缓存组
// name:缓存组的名称
// cacheBytes:缓存的最大容量,以字节为单位
// getter:缓存未命中时用于加载数据的 Getter 接口实例
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter") // 如果 getter 为空,则触发 panic,因为获取数据的回调接口不可为空
}
mu.Lock() // 获取写锁,以防止其他并发操作对 groups 进行修改
defer mu.Unlock() // 在函数退出时释放锁,确保锁的释放
// 创建新的缓存组
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes}, // 初始化缓存的最大容量
}
groups[name] = g // 将新的缓存组添加到全局的 groups 映射中
return g // 返回创建的缓存组
}
代码说明:
-
Group
结构体:Group
结构体表示一个缓存组,包含缓存组的名称、未命中时的回调接口getter
、和主要的缓存数据结构mainCache
。getter
是一个Getter
类型的接口,在缓存未命中时负责从外部数据源获取数据。
-
全局变量:
mu sync.RWMutex
:读写锁,确保在并发环境下安全访问groups
,以防止并发写入或读取冲突。groups
是一个全局的map
,以缓存组名称为键,用于存储和管理多个缓存组的实例。
-
NewGroup
函数:NewGroup
是一个构造函数,用于创建和注册新的Group
实例。- 参数
name
用于标识缓存组的名称,cacheBytes
指定缓存容量大小,getter
用于在缓存未命中时加载数据。 - 如果
getter
参数为空,会触发panic
,因为缓存未命中时需要getter
来加载数据。 - 加锁 (
mu.Lock()
) 确保groups
的安全修改,在退出函数时释放锁(defer mu.Unlock()
)。 - 创建新的
Group
实例后,将它存储在全局groups
映射中,并返回创建的缓存组实例。
- 一个 Group 可以认为是一个缓存的命名空间,每个 Group 拥有一个唯一的名称
name
。比如可以创建三个 Group,缓存学生的成绩命名为 scores,缓存学生信息的命名为 info,缓存学生课程的命名为 courses。 - 第二个属性是
getter Getter
,即缓存未命中时获取源数据的回调(callback)。 - 第三个属性是
mainCache cache
,即一开始实现的并发缓存。 - 构建函数
NewGroup
用来实例化 Group,并且将 group 存储在全局变量groups
中。 GetGroup
用来特定名称的 Group,这里使用了只读锁RLock()
,因为不涉及任何冲突变量的写操作。
Group的Get方法
func (g *Group) Get(key string) (ByteView, error){
if key == ""{
return ByteView{},fmt.Errorf("key is required")
}
if v,ok := g.mainCache.get(key);ok{
log.Println("[GeeCache] hit")
return v, nil
}
return g.load(key)
}
func (g *Group) load(key string) (value ByteView, err error){
return g.getLocally(key)
}
func (g *Group) getLocally(key string) (ByteView,error){
bytes, err := g.getter.Get(key)
if err != nil{
return ByteView{},err
}
value := ByteView{b: cloneBytes(bytes)}
g.populateCache(key,value)
return value,nil
}
func (g *Cache) populateCache(key string, value ByteView){
g.mainCache.add(key,value)
}
- Get 方法实现了上述所说的流程 ⑴ 和 ⑶。
- 流程 ⑴ :从 mainCache 中查找缓存,如果存在则返回缓存值。
- 流程 ⑶ :缓存不存在,则调用 load 方法,load 调用 getLocally(分布式场景下会调用 getFromPeer 从其他节点获取),getLocally 调用用户回调函数
g.getter.Get()
获取源数据,并且将源数据添加到缓存 mainCache 中(通过 populateCache 方法)
总结
代码注释
package geecache
import (
"fmt"
"log"
"sync"
)
// Group 是缓存的命名空间和相关数据加载的管理器
type Group struct {
name string // 缓存组名称
getter Getter // 数据加载回调,当缓存未命中时使用
mainCache cache // 主缓存,用于存储缓存数据
}
// Getter 接口用于定义获取数据的方法
type Getter interface {
Get(key string) ([]byte, error) // 根据 key 获取数据
}
// GetterFunc 类型实现了 Getter 接口,用于将一个函数转换为 Getter 接口
type GetterFunc func(key string) ([]byte, error)
// Get 方法实现 Getter 接口,调用自身作为函数实现数据获取逻辑
func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}
// 全局变量,用于存储并管理所有缓存组实例
var (
mu sync.RWMutex // 读写锁,确保并发安全
groups = make(map[string]*Group) // 缓存组映射表
)
// NewGroup 创建一个新的缓存组实例
// name: 缓存组名称,cacheBytes: 缓存容量,getter: 数据获取接口
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter") // 若 getter 为空,触发 panic
}
mu.Lock()
defer mu.Unlock()
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g // 将新创建的缓存组存储到全局变量中
return g
}
// GetGroup 返回已经创建的缓存组实例,若不存在则返回 nil
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
// Get 从缓存中获取 key 对应的值,若不存在则调用加载方法获取
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}
// 从 mainCache 中获取缓存值
if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit") // 缓存命中
return v, nil
}
// 缓存未命中,调用加载方法获取数据
return g.load(key)
}
// load 从数据源中获取 key 对应的值
func (g *Group) load(key string) (value ByteView, err error) {
return g.getLocally(key) // 本地获取数据
}
// getLocally 使用 getter 从本地数据源加载数据,并添加到缓存
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key) // 使用 getter 获取数据
if err != nil {
return ByteView{}, err
}
value := ByteView{b: cloneBytes(bytes)} // 将结果封装为 ByteView
g.populateCache(key, value) // 将数据添加到缓存
return value, nil
}
// populateCache 将数据添加到 mainCache 中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}
Group
中既包含缓存的存储结构 mainCache
,也包含数据未命中时从源加载数据的逻辑。通过 Getter
接口提供数据加载回调,使得 Group
可以在缓存未命中时使用不同的加载逻辑。
核心功能包括:
- 缓存存取:从缓存中获取数据,并在未命中时从数据源加载。
- 数据加载策略:通过
Getter
接口定义了加载策略,允许使用不同的加载函数。 - 缓存管理:对多个缓存组实例进行管理,确保缓存数据隔离。
提炼:关键函数签名和变量
主要结构体与变量
-
type Group struct
:缓存的命名空间管理结构体。name string
:缓存组名称。getter Getter
:数据加载接口,用于缓存未命中时从数据源加载。mainCache cache
:缓存存储结构,主要存储缓存数据。
-
var groups map[string]*Group
:全局缓存组管理,用于存储和获取Group
实例。 -
var mu sync.RWMutex
:用于并发安全地访问groups
。
关键函数签名
-
func NewGroup(name string, cacheBytes int64, getter Getter) *Group
- 创建新的缓存组实例并注册到全局变量中。
-
func GetGroup(name string) *Group
- 根据名称获取缓存组实例,如果不存在则返回
nil
。
- 根据名称获取缓存组实例,如果不存在则返回
-
func (g *Group) Get(key string) (ByteView, error)
- 获取缓存中的数据,若缓存未命中则调用
load
从数据源加载。
- 获取缓存中的数据,若缓存未命中则调用
-
func (g *Group) load(key string) (ByteView, error)
- 从数据源加载数据,调用本地的
getLocally
函数。
- 从数据源加载数据,调用本地的
-
func (g *Group) getLocally(key string) (ByteView, error)
- 使用
getter
从本地数据源加载数据,并将其存储到缓存中。
- 使用
-
func (g *Group) populateCache(key string, value ByteView)
- 将数据添加到缓存中,以便下次访问。
- 将数据添加到缓存中,以便下次访问。