当前位置: 首页 > article >正文

golang分布式缓存项目 Day5 分布式节点

该项目原作者:https://geektutu.com/post/geecache-day1.html。本文旨在记录本人做该项目时的一些疑惑解答以及部分的测试样例以便于本人复习

1 流程回顾注:

在这里插入图片描述
我们在GeeCache 第二天 中描述了 geecache 的流程。在这之前已经实现了流程 ⑴ 和 ⑶,今天实现流程 ⑵,从远程节点获取缓存值。

我们进一步细化流程 ⑵:
在这里插入图片描述

2 抽象 PeerPicker

package geecache

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
	PickPeer(key string) (peer PeerGetter, ok bool)
}

// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface {
	Get(group string, key string) ([]byte, error)
}
  • 在这里,抽象出 2 个接口,PeerPicker 的 PickPeer() 方法用于根据传入的 key 选择相应节点
    PeerGetter。

  • 接口 PeerGetter 的 Get() 方法用于从对应 group 查找缓存值。PeerGetter 就对应于上述流程中的 HTTP客户端。

3 节点选择与 HTTP 客户端

在 GeeCache 第三天 中我们为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来要为 HTTPPool 实现客户端的功能。

首先创建具体的 HTTP 客户端类 httpGetter,实现 PeerGetter 接口。

type httpGetter struct {
	baseURL string
}

func (h *httpGetter) Get(group string, key string) ([]byte, error) {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key),
	)
	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}

	return bytes, nil
}

var _ PeerGetter = (*httpGetter)(nil)
  • baseURL 表示将要访问的远程节点的地址,例如 http://example.com/_geecache/。
  • 使用 http.Get() 方式获取返回值,并转换为 []bytes 类型。

:var _ PeerGetter = (*httpGetter)(nil) 是为了实现PeerGetter 接口防止报错

第二步,为 HTTPPool 添加节点选择的功能。

const (
	defaultBasePath = "/_geecache/"
	defaultReplicas = 50
)
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
	// this peer's base URL, e.g. "https://example.net:8000"
	self        string
	basePath    string
	mu          sync.Mutex // guards peers and httpGetters
	peers       *consistenthash.Map
	httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}
  • 新增成员变量 peers,类型是一致性哈希算法的 Map,用来根据具体的 key 选择节点。
  • 新增成员变量 httpGetters,映射远程节点与对应的 httpGetter。每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址 baseURL 有关。

第三步,实现 PeerPicker 接口。

// Set updates the pool's list of peers.
func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(defaultReplicas, nil)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

// PickPeer picks a peer according to key
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if peer := p.peers.Get(key); peer != "" && peer != p.self {
		p.Log("Pick peer %s", peer)
		return p.httpGetters[peer], true
	}
	return nil, false
}

var _ PeerPicker = (*HTTPPool)(nil)
  • Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。
  • 并为每一个节点创建了一个 HTTP 客户端 httpGetter。
  • PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点,返回节点对应的 HTTP 客户端。

至此,HTTPPool 既具备了提供 HTTP 服务的能力,也具备了根据具体的 key,创建 HTTP 客户端从远程节点获取缓存值的能力。

4 实现主流程

最后,我们需要将上述新增的功能集成在主流程(geecache.go)中。

// A Group is a cache namespace and associated data loaded spread over
type Group struct {
	name      string
	getter    Getter
	mainCache cache
	peers     PeerPicker
}

// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}
	g.peers = peers
}

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}

func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}
  • 新增 RegisterPeers() 方法,将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中。
  • 新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值。
  • 修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()。

5 main 函数测试。

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))
}

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr[7:], peers))
}

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			key := r.URL.Query().Get("key")
			view, err := gee.Get(key)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/octet-stream")
			w.Write(view.ByteSlice())

		}))
	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[7:], nil))

}

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], []string(addrs), gee)
}

main 函数的代码比较多,但是逻辑是非常简单的。

  • startCacheServer() 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP服务(共3个端口,8001/8002/8003),用户不感知。
  • startAPIServer() 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。
  • main() 函数需要命令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。

为了方便,我们将启动的命令封装为一个 shell 脚本:

#!/bin/bash
trap "rm server;kill 0" EXIT

go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &

sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &

wait

运行结果

$ ./run.sh
2020/02/16 21:17:43 geecache is running at http://localhost:8001
2020/02/16 21:17:43 geecache is running at http://localhost:8002
2020/02/16 21:17:43 geecache is running at http://localhost:8003
2020/02/16 21:17:43 fontend server is running at http://localhost:9999
>>> start test
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
2020/02/16 21:17:45 [Server http://localhost:8003] Pick peer http://localhost:8001
...
630630630

此时,我们可以打开一个新的 shell,进行测试:

$ curl "http://localhost:9999/api?key=Tom"
630
$ curl "http://localhost:9999/api?key=kkk"
kkk not exist

测试的时候,我们并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001 发起了 3 次请求。试想,假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。

三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这个问题下一次解决。

阶段性总结

概括性流程:

开始
startCacheServer
将多个服务器节点绑定到本地服务器gee上
http.ListenAndServe
peers 监听8001, 8002, 8003端口
并将端口监听到的请求交由peers绑定的ServeHTTP处理
go startAPIServer
创建与用户面对的api客户端
http.ListenAndServe 监听9999端口
http.Handle处理路径为/api的请求
发出http://localhost:9999/api?key=Tom的HTTP请求
startAPIServer函数中http.Handle方法处理
查找本地缓存
不存在则从远程节点上查找
找到与本地节点绑定的多个节点
并通过consistenthash算法找出这个请求应该被处理的节点
来到这个节点
用http.Get方法再发送一个路径为peer + p.basePath/_geecache/scores/Tom的请求
startCacheServer函数中的peers上实现的ServeHTTP方法处理
查找这个节点的缓存是否有所要找的值
没找到
再次用consistenthash算法计算所要找的节点
因为key没有变,所以找到了同一个节点,也就是这个节点本身
回到节点本地
调用getLocally方法
进行从数据库中查找的慢过程
并将从数据库中找出的键值对存储到这个节点的缓存中
END

PS:用mermaid画流程图的时候,用逗号要切换到英文输入法再输入,否则会导致输出不了图片!!!


http://www.kler.cn/a/392104.html

相关文章:

  • nginx配置负载均衡详解
  • 基于混合配准策略的多模态医学图像配准方法研究
  • 使用 start-local 脚本在本地运行 Elasticsearch
  • Java设计模式面试题及参考答案
  • C++20 中最优雅的那个小特性 - Ranges
  • 【学习笔记】数据结构(七)
  • ssm093基于Java Web的毕业生就业状况管理系统设计与实现+jsp(论文+源码)_kaic
  • 谷歌浏览器支持的开发者工具详解
  • Linux符号使用记录
  • ubuntu20.04_从零LOD-3DGS的复现
  • 爬虫反爬机制和解决方案
  • 解决 ElSelect 数据量大导致加载速度慢
  • OpenGL【C++】台灯
  • 【AI换脸整合包及教程】深入了解Rope:一款强大的AI换脸工具及其技术原理
  • JavaScript 观察者设计模式
  • Scala的List(可变)
  • 微搭低代码入门02条件语句
  • 【SpringBoot】黑马大事件笔记-day3
  • 用 Python 从零开始创建神经网络(二):第一个神经元的进阶
  • 停车共享小程序ssm+论文源码调试讲解
  • 实现linux定时备份数据至群晖NAS
  • python爬取newbing每日壁纸
  • JDBC事务管理、四大特征(ACID)、事务提交与回滚、MySQL事务管理
  • C语言串讲-2之指针和结构体
  • 2024 ECCV | DualDn: 通过可微ISP进行双域去噪
  • ubuntu20.04 解决Pycharm没有写入权限,无法通过检查更新更新的问题