当前位置: 首页 > article >正文

LRU go cache的实现

目录

  • LRU算法
    • LRU原理
    • LRU实现
    • Redis LRU算法实现
      • 1. 内存淘汰策略
      • 2. LRU算法的实现
      • 3. LRU vs LFU
      • Redis中的LRU使用场景
  • 基于LRU的缓存库
    • go-cache
      • 安装使用
      • 代码解析
    • hashicorp/golang-lru
      • 安装使用
      • 代码解析
    • groupcache
      • 安装使用
      • 代码解析
        • 缓存淘汰算法
        • 并发缓存组
          • 一致性哈希
          • 防止缓存击穿——singleflight 的使用
        • 接口型函数
        • "Set"/Get方法
  • 内存计算的思考
    • reflect+unsafe计算内存消耗
    • runtime.MemStats计算内存

LRU算法

LRU(Least Recently Used,最近最少使用)是一种缓存淘汰策略,用于决定当缓存满时,应该淘汰哪个缓存项。其基本思想是优先淘汰那些最近最少被访问的数据,假设最近使用的数据在未来一段时间内仍然可能被再次使用。

LRU原理

LRU思想遵循以下逻辑:

  1. 优先级依据: 每当缓存中的一个项被访问时,它的优先级会被提升,这意味着它被认为更有可能在短期内再次被使用。
  2. 淘汰策略: 当缓存容量达到上限时,LRU策略会淘汰最近最少使用的项,即最久未被访问的那一项,从而为新项腾出空间。

假设缓存的容量为3,并且按照如下顺序访问数据:

A -> B -> C -> A -> D

在每一步中,LRU缓存的状态如下:

  1. 缓存空:放入 A,缓存为 [A]
  2. 放入 B,缓存为 [A, B]
  3. 放入 C,缓存为 [A, B, C]
  4. 访问 AA被提升优先级,缓存为 [B, C, A]
  5. 放入 D,缓存已满,因此淘汰最近最少使用的 B,缓存变为 [C, A, D]

可以看到,每次新项进入缓存时,最久未被使用的项会被淘汰。

LRU实现

在这里插入图片描述

LRU常通过一个双向链表和哈希表来实现:

  • 哈希表: 用于快速定位缓存中的数据。
  • 双向链表: 用于维护数据的使用顺序,链表头部是最新使用的数据,尾部是最久未使用的数据。

当数据被访问时,将其移动到链表头部;当缓存满时,删除链表尾部的数据。

LRU的思想非常适用于缓存管理,因为它假设“最近被使用的数据未来可能还会被再次使用”,这是一个较为普遍的规律。

Redis LRU算法实现

在Redis中,LRU(Least Recently Used)缓存淘汰策略是一个重要的概念,特别是当Redis运行在内存有限的环境下时。当Redis达到内存限制时,它需要根据某种策略来删除一些旧的数据,从而为新的数据腾出空间。在这种情况下,Redis提供了多种缓存淘汰策略,其中就包括LRU策略。

Redis中的LRU策略主要体现在以下几个方面:

1. 内存淘汰策略

Redis允许你配置不同的内存淘汰策略,包括但不限于LRU策略。可以通过配置参数 maxmemory-policy 来选择具体的策略。常见的LRU相关策略有:

  • volatile-lru:从设置了过期时间的键中,淘汰最近最少使用的键。
  • allkeys-lru:从所有键中淘汰最近最少使用的键,不论键是否设置了过期时间。

Redis中其他与LRU相关的策略还包括:

  • volatile-ttl:淘汰即将过期的键(根据TTL)。
  • volatile-random:从设置了过期时间的键中随机选择一个淘汰。
  • allkeys-random:从所有键中随机选择一个淘汰。
  • noeviction:不淘汰键,返回错误来表示内存不足(通常用于确保内存不会被溢出)。

2. LRU算法的实现

Redis中的LRU算法并不是严格的LRU,而是近似LRU。原因在于Redis为了性能考虑,并没有对每个键都维护精确的访问时间,而是使用了采样算法。即每当需要淘汰键时,Redis并不会遍历所有键,而是随机选择若干个键,并从中选择最近最少使用的键进行淘汰。这个采样的数量可以通过配置 maxmemory-samples 参数来控制,通常默认为5。

  • 采样大小影响精度:采样的数量越多,淘汰策略越接近真正的LRU;采样数少时,策略更接近随机淘汰。

3. LRU vs LFU

在Redis 4.0引入了LFU(Least Frequently Used)策略,这是一种替代LRU的新策略。与LRU不同,LFU是基于访问频率来淘汰数据的策略,可以通过配置 allkeys-lfuvolatile-lfu 来启用。

Redis中的LRU使用场景

  1. 缓存系统:当Redis作为缓存层时,LRU是一个非常常用的策略,因为LRU假设最近使用过的数据未来仍然会被访问,所以可以最大化缓存的命中率。

  2. 内存受限的场景:当Redis运行在内存受限的环境下(例如IoT设备、边缘计算),配置合理的LRU策略可以确保系统稳定运行。

如何启用LRU策略:

可以通过配置文件或命令行启动参数来设置Redis的LRU策略。例如:

maxmemory 100mb
maxmemory-policy allkeys-lru

这样,当Redis达到100MB内存上限时,它会使用allkeys-lru策略来淘汰最近最少使用的键。

Redis的LRU策略通过这种简单而高效的方式,确保了在内存受限的情况下,系统能够继续稳定地提供服务,并且在实际生产中广泛应用于各种缓存场景。

基于LRU的缓存库

  1. groupcache (12.9k star)
    GitHub: groupcache
    这是由Go团队创建的分布式缓存库,适合处理大规模的缓存需求,内部使用了LRU缓存策略。

  2. gcache
    GitHub: gcache
    这是一个支持多种缓存策略的库,包括LRU、LFU、ARC等,且支持过期时间设置,非常适合用于学习LRU实现。

  3. bigcache
    GitHub: bigcache
    适用于大数据量缓存场景,它的设计侧重于高效和低延迟,不过它不使用LRU算法,而是基于时间窗口淘汰策略,你可以通过对比了解不同的缓存策略。

  4. go-cache(8.1k star)
    GitHub: go-cache
    这是一个非常简单易用的内存缓存库,支持到期时间和自动过期清理。虽然它不是严格的LRU,但它可以作为一个基础的缓存库来理解LRU的思想。

  5. hashicorp/golang-lru(4.2k star)
    GitHub: golang-lru
    HashiCorp 提供的 golang-lru 是一个非常流行的 LRU 缓存库,具有良好的性能和易用性。

  6. dgraph-io/ristretto
    GitHub: ristretto
    ristretto 是 Dgraph 提供的一个高性能缓存库,支持 LRU 和 LFU(Least Frequently Used)策略。

  7. bluele/gcache
    GitHub: gcache
    gcache 是一个功能丰富的缓存库,支持多种缓存策略,包括 LRU、LFU 和 ARC(Adaptive Replacement Cache)。

go-cache

go-cache is an in-memory key:value store/cache similar to memcached that is suitable for applications running on a single machine. Its major advantage is that, being essentially a thread-safe map[string]interface{} with expiration times, it doesn’t need to serialize or transmit its contents over the network.

Any object can be stored, for a given duration or forever, and the cache can be safely used by multiple goroutines.

Although go-cache isn’t meant to be used as a persistent datastore, the entire cache can be saved to and loaded from a file (using c.Items() to retrieve the items map to serialize, and NewFrom() to create a cache from a deserialized one) to recover from downtime quickly. (See the docs for NewFrom() for caveats.)

Go-cache 是一个类似于 memcached 的内存键值存储/缓存,适用于运行在单机上的应用程序。它的主要优势在于,它本质上是一个支持线程安全的 map[string]interface{},并具有过期时间,因此不需要序列化或通过网络传输其内容。

任何对象都可以存储,可以设置存储时长或永久存储,并且缓存可以被多个 goroutine 安全使用。

尽管 go-cache 并非设计为持久化数据存储,但整个缓存可以保存到文件中或从文件中加载(使用 c.Items() 检索要序列化的项映射,并使用 NewFrom() 从反序列化的缓存创建缓存),以便快速从停机状态恢复。(有关 NewFrom() 的注意事项,请参见文档。)

安装使用

Installation

go get github.com/patrickmn/go-cache

Usage

import (
	"fmt"
	"github.com/patrickmn/go-cache"
	"time"
)

func main() {
	// Create a cache with a default expiration time of 5 minutes, and which
	// purges expired items every 10 minutes
	c := cache.New(5*time.Minute, 10*time.Minute)

	// Set the value of the key "foo" to "bar", with the default expiration time
	c.Set("foo", "bar", cache.DefaultExpiration)

	// Set the value of the key "baz" to 42, with no expiration time
	// (the item won't be removed until it is re-set, or removed using
	// c.Delete("baz")
	c.Set("baz", 42, cache.NoExpiration)

	// Get the string associated with the key "foo" from the cache
	foo, found := c.Get("foo")
	if found {
		fmt.Println(foo)
	}

	// Since Go is statically typed, and cache values can be anything, type
	// assertion is needed when values are being passed to functions that don't
	// take arbitrary types, (i.e. interface{}). The simplest way to do this for
	// values which will only be used once--e.g. for passing to another
	// function--is:
	foo, found := c.Get("foo")
	if found {
		MyFunction(foo.(string))
	}

	// This gets tedious if the value is used several times in the same function.
	// You might do either of the following instead:
	if x, found := c.Get("foo"); found {
		foo := x.(string)
		// ...
	}
	// or
	var foo string
	if x, found := c.Get("foo"); found {
		foo = x.(string)
	}
	// ...
	// foo can then be passed around freely as a string

	// Want performance? Store pointers!
	c.Set("foo", &MyStruct, cache.DefaultExpiration)
	if x, found := c.Get("foo"); found {
		foo := x.(*MyStruct)
			// ...
	}
}

代码解析

go-cache源码只有两个go文件:
在这里插入图片描述
cache核心结构:

// 值结构
type Item struct {
	Object     interface{}
	Expiration int64
}

// 缓存核心结构
type cache struct {
    // 默认超时时间
	defaultExpiration time.Duration
	// 内存缓存
	items             map[string]Item
	// 读写锁
	mu                sync.RWMutex
	// 删除回调函数
	onEvicted         func(string, interface{})
	//
	janitor           *janitor
}

首先来看入口的New函数,New函数内初始化了一个缓存存储items,并且传递给newCacheWithJanitor(带有GC的Cache),并且在newCache中设置默认超时时间,如果为0则不过期,最后如果GC间隔大于0,则启动GC并且向runtime注册一个类似析构函数的handler用于停止GC协程。

func New(defaultExpiration, cleanupInterval time.Duration) *Cache {
	items := make(map[string]Item)
	// 传递超时时间,GC间隔...
	return newCacheWithJanitor(defaultExpiration, cleanupInterval, items)
}

func newCacheWithJanitor(de time.Duration, ci time.Duration, m map[string]Item) *Cache {
	c := newCache(de, m)
	C := &Cache{c}
	if ci > 0 {
		runJanitor(c, ci)
		runtime.SetFinalizer(C, stopJanitor)
	}
	return C
}

func newCache(de time.Duration, m map[string]Item) *cache {
	if de == 0 {
		de = -1
	}
	c := &cache{
		defaultExpiration: de,
		items:             m,
	}
	return c
}

跳转到GC协程,runJanitor启动一个GC协程,GC协程间隔触发GC过期检查并且删除过期的Key,当接收到stop信号后,关闭计时器,退出GC协程。

func (j *janitor) Run(c *cache) {
	ticker := time.NewTicker(j.Interval)
	for {
		select {
		case <-ticker.C:
			c.DeleteExpired()
		case <-j.stop:
			ticker.Stop()
			return
		}
	}
}

func stopJanitor(c *Cache) {
	c.janitor.stop <- true
}

// GC入口
func runJanitor(c *cache, ci time.Duration) {
	j := &janitor{
		Interval: ci,
		stop:     make(chan bool),
	}
	c.janitor = j
	go j.Run(c)
}

然后就是key-val的设置,key为string类型,val支持任意类型,并且允许用户自定义超时时间,通过写锁来保证协程安全数据一致,并且Set会覆盖历史数据,而Add操作会先判断是否已经存在该Key,

func (c *cache) Set(k string, x interface{}, d time.Duration) {
	// "Inlining" of set
	var e int64
	if d == DefaultExpiration {
		d = c.defaultExpiration
	}
	if d > 0 {
		e = time.Now().Add(d).UnixNano()
	}
	// 加写锁保证协程安全
	c.mu.Lock()
	c.items[k] = Item{
		Object:     x,
		Expiration: e,
	}
	// go-cache作者无法容忍defer的栈操作带来的ns级的开销
	// 因此不适用defer去释放锁
	c.mu.Unlock()
}

func (c *cache) set(k string, x interface{}, d time.Duration) {
	// 与Set完全一致
}

func (c *cache) Add(k string, x interface{}, d time.Duration) error {
	c.mu.Lock()
	_, found := c.get(k)
	if found {
		c.mu.Unlock()
		return fmt.Errorf("Item %s already exists", k)
	}
	c.set(k, x, d)
	c.mu.Unlock()
	return nil
}

接下来是数据获取操作,去items中取值,如果已经被GC掉了,直接返回,否则使用lazy的思想去判断key是否过期,过期则返回空,否则返回值(空接口对象可用于类型断言),

func (c *cache) Get(k string) (interface{}, bool) {
	c.mu.RLock()
	// "Inlining" of get and Expired
	item, found := c.items[k]
	if !found {
		c.mu.RUnlock()
		return nil, false
	}
	if item.Expiration > 0 {
		if time.Now().UnixNano() > item.Expiration {
			c.mu.RUnlock()
			return nil, false
		}
	}
	c.mu.RUnlock()
	return item.Object, true
}

紧接着提供了Incr操作,允许用户对Number泛型的val进行自增操作,同理还提供了一系列派生Incr操作,不再一一赘述,

func (c *cache) Increment(k string, n int64) error {
	c.mu.Lock()
	v, found := c.items[k]
	if !found || v.Expired() {
		c.mu.Unlock()
		return fmt.Errorf("Item %s not found", k)
	}
	switch v.Object.(type) {
	case int:
		v.Object = v.Object.(int) + int(n)
	case int8:
		v.Object = v.Object.(int8) + int8(n)
	case int16:
		v.Object = v.Object.(int16) + int16(n)
	case int32:
		v.Object = v.Object.(int32) + int32(n)
	case int64:
		v.Object = v.Object.(int64) + n
	case uint:
		v.Object = v.Object.(uint) + uint(n)
	case uintptr:
		v.Object = v.Object.(uintptr) + uintptr(n)
	case uint8:
		v.Object = v.Object.(uint8) + uint8(n)
	case uint16:
		v.Object = v.Object.(uint16) + uint16(n)
	case uint32:
		v.Object = v.Object.(uint32) + uint32(n)
	case uint64:
		v.Object = v.Object.(uint64) + uint64(n)
	case float32:
		v.Object = v.Object.(float32) + float32(n)
	case float64:
		v.Object = v.Object.(float64) + float64(n)
	default:
		c.mu.Unlock()
		return fmt.Errorf("The value for %s is not an integer", k)
	}
	c.items[k] = v
	c.mu.Unlock()
	return nil
}

对于key的删除操作,则是加锁采取内置的map delete函数实,如果设置了钩子函数则会调用一次钩子函数:

func (c *cache) Delete(k string) {
	c.mu.Lock()
	v, evicted := c.delete(k)
	c.mu.Unlock()
	if evicted {
		c.onEvicted(k, v)
	}
}

func (c *cache) delete(k string) (interface{}, bool) {
	if c.onEvicted != nil {
		if v, found := c.items[k]; found {
			delete(c.items, k)
			return v.Object, true
		}
	}
	delete(c.items, k)
	return nil, false
}

此外,作者还基于gob(gob即go binary是go开发组提供的一组序列化操作方法集)提供了序列化和反序列化操作,如果对gob陌生,可以理解为用json包将map进行序列化后存储到Writer中

func (c *cache) Save(w io.Writer) (err error) {
	enc := gob.NewEncoder(w)
	defer func() {
		if x := recover(); x != nil {
			err = fmt.Errorf("Error registering item types with Gob library")
		}
	}()
	c.mu.RLock()
	defer c.mu.RUnlock()
	for _, v := range c.items {
		gob.Register(v.Object)
	}
	// map序列化
	err = enc.Encode(&c.items)
	return
}

最后来看一下GC协程,当触发定时器时,遍历全部key-val一一判断哪些key过期,然后统一收集钩子函数,最后执行,

func (c *cache) DeleteExpired() {
	var evictedItems []keyAndValue
	now := time.Now().UnixNano()
	c.mu.Lock()
	for k, v := range c.items {
		// "Inlining" of expired
		if v.Expiration > 0 && now > v.Expiration {
			ov, evicted := c.delete(k)
			if evicted {
				evictedItems = append(evictedItems, keyAndValue{k, ov})
			}
		}
	}
	c.mu.Unlock()
	for _, v := range evictedItems {
		c.onEvicted(v.key, v.value)
	}
}

对于普通cache,设计并没有特别惊艳之处,尤其是是GC协程的处理,宛如一记猛棍,加锁遍历全库去删除key,在Set key时因为ns级别的defer开销而弃用defer,在此处则没有去思考系统开销,内存逐出策略采用较简单的ttl策略,适合go语言入门时阅读,并且可以加入该作者的仓库共同开发。

除了标准的cache外,作者提供了一个sharded版本的cache,旨在创建一个具有比标准缓存更好算法复杂度的缓存,主要通过防止在添加项目时对整个缓存进行写锁定实现。

type shardedCache struct {
	seed    uint32
	m       uint32
	cs      []*cache
	janitor *shardedJanitor
}

// djb2 with better shuffling. 5x faster than FNV with the hash.Hash overhead.
func djb33(seed uint32, k string) uint32 {
	var (
	    // 字符串的长度,类型为 uint32。
		l = uint32(len(k))
		// 初始哈希值,使用 5381 + seed + l 进行初始化,基于 djb2 算法的核心思想。
		d = 5381 + seed + l
		// 用于迭代字符串的索引。
		i = uint32(0)
	)
	// Why is all this 5x faster than a for loop?
	// 如果字符串长度大于等于 4,则每次循环处理 4 个字符。
	// 每个字符通过 d * 33 的形式进行混淆,再通过异或操作将字符值混入哈希值。
	if l >= 4 {
		for i < l-4 {
			d = (d * 33) ^ uint32(k[i])
			d = (d * 33) ^ uint32(k[i+1])
			d = (d * 33) ^ uint32(k[i+2])
			d = (d * 33) ^ uint32(k[i+3])
			i += 4
		}
	}
	// 对剩余不足 4 个字符的部分进行处理,使用 switch 来选择如何处理 1 到 4 个字符,确保每个字符都影响到哈希值。
	switch l - i {
	case 1:
	case 2:
		d = (d * 33) ^ uint32(k[i])
	case 3:
		d = (d * 33) ^ uint32(k[i])
		d = (d * 33) ^ uint32(k[i+1])
	case 4:
		d = (d * 33) ^ uint32(k[i])
		d = (d * 33) ^ uint32(k[i+1])
		d = (d * 33) ^ uint32(k[i+2])
	}
	return d ^ (d >> 16)
}

func (sc *shardedCache) bucket(k string) *cache {
	return sc.cs[djb33(sc.seed, k)%sc.m]
}

关于桶的选择,作者引入了djb2算法并且自己做了改良,

djb2 是一种简单而高效的哈希算法,由著名程序员 Daniel J. Bernstein 发明。

它常用于生成字符串的哈希值,因其简洁性和良好的分布性受到广泛使用,尤其是在字符串哈希表中。

djb2 算法的核心是通过将哈希值乘以一个常数(通常为 33)并加上当前字符的 ASCII 值来逐步生成哈希值。

FNV(Fowler-Noll-Vo)是一种著名的哈希算法,最早由 Glenn Fowler、Landon Curt Noll 和 Kiem-Phong Vo 开发,故而得名。FNV 的设计目标是简洁且快速,适用于字符串和其他数据的哈希运算。

FNV 算法的核心思想是将哈希值乘以一个素数(通常是 16777619 或 1099511628211)并对输入数据逐字节进行异或(XOR)操作。

FNV 主要有两个版本:
   FNV-1:每一步先乘素数再异或输入数据。
   FNV-1a:每一步先异或输入数据再乘素数。

FNV 算法因其简单和有效的哈希值分布在许多系统中得到了广泛应用。尤其在哈希表、查找、数据校验等应用中,FNV 被视为一个不错的选择。

FNV 就是一个典型的逐字符哈希算法,因为它每次只处理一个字符或字节,并对哈希值进行更新。

相比之下,某些哈希函数(如你提到的 djb33 优化版)可以通过批量处理多个字符来提高效率,减少循环的次数,这就是为什么它们可能会比逐字符哈希函数更快。

了解完djb2后回到选择bucket的函数,发现存储桶是通过hash值与m进行取模运算得到的:

func (sc *shardedCache) bucket(k string) *cache {
	return sc.cs[djb33(sc.seed, k)%sc.m]
}

但m怎么初始化,作者并没有开发完成,但可以猜测一手,m的值为桶的数量。对于GC部分代码则完全调用标准cache的GC,没有效率提高。

总之,这是一个正在成长中的项目,并且也与LRU相差甚远,ChatGpt推荐的仓库好像也不是十分精准嘛。最后,作者个人卡片写了自己是clovyr公司的CTO…,本人对此知之甚少,结束!
在这里插入图片描述

hashicorp/golang-lru

This provides the lru package which implements a fixed-size thread safe LRU cache. It is based on the cache in Groupcache.

这是一个国人写的库。

安装使用

Install:

go get github.com/hashicorp/golang-lru/v2

Usage:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	// make cache with 10ms TTL and 5 max keys
	cache := expirable.NewLRU[string, string](5, nil, time.Millisecond*10)


	// set value under key1.
	cache.Add("key1", "val1")

	// get value under key1
	r, ok := cache.Get("key1")

	// check for OK value
	if ok {
		fmt.Printf("value before expiration is found: %v, value: %q\n", ok, r)
	}

	// wait for cache to expire
	time.Sleep(time.Millisecond * 12)

	// get value under key1 after key expiration
	r, ok = cache.Get("key1")
	fmt.Printf("value after expiration is found: %v, value: %q\n", ok, r)

	// set value under key2, would evict old entry because it is already expired.
	cache.Add("key2", "val2")

	fmt.Printf("Cache len: %d\n", cache.Len())
	// Output:
	// value before expiration is found: true, value: "val1"
	// value after expiration is found: false, value: ""
	// Cache len: 1
}

代码解析

golang-lru的核心文件在simpleLru,并且在interface声明中,作者引入泛型定义了实现Get,Set等方法的对象均为Cache对象:
在这里插入图片描述
紧接着来看看作者自己是如何实现Cache接口的,可以看到作者也是定义了逐出回调,并且使用典型的map+list来实现LRU算法,其次作者将KV存储于自定义的entry中方便LRU操作,以及通过size限制允许存储的key的个数

// EvictCallback is used to get a callback when a cache entry is evicted
type EvictCallback[K comparable, V any] func(key K, value V)

// LRU implements a non-thread safe fixed size LRU cache
type LRU[K comparable, V any] struct {
	size      int
	evictList *internal.LruList[K, V]
	items     map[K]*internal.Entry[K, V]
	onEvict   EvictCallback[K, V]
}

// NewLRU constructs an LRU of the given size
func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V]) (*LRU[K, V], error) {
	if size <= 0 {
		return nil, errors.New("must provide a positive size")
	}

	c := &LRU[K, V]{
		size:      size,
		evictList: internal.NewList[K, V](),
		items:     make(map[K]*internal.Entry[K, V]),
		onEvict:   onEvict,
	}
	return c, nil
}

步入list源码,感觉和golang内置的list声明无太大差异,只是扩展了len参数用于标识当前list存储的key的个数,

// Entry is an LRU Entry
type Entry[K comparable, V any] struct {
	next, prev *Entry[K, V]

	// The list to which this element belongs.
	list *LruList[K, V]

	// The LRU Key of this element.
	Key K

	// The Value stored with this element.
	Value V

	// The time this element would be cleaned up, optional
	ExpiresAt time.Time

	// The expiry bucket item was put in, optional
	ExpireBucket uint8
}

// PrevEntry returns the previous list element or nil.
func (e *Entry[K, V]) PrevEntry() *Entry[K, V] {
	if p := e.prev; e.list != nil && p != &e.list.root {
		return p
	}
	return nil
}

type LruList[K comparable, V any] struct {
	root Entry[K, V] // sentinel list element, only &root, root.prev, and root.next are used
	len  int         // current list Length excluding (this) sentinel element
}

// Init initializes or clears list l.
func (l *LruList[K, V]) Init() *LruList[K, V] {
	l.root.next = &l.root
	l.root.prev = &l.root
	l.len = 0
	return l
}

回到cache,继续阅读设置/取值操作,可以看到在Add操作时,判断了key是否存在,如果存在则更新值并且将key的频率刷新(对于LRU就是将key移到开头),否则将直接在开头追加一个节点,完成插入操作后,会判断是否超出限制来触发GC(对于LRU GC就是移除末尾节点),

func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
	// Check for existing item
	if ent, ok := c.items[key]; ok {
		c.evictList.MoveToFront(ent)
		ent.Value = value
		return false
	}

	// Add new item
	ent := c.evictList.PushFront(key, value)
	c.items[key] = ent

	evict := c.evictList.Length() > c.size
	// Verify size not exceeded
	if evict {
		c.removeOldest()
	}
	return evict
}

// Get looks up a key's value from the cache.
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
	if ent, ok := c.items[key]; ok {
		c.evictList.MoveToFront(ent)
		return ent.Value, true
	}
	return
}

LRU的核心就是removeOldest函数了,可以看到是一个典型的操作:

// removeOldest removes the oldest item from the cache.
func (c *LRU[K, V]) removeOldest() {
	if ent := c.evictList.Back(); ent != nil {
		c.removeElement(ent)
	}
}

// removeElement is used to remove a given list element from the cache
func (c *LRU[K, V]) removeElement(e *internal.Entry[K, V]) {
	c.evictList.Remove(e)
	delete(c.items, e.Key)
	if c.onEvict != nil {
		c.onEvict(e.Key, e.Value)
	}
}

以上就是simpleLru的全部内容,同时作者还提供了一个带有超时时间的Cache实现和一个更复杂的ARCCache和TwoQueueCache实现。

总而言之,simpleLru的内存逐出过于粗糙,size竟然是key的数量,通过比较list的元素个数和size进行GC,而不是内存的使用状态,如开头所言,过于粗糙。

回到代码最外层,lru.go中引用的也是simpleLru,也就是说,除非手动选择另外两种Cache,否则默认是最差的simpleLru,同时为了保证线程安全,内嵌了一个读写锁。

在这里插入图片描述
代码目录比较混乱,有些cache放到了单独文件夹中,有些cache又单独拿了出来,该项目更像是一个Cache组,提供了多种cache供用户使用,比较有吸引力的是ARCCache和TwoQueueCache,接下来解析这两个Cache:

// ARCCache is a thread-safe fixed size Adaptive Replacement Cache (ARC).
// ARC is an enhancement over the standard LRU cache in that tracks both
// frequency and recency of use. This avoids a burst in access to new
// entries from evicting the frequently used older entries. It adds some
// additional tracking overhead to a standard LRU cache, computationally
// it is roughly 2x the cost, and the extra memory overhead is linear
// with the size of the cache. ARC has been patented by IBM, but is
// similar to the TwoQueueCache (2Q) which requires setting parameters.
type ARCCache[K comparable, V any] struct {
	size int // Size is the total capacity of the cache
	p    int // P is the dynamic preference towards T1 or T2

	t1 simplelru.LRUCache[K, V]        // T1 是用于最近访问条目的 LRU 缓存
	b1 simplelru.LRUCache[K, struct{}] // B1 是从 t1 驱逐条目的 LRU 缓存

	t2 simplelru.LRUCache[K, V]        // T2 是用于频繁访问条目的 LRU 缓存
	b2 simplelru.LRUCache[K, struct{}] // B2 是从 t2 驱逐条目的 LRU 缓存

	lock sync.RWMutex
}

ARCCache 是一个线程安全的固定大小的自适应替换缓存(ARC)是标准 LRU 缓存的改进版,它同时跟踪使用频率和最近使用情况。 这可以避免由于对新条目的突发访问而驱逐掉常用的旧条目。 相比标准 LRU 缓存,它增加了一些额外的跟踪开销,计算成本大约是标准 LRU 缓存的两倍,额外的内存开销与缓存大小成线性关系。

根据ARC结构体的描述,感觉和sync.Map一样做了读写分离,以减少读写之间加锁的开销,并且用读写锁保证某些情况下的数据一致性和并发安全。

接着往下看,NewARC中使用size去初始化simpleLru的长度,以及设置最大key的数量,

func NewARC[K comparable, V any](size int) (*ARCCache[K, V], error) {
	// Create the sub LRUs
	b1, err := simplelru.NewLRU[K, struct{}](size, nil)
	if err != nil {
		return nil, err
	}
	b2, err := simplelru.NewLRU[K, struct{}](size, nil)
	if err != nil {
		return nil, err
	}
	t1, err := simplelru.NewLRU[K, V](size, nil)
	if err != nil {
		return nil, err
	}
	t2, err := simplelru.NewLRU[K, V](size, nil)
	if err != nil {
		return nil, err
	}

	// Initialize the ARC
	c := &ARCCache[K, V]{
		size: size,
		p:    0,
		t1:   t1,
		b1:   b1,
		t2:   t2,
		b2:   b2,
	}
	return c, nil
}

关键操作在于Add,首先由于涉及多个写操作,干脆用一个写锁把整个方法括起来,Contains方法只是去simpleLru的map中取值,如果存在当前key,则认为是频繁访问的key,从t1有移动到t2中…

// Add adds a value to the cache.
func (c *ARCCache[K, V]) Add(key K, value V) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Check if the value is contained in T1 (recent), and potentially
	// promote it to frequent T2
	// 如果t1存在当前key,则认为是频繁访问的key,从t1移动到t2中
	if c.t1.Contains(key) {
		c.t1.Remove(key)
		c.t2.Add(key, value)
		return
	}

	// Check if the value is already in T2 (frequent) and update it
	// t1不存在但t2存在key,则将新值覆盖到t2
	if c.t2.Contains(key) {
		c.t2.Add(key, value)
		return
	}

	// Check if this value was recently evicted as part of the
	// recently used list
	// 检查该值是否最近被从最近使用列表中驱逐
	// 如果当前key在t1的待删除cache中则扩容并且将key移动到常用队列中
	if c.b1.Contains(key) {
		// T1 set is too small, increase P appropriately
		delta := 1
		b1Len := c.b1.Len()
		b2Len := c.b2.Len()
		if b2Len > b1Len {
			delta = b2Len / b1Len
		}
		if c.p+delta >= c.size {
			c.p = c.size
		} else {
			c.p += delta
		}

		// Potentially need to make room in the cache
		if c.t1.Len()+c.t2.Len() >= c.size {
			c.replace(false)
		}

		// Remove from B1
		c.b1.Remove(key)

		// Add the key to the frequently used list
		c.t2.Add(key, value)
		return
	}

	// Check if this value was recently evicted as part of the
	// frequently used list
	if c.b2.Contains(key) {
		// T2 set is too small, decrease P appropriately
		delta := 1
		b1Len := c.b1.Len()
		b2Len := c.b2.Len()
		if b1Len > b2Len {
			delta = b1Len / b2Len
		}
		if delta >= c.p {
			c.p = 0
		} else {
			c.p -= delta
		}

		// Potentially need to make room in the cache
		// 如果总长度大于上线,需要执行GC
		if c.t1.Len()+c.t2.Len() >= c.size {
			c.replace(true)
		}

		// Remove from B2
		c.b2.Remove(key)

		// Add the key to the frequently used list
		c.t2.Add(key, value)
		return
	}

	// Potentially need to make room in the cache
	if c.t1.Len()+c.t2.Len() >= c.size {
		c.replace(false)
	}

	// Keep the size of the ghost buffers trim
	if c.b1.Len() > c.size-c.p {
		c.b1.RemoveOldest()
	}
	if c.b2.Len() > c.p {
		c.b2.RemoveOldest()
	}

	// Add to the recently seen list
	c.t1.Add(key, value)
}

综上,首先再次强调下T1,T2,B1,B2的作用:

  • T1存储“最近访问但不常用”的数据。
  • T2存储“最近频繁访问”的数据。
  • B1和B2分别用于记录从T1和T2中被移除的键(称为幽灵缓存)。

当缓存已满或接近满时,ARC根据访问模式来调整T1和T2的大小比例p。扩容其实是调整缓存的权重:

  • B1命中时(即某个值被T1驱逐后再次被访问):意味着曾经被认为不常用的值现在变得重要,所以增加p,让T1的空间变大。(因为会根据p删除key,在size不变的情况下,t1删除的少,就代表t1有更多内容,t2同理)
  • B2命中时(即某个值被T2驱逐后再次被访问):意味着经常访问的内容可能不再重要,所以减少p,让T2的空间变小。

因此,当Add检测到某个键最近被驱逐(在B1或B2中找到)时,ARC根据情况调整p的值。这个“扩容”其实是重新分配T1和T2的空间,确保缓存能动态适应数据的访问模式。

一句话:扩容是为了根据数据的访问频率来动态调整T1和T2的容量,从而提高缓存的命中率和性能。

关于p字段的使用,需要跳到replace方法中:

func (c *ARCCache[K, V]) replace(b2ContainsKey bool) {
	t1Len := c.t1.Len()
	if t1Len > 0 && (t1Len > c.p || (t1Len == c.p && b2ContainsKey)) {
		k, _, ok := c.t1.RemoveOldest()
		if ok {
			c.b1.Add(k, struct{}{})
		}
	} else {
		k, _, ok := c.t2.RemoveOldest()
		if ok {
			c.b2.Add(k, struct{}{})
		}
	}
}

在 Add 方法中,每当需要在缓存中腾出空间时,都会调用 replace 方法。replace 方法的行为取决于 p 的值,这个方法决定了是从 T1(最近访问队列)中移除数据,还是从 T2(频繁访问队列)中移除数据:

  • 当 T1 的大小超过 p:意味着我们认为最近访问的数据占用了太多缓存,因此需要从 T1 中移除一些数据来为频繁使用的数据腾出空间。
  • 当 T1 的大小小于等于 p:这时频繁使用的数据占据了更多的空间,若需要腾出空间,则会从 T2 中移除最老的元素。

此外还需要更新 B1 或 B2:

  • 当从 T1 中移除数据时,将被移除的键添加到 B1(幽灵缓存,记录被从 T1 中移除的键)。
  • 当从 T2 中移除数据时,将被移除的键添加到 B2(同样作为幽灵缓存,记录从 T2 中移除的键)。

回到命中B2 命中时的逻辑,

delta := 1
b1Len := c.b1.Len()
b2Len := c.b2.Len()
if b1Len > b2Len {
	delta = b1Len / b2Len
}
if delta >= c.p {
	c.p = 0
} else {
	c.p -= delta
}

它的目的是通过减少 p 的值来缩小 T1(最近访问的数据)的空间,扩大 T2(频繁访问的数据)的空间。这是因为 B2 中的键表示某些数据曾经是频繁访问的,但由于空间限制被移除了;当它再次被访问时,ARC 认为频繁访问的数据可能更重要,所以需要减少 T1 的空间来扩大 T2。

这段代码是处理 B2 命中时的逻辑。它的目的是通过减少 p 的值来缩小 T1(最近访问的数据)的空间,扩大 T2(频繁访问的数据)的空间。这是因为 B2 中的键表示某些数据曾经是频繁访问的,但由于空间限制被移除了;当它再次被访问时,ARC 认为频繁访问的数据可能更重要,所以需要减少 T1 的空间来扩大 T2

逻辑分析:

  1. 初始 delta 为 1

    • 这是一个基础值,用于在没有其他信息时,减少 p 时的最低变化量。
  2. b1Lenb2Len 的比较

    • b1Len 是幽灵缓存 B1 的大小,表示最近使用的数据被驱逐后的缓存数。
    • b2Len 是幽灵缓存 B2 的大小,表示频繁使用的数据被驱逐后的缓存数。
    • 如果 b1Len 大于 b2Len,说明最近使用的数据被驱逐得更多,那么算法会进一步减少 p,因为频繁使用的数据(即 T2)相对更少,可能需要更多空间。
  3. 调整 delta

    • 如果 B1 中的元素比 B2 更多,说明最近使用的数据被移除的频率更高,这时候 delta 被设置为 b1Len / b2Len,目的是让 p 减少得更快,减少 T1 的空间,给 T2(频繁访问的数据)留更多空间。
  4. 调整 p

    • 如果计算得到的 delta 大于或等于当前的 p,则将 p 设置为 0,即完全移除 T1 的空间。这是极端情况下,频繁访问的数据占据了缓存的主导位置,导致几乎没有空间留给最近访问的数据。
    • 如果 delta 小于 p,则正常减少 p,让 T1 缩小,同时给 T2 增加更多空间。

为什么要减少T1 的空间呢? 不能值扩大T2吗?

减少 T1 的空间而不是直接扩大 T2 的原因是因为 ARC 算法的设计理念在于保持缓存总容量固定,同时动态调整 T1 和 T2
的大小比例,以适应不同的访问模式。

再来看TwoQueueCache的实现:

Two-Queue 缓存算法,它是对标准 LRU(Least Recently Used,最近最少使用)算法的改进。2Q 算法通过将缓存分为两个队列来管理最近使用和频繁使用的条目,以避免标准 LRU 的某些缺陷。

2Q 算法的核心思想:

  • 最近使用队列(A1):用来保存刚刚访问过的条目。这个队列的作用是跟踪最近使用但还没有频繁使用的条目。这个队列帮助缓解短期访问突增(例如缓存“风暴”)对缓存整体性能的影响。
  • 频繁使用队列(A2):这个队列保存那些频繁被访问的条目。只有当某个条目多次被访问时,它才会被提升到这个队列中。通过这种方式,2Q 能够更好地保留频繁使用的数据。

改进之处:2Q 算法的改进之处在于通过分离最近使用和频繁使用的条目,有效防止短期内大量新访问的条目迅速占据缓存空间,导致那些重要的频繁访问条目被驱逐的问题。

type TwoQueueCache[K comparable, V any] struct {
	size        int
	recentSize  int
	recentRatio float64
	ghostRatio  float64
    
    // 最近使用队列(A1)
	recent      simplelru.LRUCache[K, V]
	// 频繁使用队列(A2)
	frequent    simplelru.LRUCache[K, V]
	// 最近驱逐队列(B1)
	recentEvict simplelru.LRUCache[K, struct{}]
	// 读写锁,用于在多线程环境中保护缓存的一致性。
	lock        sync.RWMutex
}

同时定义了两个比率:

const (
	// Default2QRecentRatio 是 2Q 缓存中专门用于最近添加的、只被访问过一次的条目的比例。
	Default2QRecentRatio = 0.25

	// Default2QGhostEntries 是用于跟踪最近被驱逐条目的幽灵条目的默认比例。
	Default2QGhostEntries = 0.50
)

然后用size初始化双Q,用ghostSize初始化淘汰队列:

func New2QParams[K comparable, V any](size int, recentRatio, ghostRatio float64) (*TwoQueueCache[K, V], error) {
	...
	// Determine the sub-sizes
	recentSize := int(float64(size) * recentRatio)
	evictSize := int(float64(size) * ghostRatio)

	// Allocate the LRUs
	recent, err := simplelru.NewLRU[K, V](size, nil)
	if err != nil {
		return nil, err
	}
	frequent, err := simplelru.NewLRU[K, V](size, nil)
	if err != nil {
		return nil, err
	}
	recentEvict, err := simplelru.NewLRU[K, struct{}](evictSize, nil)
	if err != nil {
		return nil, err
	}
	...
}

接着来看一下2Q cache的Add设计思想:

func (c *TwoQueueCache[K, V]) Add(key K, value V) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Check if the value is frequently used already,
	// and just update the value
	// 如果在常用Q中就更新key的值
	if c.frequent.Contains(key) {
		c.frequent.Add(key, value)
		return
	}

	// Check if the value is recently used, and promote
	// the value into the frequent list
	// 如果在最近使用Q中就移动到常用Q
	if c.recent.Contains(key) {
		c.recent.Remove(key)
		c.frequent.Add(key, value)
		return
	}

	// If the value was recently evicted, add it to the
	// frequently used list
	// 如果在淘汰队列中则调用ensureSpace,并且将key-val添加到常用Q
	if c.recentEvict.Contains(key) {
		c.ensureSpace(true)
		c.recentEvict.Remove(key)
		c.frequent.Add(key, value)
		return
	}

	// Add to the recently seen list
	c.ensureSpace(false)
	// 完全新的key,第一次添加到最近使用Q中
	c.recent.Add(key, value)
}

总之,2Q的Add操作非常常规,然后再来看一下2Q中确保空间足够使用的方法:

func (c *TwoQueueCache[K, V]) ensureSpace(recentEvict bool) {
	// If we have space, nothing to do
	recentLen := c.recent.Len()
	freqLen := c.frequent.Len()
	// 空间足够什么都不做
	if recentLen+freqLen < c.size {
		return
	}

	// If the recent buffer is larger than
	// the target, evict from there
	// 当最近使用Q空间不足时,将key移动到淘汰Q中
	if recentLen > 0 && (recentLen > c.recentSize || (recentLen == c.recentSize && !recentEvict)) {
		k, _, _ := c.recent.RemoveOldest()
		c.recentEvict.Add(k, struct{}{})
		return
	}
    // 如果recentQ足够大小
	// Remove from the frequent list otherwise
	c.frequent.RemoveOldest()
}

总体是非常常规的,最后调用c.frequent.RemoveOldest()是因为当 recent 队列不需要驱逐条目时,可能因为缓存总容量已经超出限制,需要进一步从 frequent 队列中驱逐条目。

接着是Get方法,先从常用Q中判断是否存在key,然后再去recent Q 中判断,c.recent.Peek(key) 注意Peek操作,它和Get不同之处在于,Get会触发LRU的list移动节点操作,而Peek则不会。

func (c *TwoQueueCache[K, V]) Get(key K) (value V, ok bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Check if this is a frequent value
	if val, ok := c.frequent.Get(key); ok {
		return val, ok
	}

	// If the value is contained in recent, then we
	// promote it to frequent
	if val, ok := c.recent.Peek(key); ok {
		c.recent.Remove(key)
		c.frequent.Add(key, val)
		return val, ok
	}

	// No hit
	return
}

// simple Lru的peek
func (c *LRU[K, V]) Peek(key K) (value V, ok bool) {
	var ent *internal.Entry[K, V]
	if ent, ok = c.items[key]; ok {
		return ent.Value, true
	}
	return
}
// simple Lru的Get
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
	if ent, ok := c.items[key]; ok {
		c.evictList.MoveToFront(ent)
		return ent.Value, true
	}
	return
}

最后还有一个Resize方法,全局搜索只有在测试文件里使用过,既然是基于用户传入的size去做限制,如果改变了size,就与设计相悖了。

func (c *TwoQueueCache[K, V]) Resize(size int) (evicted int) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Recalculate the sub-sizes
	recentSize := int(float64(size) * c.recentRatio)
	evictSize := int(float64(size) * c.ghostRatio)
	c.size = size
	c.recentSize = recentSize

	// ensureSpace
	diff := c.recent.Len() + c.frequent.Len() - size
	if diff < 0 {
		diff = 0
	}
	for i := 0; i < diff; i++ {
		c.ensureSpace(true)
	}

	// Reallocate the LRUs
	c.recent.Resize(size)
	c.frequent.Resize(size)
	c.recentEvict.Resize(evictSize)

	return diff
}

groupcache

groupcache is a caching and cache-filling library, intended as a replacement for memcached in many cases.

groupcache旨在替换memcached,groupcache 的设计初衷是作为一种分布式缓存解决方案,特别适用于避免使用外部缓存服务的情况,例如Memcached或Redis。

十分有趣的一点是:does not require running a separate set of servers, thus massively reducing deployment/configuration pain. groupcache is a client library as well as a server. It connects to its own peers, forming a distributed cache.

就是与Redis等其他常用cache实现不同,groupcache并不运行在单独的server上,而是作为library和app运行在同一进程中。所以groupcache既是server也是client。

安装使用

Install:

go get github.com/golang/groupcache

Usage:

package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/golang/groupcache"
)

func main() {
    // 定义一个名为 "example" 的缓存组,最大缓存容量为 64MB
    var cacheGroup = groupcache.NewGroup("example", 64<<20, groupcache.GetterFunc(
        func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
            // 模拟缓存失效时的数据获取逻辑
            value := "Value for " + key
            dest.SetString(value)
            return nil
        }))

    http.HandleFunc("/cache/", func(w http.ResponseWriter, r *http.Request) {
        key := r.URL.Path[len("/cache/"):]
        var data string
        err := cacheGroup.Get(nil, key, groupcache.StringSink(&data))
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "Cached value for %s: %s\n", key, data)
    })

    log.Println("Starting server on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

将以上代码保存为 main.go,然后运行:

go run main.go

服务会启动在 localhost:8080。可以通过浏览器或命令行访问:

curl http://localhost:8080/cache/testkey

这将返回类似于以下内容:

Cached value for testkey: Value for testkey

第一次访问时,groupcache 会通过 GetterFunc 填充缓存。随后的访问则会直接从缓存中返回该值。

分布式部署:要在多个节点之间使用 groupcache 进行分布式缓存,只需要在启动时指定每个节点的地址,并让它们相互连接。例如,可以使用 groupcache.NewHTTPPool() 来设置每个节点的地址。

peers := groupcache.NewHTTPPool("http://localhost:8080")
peers.Set("http://localhost:8081", "http://localhost:8082")

代码解析

缓存淘汰算法

FIFO(First In First Out)
先进先出,也就是淘汰缓存中最老(最早添加)的记录。FIFO 认为,最早添加的记录,其不再被使用的可能性比刚添加的可能性大。这种算法的实现也非常简单,创建一个队列,新增记录添加到队尾,每次内存不够时,淘汰队首。但是很多场景下,部分记录虽然是最早添加但也最常被访问,而不得不因为呆的时间太长而被淘汰。这类数据会被频繁地添加进缓存,又被淘汰出去,导致缓存命中率降低。

LFU(Least Frequently Used)
最少使用,也就是淘汰缓存中访问频率最低的记录。LFU 认为,如果数据过去被访问多次,那么将来被访问的频率也更高。LFU 的实现需要维护一个按照访问次数排序的队列,每次访问,访问次数加1,队列重新排序,淘汰时选择访问次数最少的即可。LFU 算法的命中率是比较高的,但缺点也非常明显,维护每个记录的访问次数,对内存的消耗是很高的;另外,如果数据的访问模式发生变化,LFU 需要较长的时间去适应,也就是说 LFU 算法受历史数据的影响比较大。例如某个数据历史上访问次数奇高,但在某个时间点之后几乎不再被访问,但因为历史访问次数过高,而迟迟不能被淘汰。

LRU(Least Recently Used)
最近最少使用,相对于仅考虑时间因素的 FIFO 和仅考虑访问频率的 LFU,LRU 算法可以认为是相对平衡的一种淘汰算法。LRU 认为,如果数据最近被访问过,那么将来被访问的概率也会更高。LRU 算法的实现非常简单,维护一个队列,如果某条记录被访问了,则移动到队尾,那么队首则是最近最少访问的数据,淘汰该条记录即可。

在这里插入图片描述
这张图很好地表示了 LRU 算法最核心的 2 个数据结构

  • 绿色的是字典(map),存储键和值的映射关系。这样根据某个键(key)查找对应的值(value)的复杂是O(1),在字典中插入一条记录的复杂度也是O(1)。
  • 红色的是双向链表(double linked list)实现的队列。将所有的值放到双向链表中,这样,当访问到某个值时,将其移动到队尾的复杂度是O(1),在队尾新增一条记录以及删除一条记录的复杂度均为O(1)。

首先来看groupcache的lru实现,可以看到groupcache使用了go原生的list来实现Lru(没有浪费时间去重新实现),而key使用空接口代表能传入任意类型(任何可比较类型):

type Key interface{}

type entry struct {
	key   Key
	value interface{}
}

type Cache struct {
	// MaxEntries is the maximum number of cache entries before
	// an item is evicted. Zero means no limit.
	MaxEntries int

	// OnEvicted optionally specifies a callback function to be
	// executed when an entry is purged from the cache.
	OnEvicted func(key Key, value interface{})

	ll    *list.List
	cache map[interface{}]*list.Element
}

func New(maxEntries int) *Cache {
	return &Cache{
		MaxEntries: maxEntries,
		ll:         list.New(),
		cache:      make(map[interface{}]*list.Element),
	}
}

Get操作也十分的中规中矩,如果cache没有初始化则懒加载,同时在新增key的时候判断是否存在,移动节点,以及内存淘汰:

func (c *Cache) Add(key Key, value interface{}) {
	if c.cache == nil {
		c.cache = make(map[interface{}]*list.Element)
		c.ll = list.New()
	}
	if ee, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ee)
		ee.Value.(*entry).value = value
		return
	}
	ele := c.ll.PushFront(&entry{key, value})
	c.cache[key] = ele
	if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
		c.RemoveOldest()
	}
}

淘汰机制也非常常规:

// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
	if c.cache == nil {
		return
	}
	ele := c.ll.Back()
	if ele != nil {
		c.removeElement(ele)
	}
}

func (c *Cache) removeElement(e *list.Element) {
	c.ll.Remove(e)
	kv := e.Value.(*entry)
	delete(c.cache, kv.key)
	if c.OnEvicted != nil {
		c.OnEvicted(kv.key, kv.value)
	}
}

从以上代码可以看出,Cache结构是并发不安全的,欲知groupcache如何保证并发安全,请看下文!

并发缓存组

继续来阅读groupcache.go的代码,cache结构对lru.Cache又做了一层封装,并且加了一个读写锁和3个n标识,同时引入了基于内存占用的垃圾回收:

type cache struct {
	mu         sync.RWMutex
	nbytes     int64 // of all keys and values
	lru        *lru.Cache
	nhit, nget int64
	nevict     int64 // number of evictions
}

紧接着对lru.Cache操作进行了二次封装,

func (c *cache) add(key string, value ByteView) {
    // add设计对map的并发读写不加锁会导致panic
	c.mu.Lock()
	defer c.mu.Unlock()
	// 懒加载
	if c.lru == nil {
		c.lru = &lru.Cache{
			OnEvicted: func(key lru.Key, value interface{}) {
			    // 注册回调函数
				val := value.(ByteView)
				// 释放内存
				c.nbytes -= int64(len(key.(string))) + int64(val.Len())
				// 统计释放的key的个数
				c.nevict++
			},
		}
	}
	// 加入key
	c.lru.Add(key, value)
	// 增加内存消耗计数
	c.nbytes += int64(len(key)) + int64(value.Len())
}

func (c *cache) get(key string) (value ByteView, ok bool) {
    // 为什么读操作还要加锁, 因为还涉及两个标识的自增
	c.mu.Lock()
	defer c.mu.Unlock()
	// get次数+1
	c.nget++
	if c.lru == nil {
		return
	}
	vi, ok := c.lru.Get(key)
	if !ok {
		return
	}
	// 命中次数+1
	c.nhit++
	return vi.(ByteView), true
}

func (c *cache) removeOldest() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru != nil {
		c.lru.RemoveOldest()
	}
}

func (c *cache) bytes() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.nbytes
}

func (c *cache) items() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.itemsLocked()
}

func (c *cache) itemsLocked() int64 {
	if c.lru == nil {
		return 0
	}
	// 返回list的元素个数
	return int64(c.lru.Len())
}

同时声明一个状态结构,将cache的私有变量对外服务出去:

type CacheStats struct {
	Bytes     int64
	Items     int64
	Gets      int64
	Hits      int64
	Evictions int64
}

func (c *cache) stats() CacheStats {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return CacheStats{
		Bytes:     c.nbytes,
		Items:     c.itemsLocked(),
		Gets:      c.nget,
		Hits:      c.nhit,
		Evictions: c.nevict,
	}
}

同时为了计算key-val结构占用的内存以及存储任意类型数据(包括图片,视频等数据),抽象了一层只读的ByteView出来:

// ByteView 持有一个不可变的字节视图。
// 在内部,它封装了一个 []byte 或一个 string,
// 但这个细节对调用者是不可见的。
type ByteView struct {
	// If b is non-nil, b is used, else s is used.
	b []byte
	s string
}

// Len returns the view's length.
func (v ByteView) Len() int {
	if v.b != nil {
		return len(v.b)
	}
	return len(v.s)
}

cache也有了,为了提高存储能力和分级缓存机制,于是将cache放到了一个Group中:

type Group struct {
	name       string
	getter     Getter
	peersOnce  sync.Once
	peers      PeerPicker
	cacheBytes int64 // limit for sum of mainCache and hotCache size
	
	// 主存(高可信)
	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number.
	mainCache cache
    // 高热存储
	hotCache cache

	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers.
	// loadGroup 确保每个键只被获取一次(无论是本地还是远程),
    // 不论有多少并发调用者。
	loadGroup flightGroup
	
	// 方便内存对齐
	_ int32 // force Stats to be 8-byte aligned on 32-bit platforms

	// Stats are statistics on the group.
	Stats Stats
}

Group涉及到两个关键的知识点,一致性哈希flightGroup.

一致性哈希

什么是一致性哈希?

一致性哈希算法是Cache 从单节点走向分布式节点的一个重要的环节。

对于分布式缓存来说,当一个节点接收到请求,如果该节点并没有存储缓存值,那么它面临的难题是,从谁那获取数据?自己,还是节点1, 2, 3, 4… 。假设包括自己在内一共有 10 个节点,当一个节点接收到请求时,随机选择一个节点,由该节点从数据源获取数据。

假设第一次随机选取了节点 1 ,节点 1 从数据源获取到数据的同时缓存该数据;那第二次,只有 1/10 的可能性再次选择节点 1, 有 9/10 的概率选择了其他节点,如果选择了其他节点,就意味着需要再一次从数据源获取数据,一般来说,这个操作是很耗时的。这样做,一是缓存效率低,二是各个节点上存储着相同的数据,浪费了大量的存储空间。

那有什么办法,对于给定的 key,每一次都选择同一个节点呢?使用 hash 算法也能够做到这一点。那把 key 的每一个字符的 ASCII 码加起来,再除以 10 取余数可以吗?当然可以,这可以认为是自定义的 hash 算法。

在这里插入图片描述
从上面的图可以看到,任意一个节点任意时刻请求查找键 Tom 对应的值,都会分配给节点 2,有效地解决了上述的问题。

但如果此时节点数量变了怎么办?简单求取 Hash 值解决了缓存性能的问题,但是没有考虑节点数量变化的场景。假设,移除了其中一台节点,只剩下 9 个,那么之前 hash(key) % 10 变成了 hash(key) % 9,也就意味着几乎缓存值对应的节点都发生了改变。即几乎所有的缓存值都失效了。节点在接收到对应的请求时,均需要重新去数据源获取数据,容易引起 缓存雪崩

缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。
常因为缓存服务器宕机,或缓存设置了相同的过期时间引起。

为了防止Hash函数简陋无法应对节点扩缩容导致的缓存全部失效引发的雪崩,引入了一致性哈希算法:一致性哈希算法(Consistent Hashing)是一种特殊的哈希算法,旨在为分布式系统中负载均衡和数据分布提供高效、均衡的解决方案。它的核心思想是将所有的缓存节点和数据映射到一个虚拟的圆环上,然后通过哈希函数将数据映射到环上的位置,以确定数据应存储在哪个节点上。

一致性哈希算法将 key 映射到 2^32 的空间中,将这个数字首尾相连,形成一个环。

  • 计算节点/机器(通常使用节点的名称、编号和 IP 地址)的哈希值,放置在环上。
  • 计算 key 的哈希值,放置在环上,顺时针寻找到的第一个节点,就是应选取的节点/机器。

在这里插入图片描述
环上有 peer2,peer4,peer6 三个节点,key11,key2,key27 均映射到 peer2,key23 映射到 peer4。此时,如果新增节点/机器 peer8,假设它新增位置如图所示,那么只有 key27 从 peer2 调整到 peer8,其余的映射均没有发生改变。

也就是说,一致性哈希算法,在新增/删除节点时,只需要重新定位该节点附近的一小部分数据,而不需要重新定位所有的节点,这就解决了上述的问题。

同时,如果服务器的节点过少,容易引起 key 的倾斜。例如上面例子中的 peer2,peer4,peer6 分布在环的上半部分,下半部分是空的。那么映射到环下半部分的 key 都会被分配给 peer2,key 过度向 peer2 倾斜,缓存节点间负载不均。

为了解决这个问题,引入了虚拟节点的概念,一个真实节点对应多个虚拟节点。

假设 1 个真实节点对应 3 个虚拟节点,那么 peer1 对应的虚拟节点是 peer1-1、 peer1-2、 peer1-3(通常以添加编号的方式实现),其余节点也以相同的方式操作。

  • 第一步,计算虚拟节点的 Hash 值,放置在环上。
  • 第二步,计算 key 的 Hash 值,在环上顺时针寻找到应选取的虚拟节点,例如是 peer2-1,那么就对应真实节点 peer2。

虚拟节点扩充了节点的数量,解决了节点较少的情况下数据容易倾斜的问题。而且代价非常小,只需要增加一个字典(map)维护真实节点与虚拟节点的映射关系即可。

我们跳转到consistenthash.go文件内看看go官方是怎么实现的,

type Hash func(data []byte) uint32

type Map struct {
    // Hash 函数 hash
	hash     Hash
	// 虚拟节点倍数 replicas
	replicas int
	// 哈希环 keys
	keys     []int // Sorted
	// 虚拟节点与真实节点的映射表 hashMap,键是虚拟节点的哈希值,值是真实节点的名称。
	hashMap  map[int]string
}

PS:不要再问为啥用uint和32位了,如果你确定一个数不可能是负数,为啥要浪费1bit去存储符号? 32位是因为有些系统寻址只支持32bit,go官方写一个开源的缓存,不可能丢弃掉32bit的用户,况且32bit在64bit上也能顺利运行。

在这里插入图片描述
接着往下看,go官方默认会采用src32.ChecksumIEEE作为hash函数,

CRC32(Cyclic Redundancy Check
32-bit)是一种广泛使用的错误检测机制,用于检测数据在存储或传输过程中是否发生了错误。CRC32 使用 32
位的校验和,它通过多项式除法来生成一个 32 位的值,该值可以用来检测数据的完整性。

func New(replicas int, fn Hash) *Map {
	m := &Map{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[int]string),
	}
	if m.hash == nil {
		m.hash = crc32.ChecksumIEEE
	}
	return m
}

当key加入缓存时,同时加入到hash环中:

func (m *Map) Add(keys ...string) {
	for _, key := range keys {
		for i := 0; i < m.replicas; i++ {
		    // 计算key+虚节点的hash值
			hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
			// 把key的hash加入到总key中
			m.keys = append(m.keys, hash)
			// 添加hash到key的映射
			m.hashMap[hash] = key
		}
	}
	sort.Ints(m.keys)
}

func (m *Map) Get(key string) string {
	if m.IsEmpty() {
		return ""
	}

	hash := int(m.hash([]byte(key)))

	// Binary search for appropriate replica.
	idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })

	// Means we have cycled back to the first replica.
	if idx == len(m.keys) {
		idx = 0
	}

	return m.hashMap[m.keys[idx]]
}
防止缓存击穿——singleflight 的使用

singleflight 是一种编程模式或技术,用于解决同一时间内对相同资源的重复请求,避免并发调用的抖动或过载问题。该模式确保在并发情况下,相同的请求只会被执行一次,其余的并发请求会等待第一个请求的结果,而不是重复执行相同的操作。

在 Go 语言中,singleflight 是由 golang.org/x/sync/singleflight 包提供的一个实现,用于避免并发的重复工作。

核心思想:

  • 避免重复请求:在并发环境中,多个请求可能同时发起相同的操作,例如从远程服务器获取数据或从数据库中查询结果。singleflight 模式保证了只执行一次相同的请求,后续的请求会等待第一个请求的结果。

  • 返回结果共享:当多个请求同时发生时,singleflight 会将第一个请求的结果共享给其他请求,从而避免不必要的重复计算或 I/O 操作。

singleflight 主要用于以下场景:

  • 缓存填充:防止多个线程同时请求同一个数据时,重复地从数据库或远程服务加载数据。
  • 去抖动:在短时间内防止多个相同的操作(如网络请求或磁盘 I/O)被频繁执行。
  • 减少资源消耗:通过避免重复的工作来减少资源消耗,如 CPU 计算、数据库查询、API 请求等。

小例子:

package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "time"
)

func main() {
    var group singleflight.Group

    // 模拟三个并发请求
    for i := 0; i < 3; i++ {
        go func(i int) {
            result, _, _ := group.Do("key", func() (interface{}, error) {
                fmt.Printf("Request %d: Executing...\n", i)
                time.Sleep(2 * time.Second)  // 模拟长时间的操作
                return "Result", nil
            })
            fmt.Printf("Request %d: Result = %s\n", i, result)
        }(i)
    }

    time.Sleep(3 * time.Second)
}

groupCache在groupcache.go提供了flightGroup接口,并在singleflight给出了实现:

type flightGroup interface {
	// Done is called when Do is done.
	Do(key string, fn func() (interface{}, error)) (interface{}, error)
}
package singleflight

import "sync"

// call 是正在进行或已完成的 Do 调用
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

type Group struct {
	mu sync.Mutex       // protects m
	// m是key-call的绑定
	m  map[string]*call // lazily initialized
}

// 如果出现重复请求,重复的调用者会等待原始调用完成,并接收相同的结果。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    // 加锁保护m
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	
	// 判断当前key是不是正在被call
	if c, ok := g.m[key]; ok {
	    // 如果是则释放锁,因为我们不需要改变任何东西
	    // 只需要等待call返回
		g.mu.Unlock()
		// 等待
		c.wg.Wait()
		// 返回key对应的val
		return c.val, c.err
	}
	
	// 当前key第一次被call
	c := new(call)
	// wg的并发g计数器加1
	c.wg.Add(1)
	g.m[key] = c
	// 此时不需要再操作m,所以把锁放掉即可
	g.mu.Unlock()
    
	// 所有协程各自调用自己的fn()
	c.val, c.err = fn()
	// 执行完成就给各自的call wg的并发g计数器-1
	c.wg.Done()

    // 再次加锁删除掉调用的纪录
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}
接口型函数

回到groupcache.go中,发现作者针对取值的行为定义了一个接口型函数,

// A Getter loads data for a key.
type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be unversioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms.
	Get(ctx context.Context, key string, dest Sink) error
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx context.Context, key string, dest Sink) error

func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
	return f(ctx, key, dest)
}

接口型函数在go中被大量用到,再举一个net/http的例子:

type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}
type HandlerFunc func(ResponseWriter, *Request)

func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
	f(w, r)
}

想象这么一个使用场景,GetData 的作用是从某数据源获取结果,接口类型 Getter 是其中一个参数,代表某数据源,认为只要是实现了Get(string)的就是数据源,

func GetData(getter Getter, key string) []byte {
	buf, err := getter.Get(key)
	if err == nil {
		return buf
	}
	return nil
}

可以有多种方式调用该函数:

方法1:传入普通函数,但用Getter强转:

func test(key string) ([]byte, error) {
	return []byte(key), nil
}

func main() {
    GetFromSource(GetterFunc(test), "hello")
}

方法2:实现了 Getter 接口的结构体作为参数

type DB struct{ url string}

func (db *DB) Query(sql string, args ...string) string {
	// ...
	return "hello"
}

func (db *DB) Get(key string) ([]byte, error) {
	// ...
	v := db.Query("SELECT NAME FROM TABLE WHEN NAME= ?", key)
	return []byte(v), nil
}

func main() {
	GetFromSource(new(DB), "hello")
}

这样,既能够将普通的函数类型(需类型转换)作为参数,也可以将结构体作为参数,使用更为灵活,可读性也更好,这就是接口型函数的价值。如果是普通结构体作为参数,则会直接触发结构体的Get方法,如果是函数作为参数,则可以把函数的返回值作为Get的结果,非常巧妙。

对于net/http包的接口型函数同理,如果传入的是一个实现了 Handler 接口的结构体,就可以完全托管所有的 HTTP 请求,后续怎么路由,怎么处理,请求前后增加什么功能,都可以自定义了。

“Set”/Get方法

回归到groupcache的核心,使用NewGroup初始化得到一个cache,

var (
	mu     sync.RWMutex
	groups = make(map[string]*Group)

	initPeerServerOnce sync.Once
	initPeerServer     func()
)

func callInitPeerServer() {
	if initPeerServer != nil {
		initPeerServer()
	}
}

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	return newGroup(name, cacheBytes, getter, nil)
}

// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	if getter == nil {
		panic("nil Getter")
	}
	mu.Lock()
	defer mu.Unlock()
	// 初始化
	initPeerServerOnce.Do(callInitPeerServer)
	// group的名称不允许重复
	if _, dup := groups[name]; dup {
		panic("duplicate registration of group " + name)
	}
	g := &Group{
		name:       name,
		getter:     getter,
		peers:      peers,
		cacheBytes: cacheBytes,
		// singleflight调用组
		loadGroup:  &singleflight.Group{},
	}
	if fn := newGroupHook; fn != nil {
		fn(g)
	}
	groups[name] = g
	return g
}

同时提供了两个注册钩子的方法,用于开启分布式缓存服务,

func RegisterNewGroupHook(fn func(*Group)) {
	if newGroupHook != nil {
		panic("RegisterNewGroupHook called more than once")
	}
	newGroupHook = fn
}

// RegisterServerStart registers a hook that is run when the first
// group is created.
func RegisterServerStart(fn func()) {
	if initPeerServer != nil {
		panic("RegisterServerStart called more than once")
	}
	initPeerServer = fn
}

对于getter参数,可以从test文件里看到官方的传值样例:

在这里插入图片描述
groupcache.Group 本身不直接提供显式的 set 方法。这是因为 groupcache 的设计理念是只通过缓存加载函数(Getter)进行数据填充,而不是手动设置缓存值。groupcache 的工作机制类似于一个读缓存,当缓存中没有数据时,它会触发缓存加载函数来获取数据,并将结果存储在缓存中,之后再从缓存中读取。

它的工作流程如下:

  • 数据读取:通过 Get 方法读取数据。
  • 数据加载:如果缓存中不存在所需的数据,则会触发注册的缓存加载函数,从后端数据源获取数据,然后自动填充到缓存中。

核心逻辑在于Get方法:

// 在不初始化picker的情况下会返回NoPeers
func getPeers(groupName string) PeerPicker {
	if portPicker == nil {
		return NoPeers{}
	}
	pk := portPicker(groupName)
	if pk == nil {
		pk = NoPeers{}
	}
	return pk
}

// NewGroup给传入的peers为nil
// 所以会100%调用一次getPeers
func (g *Group) initPeers() {
	if g.peers == nil {
		// g.peers为nil
		g.peers = getPeers(g.name)
	}
}

func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
    // once控制调用一次初始化节点
	g.peersOnce.Do(g.initPeers)
	
	// 调用1次get就给Stats的get计数器+1
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	// 查询key是否存在
	value, cacheHit := g.lookupCache(key)

	if cacheHit {
	    // 如果缓存命中后命中次数+1
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	destPopulated := false
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
    // 如果未设置key直接返回
	if g.cacheBytes <= 0 {
		return
	}
	// 如果在mainCache找也返回
	value, ok = g.mainCache.get(key)
	if ok {
		return
	}
	// 最后取hotCache中去找
	value, ok = g.hotCache.get(key)
	return
}

当从缓存中取到值(BytesView)后,调用setSinkView优化数据的传递路径,减少内存拷贝操作,提升性能。

// sink 通常用来表示数据的最终接收者或目标。
func setSinkView(s Sink, v ByteView) error {
	// A viewSetter is a Sink that can also receive its value from
	// a ByteView. This is a fast path to minimize copies when the
	// item was already cached locally in memory (where it's
	// cached as a ByteView)
	type viewSetter interface {
		setView(v ByteView) error
	}
	if vs, ok := s.(viewSetter); ok {
		return vs.setView(v)
	}
	if v.b != nil {
		return s.SetBytes(v.b)
	}
	return s.SetString(v.s)
}

如果没有命中缓存则把key加入缓存,

	...
	destPopulated := false
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	// 不再二次填充
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
    // load计数器+1
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently.  It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls.  An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially.  If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		// 从其他节点尝试获取key
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez?  It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		// 从本地获取key-val
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		// 触发一次垃圾回收
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
    // getter此刻被调用
	err := g.getter.Get(ctx, key, dest)
	if err != nil {
		return ByteView{}, err
	}
	return dest.view()
}

func (g *Group) populateCache(key string, value ByteView, cache *cache) {
	if g.cacheBytes <= 0 {
		return
	}
	// 先把key-val加入缓存再淘汰旧的数据
	cache.add(key, value)

	// Evict items from cache(s) if necessary.
	for {
		mainBytes := g.mainCache.bytes()
		hotBytes := g.hotCache.bytes()
		if mainBytes+hotBytes <= g.cacheBytes {
			return
		}

		// TODO(bradfitz): this is good-enough-for-now logic.
		// It should be something based on measurements and/or
		// respecting the costs of different resources.
		victim := &g.mainCache
		if hotBytes > mainBytes/8 {
			victim = &g.hotCache
		}
		victim.removeOldest()
	}
}

淘汰策略优先淘汰主存,如果 hotCache 中的使用量(hotBytes)超过了 mainCache 使用量的 1/8(mainBytes/8),则选择 hotCache 作为淘汰对象。这表明 hotCache 中的数据量相对较大,可能需要优先进行淘汰。

hotCache 存储了热点数据,通常这些数据被频繁访问。保持热点数据的有效性可以显著提高系统性能。如果热点缓存的数据量很大,选择 hotCache 作为淘汰对象的逻辑实际上是为了避免 hotCache 过度膨胀。如果 hotCache 中的数据量过大,它可能占用了过多的缓存空间,影响到主缓存(mainCache)的性能和可用空间。因此,适当的淘汰可以帮助控制 hotCache 的大小,使其保持在合理的范围内,从而避免对整体系统性能产生负面影响。

在 groupcache 的 load 方法中,destPopulated 的作用是指示是否有数据被成功填充到 dest 中。具体来说,它用于标记 dest 是否在缓存加载过程中被填充了数据。这是一个重要的机制,以确保只有一个调用者会负责填充缓存数据,避免重复的填充操作。

内存计算的思考

以上LRU cache实现,有各种各样的淘汰策略,只有groupcache是基于当前key占用的字节大小进行内存淘汰,其他cache甚至用用户传入size作为阈值…

groupcache的内存占用计算:

c.nbytes += int64(len(key)) + int64(value.Len())

string和[]bytes实际占用的内存应该不是简单的len(),string 类型在 Go 中是一个只读的字符串类型,底层由一个指向字节数组的指针和一个表示长度的整数构成。[]byte 在 Go 中是一个切片类型,底层结构大致如下:

type stringHeader struct {
    Data uintptr
    Len  int
}

type sliceHeader struct {
    Data uintptr
    Len  int
    Cap  int
}

string 类型的总内存占用则包括结构体的开销以及字符串数据的字节数,切片的实际内存占用包括切片结构体的大小和实际存储的数据的字节数。

从内存管理的角度来看,groupcache 的内存计算确实是比较粗略的。groupcache 主要依赖于键值对的字节长度(即 len(key) + value.Len())来估算缓存占用的大小。然而,这种计算并不包括其他与内存管理相关的开销,例如 Go 运行时分配的元数据、切片头部或字符串指针的大小,以及潜在的内存对齐和额外的内存分配。

特别是:

  • string 和 []byte 的头部开销:虽然 groupcache 计算了键值的字节长度,但未包含 string 和 []byte 结构体的头部信息。
  • 堆上的内存分配:Go 的内存管理包括堆栈、堆和垃圾回收等机制,实际的内存使用可能会高于简单的字节长度计算。
  • 内存碎片化:长时间运行的程序可能会由于频繁的内存分配和释放导致内存碎片化,这些额外的内存开销不会被直接反映在缓存大小的计算中。

Redis 的内存计算相对比 groupcache 更加复杂和精确,因为 Redis 是专门为高效内存管理设计的内存数据库。它会尝试尽可能精确地跟踪每个键值对的内存占用,以确保在高负载下能够有效使用内存。

  1. 精确计算数据的内存占用

    • Redis 会根据每种数据类型(如字符串、列表、集合、哈希、ZSet 等)和底层实现方式计算其内存占用。例如,string 类型的数据会根据其长度选择不同的编码方式(如 int, raw, embstr),不同编码方式的内存开销也不同。
    • 对于复杂的数据结构(如列表、哈希等),Redis 会使用优化的内存管理策略(如 ziplist, hashtable 等),根据元素的数量和大小动态调整底层的数据存储结构,以节省内存。
  2. 内存开销
    Redis 的内存计算包括以下几个部分:

    • 键和值的大小:即存储键和值的实际字节数。
    • 数据结构的开销:如 Redis 使用的内部数据结构(哈希表、跳跃表等)的额外内存开销。
    • 对象元数据的开销:如对象头部信息、指针等附加开销。
    • 碎片化:Redis 使用的内存分配器(如 jemalloc)可能会引入一些碎片化,这也是 Redis 内存占用的一部分。
  3. 实际内存计算示例

    • Redis 提供了 MEMORY USAGE 命令,用于精确查看特定键的内存占用。它会包括键的值、元数据和内存分配的碎片等。例如:
      > MEMORY USAGE mykey
      (integer) 56
      
    • 该命令返回的是键所占用的内存大小,单位为字节,包含了上述所有内存开销。

Redis 通过一些内存优化策略来尽量减少内存占用:

  • 内存分配器:Redis 默认使用 jemalloc 内存分配器,它能更好地管理碎片化,减少内存浪费。
  • 数据压缩:Redis 会根据键值的大小和类型,使用压缩或紧凑的存储方式来优化内存使用。例如,使用 ziplistintset 等小型数据结构来存储小规模的数据。
  • 内存策略:Redis 提供了内存淘汰策略(如 LRU、LFU 等),当内存超过配置的上限时,会根据策略进行精确的缓存淘汰。

Redis 的内存计算是非常精确的,考虑了多方面的因素,如数据的编码方式、数据结构的开销、内存碎片和分配器的影响。此外,Redis 通过多种内存优化策略来最大化内存使用效率。因此,相较于 groupcache 的粗略估算,Redis 能更准确地计算和管理内存。

reflect+unsafe计算内存消耗

unsafe.Sizeof 可以用来精确计算 Go 中某个变量或类型的大小。它返回的值是类型在内存中的占用大小(以字节为单位)。

package main

import (
    "fmt"
    "unsafe"
)

func main() {
    var i int
    fmt.Println("Size of int:", unsafe.Sizeof(i)) // 输出int类型的字节大小

    var s string
    fmt.Println("Size of string:", unsafe.Sizeof(s)) // 输出string类型的字节大小
}

但unsafe.Sizeof 只会计算类型的直接内存大小,不包括底层引用类型(如切片、映射、字符串等)的间接内存大小。

reflect 包可以用来检查 Go 变量的类型信息,并通过遍历复杂数据结构(如结构体、切片、映射等)来计算其内存使用。

刚好加上unsafe包就可以得到以下函数:

package main

import (
    "fmt"
    "reflect"
    "unsafe"
)

func sizeOf(v interface{}) uintptr {
    val := reflect.ValueOf(v)
    return calculateSize(val)
}

func calculateSize(v reflect.Value) uintptr {
    switch v.Kind() {
    case reflect.Ptr, reflect.Interface:
        if v.IsNil() {
            return 0
        }
        return unsafe.Sizeof(v.Pointer()) + calculateSize(v.Elem())
    case reflect.Slice, reflect.Array:
        size := uintptr(0)
        for i := 0; i < v.Len(); i++ {
            size += calculateSize(v.Index(i))
        }
        return size
    case reflect.Struct:
        size := uintptr(0)
        for i := 0; i < v.NumField(); i++ {
            size += calculateSize(v.Field(i))
        }
        return size
    default:
        return unsafe.Sizeof(v.Interface())
    }
}

func main() {
    type Person struct {
        Name string
        Age  int
    }

    p := Person{Name: "Alice", Age: 30}
    fmt.Printf("Memory size: %d bytes\n", sizeOf(p))
}

对于一个字符串s:

  • unsafe.Sizeof(s):它返回的是字符串头部结构体的大小。这个结构体仅包含指向实际数据的指针和表示长度的字段,在 64 位系统中通常是 16 字节(8 字节的指针 + 8 字节的长度)。

  • len(s):它返回字符串内容的长度(即字节数)。

在这里插入图片描述
因此size+len可以粗略的估计string和bytes占用的内存,不过要计算字符串在内存中的总占用,则还需要考虑其他因素,比如堆栈的内存管理和对齐问题。

runtime.MemStats计算内存

Go 的 runtime 包提供了对 Go 运行时信息的访问,包括内存分配、垃圾回收等统计信息。你可以通过 runtime.MemStats 获取程序的整体内存使用情况:

package main

import (
    "fmt"
    "runtime"
)

func printMemUsage() {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    fmt.Printf("Alloc = %v MiB", m.Alloc / 1024 / 1024)
    fmt.Printf("\tTotalAlloc = %v MiB", m.TotalAlloc / 1024 / 1024)
    fmt.Printf("\tSys = %v MiB", m.Sys / 1024 / 1024)
    fmt.Printf("\tNumGC = %v\n", m.NumGC)
}

func main() {
    printMemUsage()

    // 模拟一些内存分配
    var mem []byte
    for i := 0; i < 10; i++ {
        mem = append(mem, make([]byte, 10*1024*1024)...)
        printMemUsage()
    }
}

在这里插入图片描述
runtime.MemStats 是 Go 标准库中用于获取内存使用统计信息的结构体,包含了 Go 运行时内存分配、垃圾回收等方面的详细信息。这些信息对理解应用程序的内存占用、调优和诊断性能问题非常有帮助。

下面是 runtime.MemStats 的主要字段及其含义:

基本分配统计

  1. Alloc
    当前应用程序占用的内存量(以字节为单位)。它表示所有正在使用的堆内存的总和。

  2. TotalAlloc
    从应用程序开始运行到当前为止,已经分配的堆内存总量。它包括了已经被垃圾回收器释放的内存。因此,它是一个累积值,随着程序的运行会不断增加。

  3. Sys
    从操作系统获取的所有内存的总量(以字节为单位),包括堆、栈、GC 元数据等。这是程序从操作系统实际申请到的内存量。

  4. Lookups
    用于内存管理的指针查找次数。在典型的程序中,这个值通常不会特别大。

  5. Mallocs
    申请内存的次数(内存分配操作的数量)。

  6. Frees
    释放内存的次数。Mallocs - Frees 代表了当前分配的对象数量。

堆统计

  1. HeapAlloc
    当前堆上分配的内存量(以字节为单位)。与 Alloc 类似,但仅限于堆内存。

  2. HeapSys
    为堆分配的系统内存总量,即从操作系统实际申请到的堆内存。

  3. HeapIdle
    未使用的堆内存总量(以字节为单位)。这些内存已经被申请但目前未被使用,可以归还给操作系统或者在未来分配时再次使用。

  4. HeapInuse
    当前正在使用的堆内存量(以字节为单位)。这些内存正在被 Go 程序使用,无法立即被归还给操作系统。

  5. HeapReleased
    返回给操作系统的堆内存总量。内存被释放后,操作系统可能会将这部分内存重新分配给其他进程。

  6. HeapObjects
    当前堆上分配的对象数量。它等于 Mallocs - Frees

栈统计

  1. StackInuse
    Go 程序使用的栈内存量(以字节为单位)。每个 goroutine 都有自己的栈,栈内存量是这个字段的度量。

  2. StackSys
    从操作系统申请的栈内存总量(以字节为单位)。

垃圾回收器统计

  1. NextGC
    下一次垃圾回收的内存阈值。当堆内存分配达到这个值时,垃圾回收器将再次运行。

  2. LastGC
    上一次垃圾回收的时间(以纳秒为单位,自 1970 年以来的时间戳)。

  3. PauseTotalNs
    自程序启动以来,由垃圾回收引起的总暂停时间(以纳秒为单位)。这可以帮助分析垃圾回收对应用程序性能的影响。

  4. PauseNs [256]
    最近 256 次垃圾回收暂停时间的数组(以纳秒为单位)。

  5. NumGC
    自程序启动以来已经完成的垃圾回收次数。

  6. GCCPUFraction
    垃圾回收过程中 CPU 时间占用的比例。它是一个 0 到 1 之间的小数,代表垃圾回收器在应用程序执行中占用了多少 CPU 时间。

其他统计

  1. BySize
    包含一个按分配大小的统计表。用于提供有关不同大小的内存块分配和垃圾回收的详细信息。

    • BySize[].Size
      表示分配的内存块大小。

    • BySize[].Mallocs
      表示该大小的内存块的分配次数。

    • BySize[].Frees
      表示该大小的内存块的释放次数。

比较关注的字段有三个,Alloc,HeapAlloc,Sys,详细对比这三者:

  1. Alloc

    • 含义Alloc 表示当前 Go 程序已经分配并正在使用的内存总量。它是 Go 堆上活动对象所占的内存量,包括尚未被垃圾回收器回收的内存。
    • 范围:这个值包含了程序中分配但尚未被垃圾回收的所有内存。
    • 用途Alloc 是一个非常直观的指标,可以帮助用户了解当前 Go 程序正在消耗多少内存(活动对象的大小)。
  2. HeapAlloc

    • 含义HeapAlloc 专门表示堆上已经分配并正在使用的内存量。堆是用于存储 Go 程序中动态分配的对象(如指针、切片、字符串、结构体等)的区域。
    • 范围HeapAlloc 是 Go 运行时在堆上分配的内存,并且这部分内存目前还没有被垃圾回收器回收。因此,它基本上等同于 Alloc,但专门指堆上分配的内存。
    • 用途:如果对内存分配的关注点是堆(尤其是对于长生命周期对象的内存分配),HeapAlloc 是一个更有针对性的指标。
  3. Sys

    • 含义Sys 表示 Go 程序从操作系统申请的内存总量。这个值是一个更大的数值,包含了 Go 运行时为堆、栈、垃圾回收器元数据、代码空间、内存管理等目的从操作系统申请的所有内存。
    • 范围Sys 包括堆内存、栈内存、代码段、全局变量、内存管理结构等。它不仅包括程序的实际内存使用,还包括操作系统为支持 Go 运行时执行的所有内存分配。因此,这个值比 AllocHeapAlloc 要大得多。
    • 用途Sys 适用于评估程序对操作系统的总内存占用,它可以帮助理解操作系统为了运行 Go 程序分配了多少内存资源。

小结:

  • Alloc:表示程序当前实际使用的内存,包括所有分配但未释放的对象内存。用于评估程序活跃的内存占用情况。
  • HeapAlloc:是 Alloc 的一个子集,专门针对堆上分配的内存。它排除了非堆内存,比如栈内存和其他运行时的元数据。
  • Sys:包含了从操作系统申请的所有内存,它是一个更宽泛的指标,涵盖了堆、栈、元数据、管理开销等,是评估整个 Go 运行时对系统内存资源的消耗的关键指标。

综上可以用Alloc去评估内存的使用情况,但groupcache也许是考虑到其开销而没有用这个方法。对于go cache如何更细粒度的控制内存逐出,欢迎大佬们补充。


http://www.kler.cn/a/300832.html

相关文章:

  • Quantum supremacy using a programmable superconducting processor 全文翻译,配公式和图
  • 【Linux】Socket编程-TCP构建自己的C++服务器
  • eBay账号安全攻略:巧妙应对风险
  • Kafka常用命令
  • 80_Redis内存策略
  • linux系统监视(centos 7)
  • 哈希表如何避免冲突
  • Find My外卖箱|苹果Find My技术与外卖箱结合,智能防丢,全球定位
  • 二十三种模式之原型模式(类比制作陶器更好理解一些)
  • RK3588高性能处理器助力测量机器人精准作业
  • 【数据结构】堆——堆排序与海量TopK问题
  • Sqlserver常用sql
  • 【有啥问啥】计算机视觉领域中的光流(Optical Flow)是什么?
  • Android相关线程基础
  • 苹果研究人员提出了一种新颖的AI算法来优化字节级表示以自动语音识别(ASR),并将其与UTF-8表示进行比较
  • 各种无人机飞行服务技术详解
  • 基于Netty实现TCP客户端
  • Python集合应用:10+个集合操作的实用案例
  • 海康SDK对接 超脑设备-下发人员信息和人脸
  • 前端面试常见手写题
  • C/C++ 网络编程之关于多核利用问题
  • 【openGauss】WDR快照无法生成或执行生成不报错,但是snapshot.snapshot为空的问题
  • Linux s3c2440 开发板上的操作系统实现 ubuntu
  • 《中国制药设备行业市场现状分析与发展前景预测研究报告》
  • spring中添加@Test注解测试
  • docker的相关网络问题