【Go | 从0实现简单分布式缓存】-2:HTTP服务端与一致性哈希
本文目录
- 一、回顾
- 1.1 复习接口
- 二、http标准库
- 三、实现HTTP服务端
- 四、一致性哈希
本文为极客兔兔“动手写分布式缓存GeeCache”学习笔记。
一、回顾
昨天已经开发了一部分项目,我们先来看看项目结构。
分布式缓存需要实现节点间通信,建立基于 HTTP 的通信机制是比较常见和简单的做法。如果一个节点启动了 HTTP 服务,那么这个节点就可以被其他节点访问。
所以现在需要为单机节点搭建 HTTP Server。
1.1 复习接口
接口是一个类型,它定义了一组方法签名(方法名和参数列表)。一个类型只要实现了接口中定义的所有方法,就自动实现了该接口。
type 接口名称 interface {
方法1(参数列表) 返回值
方法2(参数列表) 返回值
...
}
例如,http.Handler
是Go标准库中定义的一个接口:
type Handler interface {
ServeHTTP(w http.ResponseWriter, r *http.Request)
}
这个接口定义了一个方法 ServeHTTP,任何类型只要实现了这个方法,就自动实现了 http.Handler 接口。
在Go中,实现接口不需要显式声明。只要一个类型提供了接口中定义的所有方法,它就隐式地实现了该接口。
假设我们定义一个接口 Greeter,它有一个方法 Greet:
type Greeter interface {
Greet() string
}
接下来定义一个类型 Person,并为它实现 Greet 方法:
type Person struct {
Name string
}
func (p Person) Greet() string {
return "Hello, my name is " + p.Name
}
Person 类型定义了一个方法 Greet,其签名与 Greeter 接口中的方法一致。所以Person 类型自动实现了 Greeter 接口。
我们可以将 Person 类型的变量赋值给 Greeter 类型的变量,并调用接口方法:
func main() {
p := Person{Name: "Alice"}
var g Greeter = p
fmt.Println(g.Greet()) // 输出: Hello, my name is Alice
}
类型断言用于检查一个接口变量是否实现了某个具体类型,并获取该具体类型的值:
var i interface{} = "Hello"
if str, ok := i.(string); ok {
fmt.Println("It's a string:", str)
} else {
fmt.Println("It's not a string")
}
二、http标准库
复习完上面的接口,我们来看看怎么实现go提供的http标准库。
w http.ResponseWriter
:用于向客户端发送HTTP响应。r *http.Request
:包含客户端请求的所有信息。
type Handler interface {
ServeHTTP(w http.ResponseWriter, r *http.Request)
}
代码中type server int
,是一个基于 int 的新类型,但类型本身并不重要,重要的是它通过方法实现了接口。接下来,为 server 类型定义一个方法。
server 类型定义了一个方法 ServeHTTP,其签名与 http.Handler 接口中的方法完全一致。因此,server 类型自动实现了 http.Handler 接口。
func (h *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println(r.URL.Path)
w.Write([]byte("Hello World!"))
}
http.ListenAndServe()
函数需要两个参数,分别是addr地址,还有handler处理器。
由于 server 实现了 ServeHTTP 方法,&s 满足 http.Handler 接口的要求。
通过将 server 类型的指针传递给 http.ListenAndServe,我们可以将其作为HTTP处理器使用。
所以完整的一个简单HTTP服务端代码如下。
package Geecache
import (
"log"
"net/http"
)
type server int
// ServeHTTP 是 http.Handler 接口的唯一方法。通过实现这个方法,server 类型的实例可以作为HTTP处理器。
func (h *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println(r.URL.Path) // 将请求的路径记录到日志中。
w.Write([]byte("Hello World!")) //向客户端发送响应。
}
func main() {
var s server
//启动一个HTTP服务器,监听 localhost 上的 9999 端口。
//第二个参数 &s 是一个 server 类型的指针,表示将 s 作为HTTP处理器。
//由于 server 类型实现了 ServeHTTP 方法,因此它满足 http.Handler 接口。
http.ListenAndServe("localhost:9999", &s)
}
在Go中,接口变量存储了两个信息:
动态类型:变量的实际类型(在这里是 *server)。
动态值:变量的实际值(在这里是 &s)。
当调用接口方法时,Go运行时会根据动态类型找到对应的方法实现,并执行它。这种机制使得接口非常灵活,且运行时可以动态绑定方法。
三、实现HTTP服务端
项目结构如上图所示,接下来我们在main.go
中写测试代码。
跟昨天一样,使用 map 模拟了数据源 db。创建一个名为 scores 的 Group,若缓存为空,回调函数会从 db 中获取数据并返回。使用 http.ListenAndServe 在 9999 端口启动了 HTTP 服务。
需要注意的是main.go 和 geecache/ 在同级目录,但 go modules 不再支持 import <相对路径>,相对路径需要在 go.mod 中声明:
require geecache v0.0.0
replace geecache => ./geecache
package main
/*
$ curl http://localhost:9999/_geecache/scores/Tom
630
$ curl http://localhost:9999/_geecache/scores/kkk
kkk not exist
*/
import (
"fmt"
"geecache"
"log"
"net/http"
)
// 模拟数据库,db
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
// 回调函数是一个通过参数传递给另一个函数的函数,并在某个事件发生时被调用。在这段代码中:
// 匿名函数被传递给geecache.NewGroup作为GetterFunc的参数。
// 当缓存中没有找到对应的key时,geecache会调用这个函数来从底层存储中获取数据。
// geecache.GetterFunc:将一个匿名函数转换为Getter接口的实现。
func main() {
geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
// 回调函数,当调用g.getter.Get(key) 就会从下面的代码运行从db中去取数据了
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
addr := "localhost:9999"
peers := geecache.NewHTTPPool(addr)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr, peers))
}
然后做一些简单的测试,可以看到对应的输出如下。
四、一致性哈希
对于分布式缓存来说,当一个节点接收到请求,如果该节点并没有存储缓存值,那么它面临的难题是,从谁那获取数据?自己,还是节点1, 2, 3, 4… 。假设包括自己在内一共有 10 个节点,当一个节点接收到请求时,随机选择一个节点,由该节点从数据源获取数据。
假设第一次随机选取了节点 1 ,节点 1 从数据源获取到数据的同时缓存该数据;那第二次,只有 1/10 的可能性再次选择节点 1, 有 9/10 的概率选择了其他节点,如果选择了其他节点,就意味着需要再一次从数据源获取数据,一般来说,这个操作是很耗时的。这样做,一是缓存效率低,二是各个节点上存储着相同的数据,浪费了大量的存储空间。
那有什么办法,对于给定的 key,每一次都选择同一个节点呢?使用 hash 算法也能够做到这一点。那把 key 的每一个字符的 ASCII 码加起来,再除以 10 取余数可以吗?当然可以,这可以认为是自定义的 hash 算法。
但是这会导致另一个问题,就是缓存雪崩
,简单求取 Hash 值解决了缓存性能的问题,但是没有考虑节点数量变化的场景。假设移除了其中一台节点,只剩下 9 个,那么之前 hash(key) % 10 变成了 hash(key) % 9,也就意味着几乎缓存值对应的节点都发生了改变。即几乎所有的缓存值都失效了。
节点在接收到对应的请求时,均需要重新去数据源获取数据,容易引起 缓存雪崩。
缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。常因为缓存服务器宕机,或缓存设置了相同的过期时间引起。
而一致性哈希算法可以解决这个问题。
一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。
计算节点/机器(通常使用节点的名称、编号和 IP 地址)的哈希值,放置在环上。
计算 key 的哈希值,放置在环上,顺时针寻找到的第一个节点,就是应选取的节点/机器。
环上有 peer2,peer4,peer6 三个节点,key11,key2,key27 均映射到 peer2,key23 映射到 peer4。此时,如果新增节点/机器 peer8,假设它新增位置如图所示,那么只有 key27 从 peer2 调整到 peer8,其余的映射均没有发生改变。
也就是说,一致性哈希算法,在新增/删除节点时,只需要重新定位该节点附近的一小部分数据,而不需要重新定位所有的节点,这就解决了上述的问题。
但是相对的,也会有新问题,那就是如果服务器的节点过少,容易引起 key 的倾斜。例如上面例子中的 peer2,peer4,peer6 分布在环的上半部分,下半部分是空的。那么映射到环下半部分的 key 都会被分配给 peer2,key 过度向 peer2 倾斜,缓存节点间负载不均。
为了解决这个问题,引入了虚拟节点的概念,一个真实节点对应多个虚拟节点。
假设 1 个真实节点对应 3 个虚拟节点,那么 peer1 对应的虚拟节点是 peer1-1、 peer1-2、 peer1-3(通常以添加编号的方式实现),其余节点也以相同的方式操作。
虚拟节点扩充了节点的数量,解决了节点较少的情况下数据容易倾斜的问题。而且代价非常小,只需要增加一个字典(map)维护真实节点与虚拟节点的映射关系即可。
然后我们可以试着来实现一致性哈希
算法,新建consistenthash
文件。
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
)
// Hash maps bytes to uint32
type Hash func(data []byte) uint32
// Map constains all hashed keys
type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}
// New creates a Map instance
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
// Add adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// Get gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash
})
return m.hashMap[m.keys[idx%len(m.keys)]]
}
定义函数类型 Hash,采取依赖注入的方式,允许用于替换成自定义的 Hash 函数,也方便测试时替换,默认为 crc32.ChecksumIEEE 算法。
Map 是一致性哈希算法的主数据结构,包含 4 个成员变量:Hash 函数 hash;虚拟节点倍数 replicas;哈希环 keys;虚拟节点与真实节点的映射表 hashMap,键是虚拟节点的哈希值,值是真实节点的名称。
构造函数 New() 允许自定义虚拟节点倍数和 Hash 函数。
接下来可以进行对应测试。
package consistenthash
import (
"strconv"
"testing"
)
func TestHashing(t *testing.T) {
hash := New(3, func(key []byte) uint32 {
i, _ := strconv.Atoi(string(key))
return uint32(i)
})
// Given the above hash function, this will give replicas with "hashes":
// 2, 4, 6, 12, 14, 16, 22, 24, 26
hash.Add("6", "4", "2")
testCases := map[string]string{
"2": "2",
"11": "2",
"23": "4",
"27": "2",
}
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
// Adds 8, 18, 28
hash.Add("8")
// 27 should now map to 8.
testCases["27"] = "8"
for k, v := range testCases {
if hash.Get(k) != v {
t.Errorf("Asking for %s, should have yielded %s", k, v)
}
}
}
一开始,有 2/4/6 三个真实节点,对应的虚拟节点的哈希值是 02/12/22、04/14/24、06/16/26。
那么用例 2/11/23/27 选择的虚拟节点分别是 02/12/24/02,也就是真实节点 2/2/4/2。
添加一个真实节点 8,对应虚拟节点的哈希值是 08/18/28,此时,用例 27 对应的虚拟节点从 02 变更为 28,即真实节点 8。