【Go | 从0实现简单分布式缓存】-6:GeeCache总结
本文目录
- 序
- 1. LRU缓存淘汰策略
- LFU策略
- LRU策略
- LRU算法实现
- 核心数据结构
- 新增节点功能
- 查找功能
- 删除节点
- 返回链表的长度
- 单测
- 2. 单机并发缓存
- 只读缓存ByteView
- 并发特性
- 核心数据结构Group
- 回调函数Getter
- Group定义
- Group的Get方法
- 单测
- 3. HTTP服务端
- serverHTTP方法
- 单测
- 4. 一致性哈希
- 一致性哈希原理
- 数据倾斜
- 实现一致性哈希
- 单测
- 5. 分布式节点
- peers
- HTTP客户端功能
- 选择节点功能
- 主流程实现
- 单测
- 6.防止缓存击穿
- 主流程实现
- 7. Protobuf通信
- 8. 还能怎么优化?
- 1. 缓存淘汰策略优化
- 2. 过期时间机制
- 3. 并发性能优化
- 4. 数据压缩
- 5. 预热机制
- 6. 分布式一致性优化
序
之前跟着极客兔兔完成了分布式缓存GeeCache,现在做一个总结,也方便为了后面进行项目拓展,拓展完成etcd功能、缓存过期功能、LFU、ARC算法等。
GeeCache是一个很好的教程,可以看作是一套完整的小项目,顺带巩固下Go基础语法知识,接口、单测这些等等。
极客兔兔原教程地址:https://geektutu.com/post/geecache-day1.html
1. LRU缓存淘汰策略
LFU策略
最少使用,也就是淘汰缓存中访问频率最低的记录。LFU 认为,如果数据过去被访问多次,那么将来被访问的频率也更高。LFU 的实现需要维护一个按照访问次数排序的队列,每次访问,访问次数加1,队列重新排序,淘汰时选择访问次数最少的即可。LFU 算法的命中率是比较高的,但缺点也非常明显,维护每个记录的访问次数,对内存的消耗是很高的;另外,如果数据的访问模式发生变化,LFU 需要较长的时间去适应,也就是说 LFU 算法受历史数据的影响比较大。例如某个数据历史上访问次数奇高,但在某个时间点之后几乎不再被访问,但因为历史访问次数过高,而迟迟不能被淘汰。
LRU策略
最近最少使用,相对于仅考虑时间因素的 FIFO 和仅考虑访问频率的 LFU,LRU 算法可以认为是相对平衡的一种淘汰算法。LRU 认为,如果数据最近被访问过,那么将来被访问的概率也会更高。LRU 算法的实现非常简单,维护一个队列,如果某条记录被访问了,则移动到队尾,那么队首则是最近最少访问的数据,淘汰该条记录即可。
LRU算法实现
LRU实现的核心很简单,一个是字典map,存储键key和值的映射关系,根据某个键查找对应的值value的复杂度是o(1),在字典中插入一条记录的复杂度也是o(1)。
双向链表是存放值的,访问到某个值的时候,将其移动到队尾的复杂度是o(1),在队尾增加一条记录和删除一条记录的复杂度也是o(1)。
所以我们需要实现一个包含字典和双向链表的结构体类型Cache,方便后续的增删改查操作。
核心数据结构
为了通用性,缓存的值 被设定为 实现了Value接口的任意类型,该接口只包含了一个方法Len() int
,返回这个值所占用的内存大小。
所以接口设定如下:
type Value interface {
Len() int
}
键值对map
设定为双向链表的节点的数据类型,链表中保存每个值对应的key
的好处在于,淘汰队首节点时,需要用key
从字典中删除对应的映射。
这个值value
是Value
类型,也就是上面定义的interface
接口类型。
type entry struct {
key string
value Value
}
然后定义核心类Cache,实现如下。
使用Go标准库实现双向链表list.List
,字典的定义是map[string]*list.Element
,键是字符串,值是双向链表中对应节点的指针。
maxBytes
是允许使用的最大内存,nbytes
是当前已使用的内存,OnEvicted
是某条记录被移除时的回调函数,可以为 nil。
type Cache struct {
maxBytes int64
nbytes int64
ll *list.List
cache map[string]*list.Element
// optional and executed when an entry is purged.
OnEvicted func(key string, value Value)
}
实例化Cache的代码如下。
新增节点功能
如果键存在,就更新对应节点的值,并且更新队列的容量,以及将节点移动到队首。
ele.Value
是 list.Element
结构体中的一个字段,类型为 interface{}
。这意味着它可以存储任何类型的值。ele.Value
存储的是一个 *entry
类型的值。
由于 ele.Value
是 interface{}
类型,它需要通过类型断言来转换为具体的类型。.*entry
是一个类型断言操作,表示将 ele.Value
断言为 *entry
类型。如果 ele.Value
的实际类型不是 *entry
,程序会运行时错误。
可以看到element
结构体元素如下,其中Value是任意类型。
插入新节点的函数,它的作用是在双向链表的头部插入一个新的节点,并返回这个新节点的引用。
// PushFront inserts a new element e with value v at the front of list l and returns e.
func (l *List) PushFront(v any) *Element {
l.lazyInit()
return l.insertValue(v, &l.root)
}
如果不是新增场景,就队首增加新节点, &entry{key, value}
(首先创建一个 entry 类型的实例(分配内存空间),然后获取这个实例的地址,返回一个指向 entry 的指针,也就是返回的指针类型是*entry), 并字典中添加 key 和节点的映射关系。
查找功能
删除节点
删除就是缓存淘汰了,实现的逻辑也比较简单。
c.ll.Back()
取到队尾节点,从链表中删除。delete(c.cache, kv.key)
,从字典中 c.cache 删除该节点的映射关系。更新当前所用的内存 c.nbytes
。
返回链表的长度
因为需要进行单测,所以这里可以写一个函数进行检测。
func (c *Cache) Len() int {
return c.ll.Len()
}
单测
定义一个自定义类型 String,并为其实现了 Len() 在这里插入代码片
方法,使其满足 Value 接口的要求。这是接口的隐式实现机制的典型应用:
String
类型通过实现 Len()
方法,满足了 Value 接口。因此,String 类型的实例可以被传递给 Cache.Add()
方法。
使用New()
创建一个Cache类型的实例,然后通过Add方法来验证。
测试当使用内存超过了设定值时,是否会触发“无用”节点的移除:
测试回调函数。
reflect.DeepEqual
是 Go 语言反射(Reflection)
包中的一个函数,用于比较两个值是否在逻辑上相等。它比普通的 == 操作符更强大,因为它可以处理复杂的数据结构(如切片、结构体、映射等)
首先创建一个空的字符串切片 keys
,用于存储被驱逐的键。然后定义一个回调函数 callback
,它会在条目被驱逐时将键追加到 keys 切片中。
依次添加四个条目。由于缓存的容量限制(10 字节),当新条目加入时,旧条目可能会被驱逐。
2. 单机并发缓存
只读缓存ByteView
使用Mutex封装LRU几个方法,使之支持并发的读写。
抽象一个ByteView
来表示缓存值,b是只读的,使用ByteSlice()
返回一个拷贝,防止缓存值被外部修改。
并发特性
实现并发比较简单,之前我们的lru.go
中的Cache
只是一个实现,现在我们把Cache
与mutex
锁一起封装到一个新的结构体cache
中实现并发即可。
并且封装add、get
方法,添加互斥锁mu即可。
在 add
方法中,判断了 c.lru
是否为 nil,如果等于 nil
再创建实例。这种方法称之为延迟初始化(Lazy Initialization)
,一个对象的延迟初始化意味着该对象的创建将会延迟至第一次使用该对象时。主要用于提高性能,并减少程序内存要求。
核心数据结构Group
Group 是 GeeCache 最核心的数据结构,负责与用户的交互,并且控制缓存值存储和获取的流程。
现在项目结构如下。
geecache/
|--lru/
|--lru.go // lru 缓存淘汰策略
|--byteview.go // 缓存值的抽象与封装
|--cache.go // 并发控制
|--geecache.go // 负责与外部交互,控制缓存存储和获取的主流程
这一节先不考虑与远程节点交互,后面会有专门的段落实现。
回调函数Getter
现在先来看看回调Getter
函数。极客兔兔中写到:如果缓存不存在,应从数据源(文件,数据库等)获取数据并添加到缓存中。GeeCache
是否应该支持多种数据源的配置呢?不应该,一是数据源的种类太多,没办法一一实现;二是扩展性不好。如何从源头获取数据,应该是用户决定的事情,我们就把这件事交给用户好了。因此,我们设计了一个回调函数(callback)
,在缓存不存在时,调用这个函数,得到源数据。
定义接口 Getter
和 回调函数 Get(key string)([]byte, error)
,参数是 key,返回值是 []byte
。
定义函数类型 GetterFunc
,并实现 Getter 接口的 Get 方法。
函数类型实现某一个接口,称之为接口型函数,方便使用者在调用时既能够传入函数作为参数,也能够传入实现了该接口的结构体作为参数。
可以参考文章:探究 Go 接口型函数
对于上面这个回调函数,我们可以写一个简单的测试用例来保证回调函数能够正常工作。
调用的该接口的方法 f.Get(key string)
,实际上就是在调用匿名回调函数。因为我们借助了GetterFunc
的类型转换,将一个匿名函数转成了接口f Getter
。
func TestGetter(t *testing.T) {
var f Getter = GetterFunc(func(key string) ([]byte, error) {
return []byte(key), nil
})
expect := []byte("key")
if v, _ := f.Get("key"); !reflect.DeepEqual(v, expect) {
t.Errorf("callback failed")
}
}
Group定义
Group是一个缓存的命名空间,每个Group拥有一个唯一的名称name,可以看做是一个领域,比如创建三个Group,缓存订单信息用order,缓存商品信息用product,缓存价格信息用price等。
也可以比如缓存学生的成绩命名为 scores,缓存学生信息的命名为 info,缓存学生课程的命名为 courses。
getter Getter
就是回调函数,如果缓存没有命中的情况下,从制定的数据源回调获取数据即可。
第三个mainCache cache
就是我们刚刚实现的并发缓存,里面有cache缓存双向链表。
构建函数 NewGroup
用来实例化 Group
,并且将 group 存储在全局变量 groups
中。
GetGroup
用来特定名称的 Group
,这里使用了只读锁 RLock()
,因为不涉及任何冲突变量的写操作。
所以后续流程是先通过GetGroup获取某个实例空间g,然后通过g的Get方法获取缓存。
Group的Get方法
Get
方法实现了上面的流程,也就是先判断缓存中有没有,如果有的话就去获取,如果没有就调用load方法,load方法在分布式的场景下会调用getFromPeer
从其他节点获取。
如果不是分布式场景,比如这里我们先设置了getLocally
调用用户回调函数 g.getter.Get()
获取源数据,并且将源数据添加到缓存 mainCache 中(通过 populateCache 方法)。
单测
首先用一个map模拟数据库。
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
首先实例化一个scores
的Group空间,并且得到实例化对象gee。然后设置回调函数,回调函数就是去我们设置的模拟数据库db map中查询值。
然后再通过for循环不断去查询缓存值。
上述的测试用例可以测试2种情况,分别是:
1)在缓存为空的情况下,能够通过回调函数获取到源数据。
2)在缓存已经存在的情况下,是否直接从缓存中获取,为了实现这一点,使用 loadCounts 统计某个键调用回调函数的次数,如果次数大于1,则表示调用了多次回调函数,没有缓存。
测试结果输出如下:
go test -run TestGet
2025/03/10 22:07:31 [SlowDB] search key Sam
2025/03/10 22:07:31 [GeeCache] hit
2025/03/10 22:07:31 [SlowDB] search key Tom
2025/03/10 22:07:31 [GeeCache] hit
2025/03/10 22:07:31 [SlowDB] search key Jack
2025/03/10 22:07:31 [GeeCache] hit
2025/03/10 22:07:31 [SlowDB] search key unknown
PASS
ok geecache 0.008s
3. HTTP服务端
这部分新建一个http.go
文件,新建之后的代码结构如下。
geecache/
|--lru/
|--lru.go // lru 缓存淘汰策略
|--byteview.go // 缓存值的抽象与封装
|--cache.go // 并发控制
|--geecache.go // 负责与外部交互,控制缓存存储和获取的主流程
|--http.go // 提供被其他节点访问的能力(基于http)
首先创建一个结构体HTTPPool
,作为承载HTTP通信的核心数据结构,包括服务端和客户端,本节中先只实现服务端。
HTTPPool
有两个参数,一个是self
,记录自己的地址(主机名ip+端口号,比如 e.g. "https://example.net:8000"
),另一个是basePath
,作为节点间通信地址的前缀,这里默认是const defaultBasePath = "/_geecache/"
,所以 http://example.com/_geecache/
开头的请求,就用于节点间的互相访问。因为一个主机上还可能承载其他的服务,加一段 Path 是一个好习惯。
serverHTTP方法
这个是核心的方法,处理HTTP请求。也就是实现了http.Handler
接口。
package http
type Handler interface {
ServeHTTP(w ResponseWriter, r *Request)
}
实现逻辑很简单,主要是先判断访问路径的前缀是否是 basePath,不是就返回错误。
if !strings.HasPrefix(r.URL.Path, p.basePath)
,这行代码检查请求路径 r.URL.Path
是否以 p.basePath
开头。如果不以 p.basePath 开头,条件为 true,代码会进入 if 块。
如果请求的 URL 是 http://localhost:9999/_geecache/scores/user123
,那么r.URL.Path
的值是 /_geecache/scores/user123
。
然后使用 w.Write()
将缓存值作为 httpResponse
的 body
返回。
单测
package main
import (
"fmt"
"geecache"
"log"
"net/http"
)
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
func main() {
geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
addr := "localhost:9999"
peers := geecache.NewHTTPPool(addr)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr, peers))
}
单测比较简单,实例化一个scores的空间,然后设定回调函数,启动http.ListenAndServe
在 9999 端口启动了 HTTP 服务。
然后使用curl发送请求即可。
$ curl http://localhost:9999/_geecache/scores/Tom
630
$ curl http://localhost:9999/_geecache/scores/kkk
kkk not exist
4. 一致性哈希
一致性哈希是走向分布式的一个重要不分,也是一个比较概念性的东西,我们先看看为什么引入一致性哈希算法。(如果了解过etcd,或者raft算法,这个就比较好理解了)。
对于分布式缓存来说,当一个节点接收到请求,如果该节点并没有存储缓存值,那么它面临的难题是,从谁那获取数据?自己,还是节点1, 2, 3, 4… 。假设包括自己在内一共有 10 个节点,当一个节点接收到请求时,随机选择一个节点,由该节点从数据源获取数据。
假设第一次随机选取了节点 1 ,节点 1 从数据源获取到数据的同时缓存该数据;那第二次,只有 1/10 的可能性再次选择节点 1, 有 9/10 的概率选择了其他节点,如果选择了其他节点,就意味着需要再一次从数据源获取数据,一般来说,这个操作是很耗时的。这样做,一是缓存效率低,二是各个节点上存储着相同的数据,浪费了大量的存储空间。(一般是某个节点只存一部分数据)
那有什么办法,对于给定的 key,每一次都选择同一个节点呢?使用 hash 算法也能够做到这一点。那把 key 的每一个字符的 ASCII 码加起来,再除以 10 取余数可以吗?当然可以,这可以认为是自定义的 hash 算法。
但是简单的hash并不能解决更复杂的问题,比如缓存节点的数量变化了。
假设移除其中一台节点,只剩下 9 个,那么之前 hash(key) % 10 变成了 hash(key) % 9,也就意味着几乎缓存值对应的节点都发生了改变。即几乎所有的缓存值都失效了。节点在接收到对应的请求时,均需要重新去数据源获取数据,容易引起 缓存雪崩。
简单hash不行,那就一致性hash。
一致性哈希原理
一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。
计算缓存节点/机器(通常使用缓存节点的名称、编号和 IP 地址)的哈希值,放置在环上。
环上有 peer2,peer4,peer6 三个节点,key11,key2,key27 均映射到 peer2,key23 映射到 peer4。此时,如果新增节点/机器 peer8,假设它新增位置如图所示,那么只有 key27 从 peer2 调整到 peer8,其余的映射均没有发生改变。
也就是说,一致性哈希算法,在新增/删除节点时,只需要重新定位该节点附近的一小部分数据,而不需要重新定位所有的节点,这就解决了上述的问题。
数据倾斜
如果服务器的节点过少,容易引起 key 的倾斜。例如上面例子中的 peer2,peer4,peer6 分布在环的上半部分,下半部分是空的。那么映射到环下半部分的 key 都会被分配给 peer2,key 过度向 peer2 倾斜,缓存节点间负载不均。
所以我们可以引入虚拟节点的概念,一个节点对应多个虚拟节点。
假设 1 个真实节点对应 3 个虚拟节点,那么 peer1 对应的虚拟节点是 peer1-1、 peer1-2、 peer1-3(通常以添加编号的方式实现),其余节点也以相同的方式操作。
虚拟节点扩充了节点的数量,解决了节点较少的情况下数据容易倾斜的问题。而且代价非常小,只需要增加一个字典(map)维护真实节点与虚拟节点的映射关系即可。
实现一致性哈希
Map
是一致性哈希算法的主数据结构,包含 4 个成员变量:Hash
函数 hash
;虚拟节点倍数 replicas
;哈希环 keys
;虚拟节点与真实节点的映射表 hashMap
,键是虚拟节点的哈希值,值是真实节点的名称。
定义了函数类型 Hash,采取依赖注入的方式,允许用于替换成自定义的 Hash 函数,也方便测试时替换,默认为 crc32.ChecksumIEEE
算法。
构造函数 New() 允许自定义虚拟节点倍数和 Hash 函数。
Add函数就是传入0或者多个真实节点名称。
对每一个真实节点 key
,对应创建 m.replicas
个虚拟节点,虚拟节点的名称是:strconv.Itoa(i) + key
,即通过添加编号的方式区分不同虚拟节点。
使用 m.hash()
计算虚拟节点的哈希值,使用 append(m.keys, hash)
添加到keys
环上。
在 hashMap
中增加虚拟节点和真实节点的映射关系。最后一步,环上的哈希值排序。
接下来还要实现选择某个虚拟节点的代码,如果上述流程都已经懂了,那么这个环节也比较简单。
通过传入的key值,计算哈希值,然后顺时针找到第一个匹配的虚拟节点的下标idx
,从 m.keys
中获取到对应的哈希值。然后通过通过 hashMap
映射得到真实的节点。
具体我们来讲讲实现代码的作用:
hash := int(m.hash([]byte(key)))
,使用 Map 结构体中的哈希函数计算输入 key 的哈希值。
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
使用 sort.Search
在已排序的 m.keys
数组中二分查找,找到第一个大于等于 hash
值的位置。(这是标准库中的二分查找函数,两个参数,一个参数是切片长度,第二个参数是返回布尔值的函数,用于判断中间元素是否满足条件。)
使用 idx%len(m.keys)
确保索引在有效范围内(处理环形结构),然后从 hashMap
中获取对应的节点值并返回。如果 idx 等于数组长度(没找到大于等于 hash 的值),则回到环的起点(索引0)。
单测
package consistenthash
import (
"strconv"
"testing"
)
func TestHashing(t *testing.T) {
hash := New(3, func(key []byte) uint32 {
i, _ := strconv.Atoi(string(key))
return uint32(i)
})
// Given the above hash function, this will give replicas with "hashes":
// 2, 4, 6, 12, 14, 16, 22, 24, 26
hash.Add("6", "4", "2")
testCases := map[string]string{
"2": "2",
"11": "2",
"23": "4",
"27": "2",
}
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
// Adds 8, 18, 28
hash.Add("8")
// 27 should now map to 8.
testCases["27"] = "8"
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
}
创建一个新的一致性哈希实例,每个真实节点复制3份(虚拟节点),然后定义一个自定义的哈希函数,简单将字节数组转换为整数。
hash := New(3, func(key []byte) uint32 {
i, _ := strconv.Atoi(string(key))
return uint32(i)
})
然后添加三个节点,会在哈希环上创建9个点(每个节点3个副本):2, 4, 6, 12, 14, 16, 22, 24, 26。
testCases := map[string]string{
"2": "2",
"11": "2",
"23": "4",
"27": "2",
}
测试不同键应该映射到哪个节点,例如,键"11"的哈希值是11,在环上顺时针找到的第一个节点是12(对应实际节点"2")。
来看看hash函数,它接收一个字节数组( []byte )作为输入,将这个字节数组转换为字符串,然后使用 strconv.Atoi 将字符串转换为整数,最后将整数转换为 uint32 类型并返回。
hash := New(3, func(key []byte) uint32 {
i, _ := strconv.Atoi(string(key))
return uint32(i)
})
//添加节点
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
所以可以得到:
// 第一次循环 i=0
"02" -> hash("02") = 2
m.keys = [2]
m.hashMap[2] = "2"
// 第二次循环 i=1
"12" -> hash("12") = 12
m.keys = [2, 12]
m.hashMap[12] = "2"
// 第三次循环 i=2
"22" -> hash("22") = 22
m.keys = [2, 12, 22]
m.hashMap[22] = "2"
5. 分布式节点
peers
这一节是实现从远程节点获取缓存值,首先抽象出两个接口。
PeerPicker
的 PickPeer()
方法用于根据传入的 key
选择相应节点 PeerGetter
。
接口 PeerGetter
的 Get()
方法用于发起一个http请求从对应的节点中查找缓存值(类似于curl)。
PeerGetter
就对应于上述流程中的 HTTP 客户端。
也就是节点既可以是客户端也可以是服务端,可以这么理解。
HTTP客户端功能
之前我们已经在http.go
中实现了server服务端,接下来实现HTTPPool
中客户端的功能。
首先创建具体的 HTTP 客户端类 httpGetter
,实现 PeerGetter
接口。
baseURL
表示将要访问的远程节点的地址,例如 http://example.com/_geecache/
。
使用 http.Get()
方式获取返回值,并转换为 []bytes
类型。
选择节点功能
新增成员变量 peers
,类型是一致性哈希算法的 Map
,用来根据具体的 key
选择节点。
新增成员变量 httpGetters
,映射远程节点与对应的 httpGetter。每一个远程节点对应一个 httpGetter
,因为 httpGetter
与远程节点的地址 baseURL
有关。
Set()
方法实例化一致性哈希算法,并且添加了传入的节点。并为每一个节点创建了一个 HTTP 客户端 httpGetter
。
PickerPeer()
包装了一致性哈希算法的 Get()
方法,根据具体的 key,选择节点,返回节点对应的 HTTP 客户端(也就是httpGetter,一个地址),根据这个地址会发起一个Get请求,这个Get请求是以客户端的身份去发起的一个http请求。
主流程实现
我们只需要把上述的功能集成在geecache.go
中即可。
// A Group is a cache namespace and associated data loaded spread over
type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker
}
// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
func (g *Group) load(key string) (value ByteView, err error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getLocally(key)
}
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: bytes}, nil
}
新增 RegisterPeers()
方法,将 实现了 PeerPicker
接口的 HTTPPool
注入到 Group
中。
新增 getFromPeer()
方法,使用实现了 PeerGetter
接口的 httpGetter
访问远程节点,获取缓存值。
修改 load
方法,使用 PickPeer()
方法选择节点,若非本机节点,则调用 getFromPeer()
从远程获取。若是本机节点或失败,则回退到 getLocally()
。
这里的流程比较复杂,可能会容易绕晕。这里我们再梳理一下流程。
如果本地缓存命中:请求 → ServeHTTP → Group.Get → mainCache.get → 返回数据。
如果本地缓存未命中,但数据在其他节点 :请求 → ServeHTTP → Group.Get → load → PickPeer → getFromPeer → 返回数据。
如果所有节点都未命中 :请求 → ServeHTTP → Group.Get → load → getLocally → 从数据源获取 → 更新缓存 → 返回数据。
这里我做了一个流程图,可以看看下面的节点流程。
一个HTTPPool
中的节点,同时有服务端和客户端的功能。服务端就是响应请求,并从请求中获取url
参数,得到groupName
和key
,然后进行group.Get
的方式查询。
如果本地mainCache
中没有,这个节点就会转变为客户端的身份,会调用PickPeer
函数通过key
找到对应的远程节点peer
,然后通过p.httpGettters
得到这个peer
的httpGetter
(实现了Getter接口)的函数,本质上是这个peer
的baseURL
+basePath
,得到了这个函数的Getter
之后,把这个Getter
(也就是远程通信节点的地址)加上key
作为参数,调用getFromPeer
函数,然后进一步调用peer.Get()
,这个peer.Get()
就是我们实现的PeerGetter
接口函数(也就是客户端身份的函数,去发起一个http请求),这个函数会直接拼凑一个地址,通过peer.baseURL+group+key
发起一个http.Get()
请求,
单测
先看看单测的示意图,方便理解。
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
func createGroup() *geecache.Group {
return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
peers := geecache.NewHTTPPool(addr)
peers.Set(addrs...)
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr[7:], peers))
}
func startAPIServer(apiAddr string, gee *geecache.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := gee.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}))
log.Println("fontend server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}
func main() {
var port int
var api bool
flag.IntVar(&port, "port", 8001, "Geecache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()
apiAddr := "http://localhost:9999"
addrMap := map[int]string{
8001: "http://localhost:8001",
8002: "http://localhost:8002",
8003: "http://localhost:8003",
}
var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}
gee := createGroup()
if api {
go startAPIServer(apiAddr, gee)
}
startCacheServer(addrMap[port],addrs, gee)
}
startCacheServer()
用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知。
startAPIServer()
用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。
main() 函数需要命令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。
测试脚本:
#!/bin/bash
trap "rm server;kill 0" EXIT
go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &
sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
wait
总结一下脚本的流程:
1、首先每个节点启动的时候都会执行gee := createGroup()
,也就是创建一个名为"scores"的缓存组实例。然后这个实例会被传递给 startCacheServer
函数(作为第三个参数)。所以每个节点都有自己独立的缓存空间,但它们共同组成了一个分布式缓存系统。当一个节点需要获取某个键的值时,会先检查本地缓存,如果没有找到,则根据一致性哈希算法确定应该由哪个节点负责这个键,然后向该节点发起请求。
2、startCacheServer传入的参数到底是什么?
startCacheServer(
"http://localhost:8001", // 当前节点地址
["http://localhost:8001", "http://localhost:8002", "http://localhost:8003"], // 所有节点地址
gee // 缓存组实例
)
每个节点都知道,自己的地址(第一个参数)、集群中所有节点的地址(第二个参数)、要使用的缓存组(第三个参数)。
3、startCacheServer
中设置peers
的流程。
首先创建一个HTTP节点池:
peers := geecache.NewHTTPPool(addr)
然后调用 peers.Set() 方法设置所有节点地址:
peers.Set(addrs...)
在 Set 方法内部:
- 创建一致性哈希实例
- 添加所有节点到一致性哈希环
- 为每个远程节点创建HTTP客户端
最后将配置好的节点池注册到缓存组:
gee.RegisterPeers(peers)
举个实例流程说明下,假设调用 startCacheServer("http://localhost:8001", addrs, gee)
,会创建HTTP节电池。
peers := geecache.NewHTTPPool("http://localhost:8001")
// 此时peers.self = "http://localhost:8001"
// peers.basePath = "/_geecache/"
设置所有节点:
peers.Set(
"http://localhost:8001",
"http://localhost:8002",
"http://localhost:8003"
)
在Set方法内部有:
// 创建一致性哈希实例,虚拟节点倍数为defaultReplicas(50)
p.peers = consistenthash.New(50, nil)
// 添加所有节点到一致性哈希环
p.peers.Add("http://localhost:8001", "http://localhost:8002", "http://localhost:8003")
// 创建HTTP客户端映射
p.httpGetters = make(map[string]*httpGetter, 3)
// 为每个节点创建HTTP客户端
p.httpGetters["http://localhost:8001"] = &httpGetter{
baseURL: "http://localhost:8001/_geecache/"
}
p.httpGetters["http://localhost:8002"] = &httpGetter{
baseURL: "http://localhost:8002/_geecache/"
}
p.httpGetters["http://localhost:8003"] = &httpGetter{
baseURL: "http://localhost:8003/_geecache/"
}
注册到缓存组:
gee.RegisterPeers(peers)
// 此时g.peers = peers
首先检查本地的mainCache
是否有缓存,如果没有,使用一致性哈希算法确定应该由哪个节点处理,使用对应的HTTP客户端发送请求。
例如,当8003节点收到获取键"Tom"的请求时,一致性哈希算法可能会选择8001节点,于是8003会使用 httpGetters["http://localhost:8001"]
向8001调用getFromPeer(peer PeerGetter, key string) (ByteView, error)
函数请求,然后进一步调用 peer.Get(g.name, key)
,在Get中,会直接使用http.Get(u)
。
输出结果:
$ ./run.sh
2020/02/16 21:17:43 geecache is running at http://localhost:8001
2020/02/16 21:17:43 geecache is running at http://localhost:8002
2020/02/16 21:17:43 geecache is running at http://localhost:8003
2020/02/16 21:17:43 fontend server is running at http://localhost:9999
>>> start test
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
...
630630630
总结下流程:
- 客户端向API服务器(9999端口,运行在8003节点上)发送3个请求
- API服务器处理这些请求,使用一致性哈希算法确定键"Tom"应该由哪个节点处理
- 一致性哈希算法选择了8001节点
- 8003节点(API服务器所在节点)向8001节点发送了3次请求获取数据
- 最终返回了3个"630"(Tom的分数)
测试的时候,并发 3 个请求 ?key=Tom
,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题是,同时向 8001 发起了 3 次请求。试想,假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。
6.防止缓存击穿
上一节并发了 N 个请求 ?key=Tom
,8003 节点向 8001 同时发起了 N 次请求。假设对数据库的访问没有做任何限制的,很可能向数据库也发起 N 次请求,容易导致缓存击穿和穿透。即使对数据库做了防护,HTTP 请求是非常耗费资源的操作,针对相同的 key,8003 节点向 8001 发起三次请求也是没有必要的。那这种情况下,如何做到只向远端节点发起一次请求呢?可以使用singleflight
进行限流。
call
代表正在进行中,或已经结束的请求。使用 sync.WaitGroup
锁避免重入。
Group
是 singleflight 的主数据结构,管理不同 key 的请求(call)。
Do 方法,接收 2 个参数,第一个参数是 key,第二个参数是一个函数 fn。Do 的作用就是,针对相同的 key,无论 Do 被调用多少次,函数 fn 都只会被调用一次,等待 fn 调用结束了,返回返回值或错误。
因为此时我们的项目并发,但是协程之间不需要消息传递,非常适合 sync.WaitGroup
。
来看看只有逻辑处理的代码。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
if c, ok := g.m[key]; ok {
c.wg.Wait() // 如果请求正在进行中,则等待
return c.val, c.err // 请求结束,返回结果
}
c := new(call)
c.wg.Add(1) // 发起请求前加锁
g.m[key] = c // 添加到 g.m,表明 key 已经有对应的请求在处理
c.val, c.err = fn() // 调用 fn,发起请求
c.wg.Done() // 请求结束
delete(g.m, key) // 更新 g.m
return c.val, c.err // 返回结果
}
wg.Add(1) 锁加1。
wg.Wait() 阻塞,直到锁被释放。
wg.Done() 锁减1。
假设有多个并发请求同时获取键 “Tom” 的数据:
- 第一个请求进入 Do 函数,发现 g.m[“Tom”] 不存在,创建新的 call ,开始执行 fn() (例如从数据库加载)
- 在第一个请求执行 fn() 的过程中,第二个、第三个请求也进入 Do 函数
- 这些后续请求发现 g.m[“Tom”] 已存在,于是调用 Wait() 等待
- 第一个请求完成 fn() 调用,获得结果,调用 Done()
- 所有等待的请求被唤醒,直接返回第一个请求获取到的结果
- 这样,对于 “Tom” 这个键,无论有多少并发请求,实际上只会执行一次数据库查询
主流程实现
type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker
// use singleflight.Group to make sure that
// each key is only fetched once
loader *singleflight.Group
}
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
// ...
g := &Group{
// ...
loader: &singleflight.Group{},
}
return g
}
func (g *Group) load(key string) (value ByteView, err error) {
// each key is only fetched once (either locally or remotely)
// regardless of the number of concurrent callers.
viewi, err := g.loader.Do(key, func() (interface{}, error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getLocally(key)
})
if err == nil {
return viewi.(ByteView), nil
}
return
}
7. Protobuf通信
这部分写过很多了,这里不再赘述,可以看文章:
【Go | 从0实现简单分布式缓存】-5:使用Protobuf通信
8. 还能怎么优化?
1. 缓存淘汰策略优化
目前 GeeCache 使用 LRU 作为缓存淘汰策略,可以考虑实现更多的淘汰策略:
- LFU (Least Frequently Used) : 淘汰访问频率最低的数据
- FIFO (First In First Out) : 淘汰最早添加的数据
- W-TinyLFU : 结合窗口 TinyLFU 的混合策略,提高命中率。
2. 过期时间机制
添加 TTL (Time To Live) 机制,允许缓存项自动过期:
// ByteView 结构体添加过期时间
type ByteView struct {
b []byte
expireAt time.Time // 过期时间
}
// 检查是否过期
func (v ByteView) isExpired() bool {
return !v.expireAt.IsZero() && time.Now().After(v.expireAt)
}
3. 并发性能优化
使用分段锁 (Shard) 减少锁竞争:
// 分段缓存
type shardedCache struct {
shards []*cache
shardMask int
}
// 根据 key 选择分片
func (sc *shardedCache) getShard(key string) *cache {
hash := fnv.New32()
hash.Write([]byte(key))
return sc.shards[hash.Sum32()&uint32(sc.shardMask)]
}
4. 数据压缩
对大型数据进行压缩,减少网络传输和内存占用:
// 压缩数据
func compress(data []byte) ([]byte, error) {
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
5. 预热机制
实现缓存预热功能,在启动时加载热点数据:
// 缓存预热
func (g *Group) Warmup(keys []string) error {
for _, key := range keys {
go func(k string) {
_, _ = g.Get(k)
}(key)
}
return nil
}
6. 分布式一致性优化
使用更高效的一致性哈希算法或引入服务发现机制。
// 服务发现接口
type ServiceDiscovery interface {
Register(addr string) error
Deregister() error
GetServices() ([]string, error)
Watch(callback func([]string))
}