k8s-Informer之Indexer的解析(4)
1、Indexer 介绍
k8s-Informer之Indexer的解析(4)
Indexer 是 Client-go 中实现的一个本地存储,它可以建立索引并存储 Resource 的对象。Indexer 中的数据始终要是与 ETCD 中数据一致的,当 client-go 需要数据时,可直接通过该本地缓存获取资源对象,不需要每次都从 APIServer中获取,这样就减轻了请求过多造成对 APIServer 、etcd的压力。
从 DeltaFIFO 中 Pop 出来的资源对象,交给了 HandlerDeltas 进行处理,在 HandleDeltas 中将资源对象同步到了 Indexer 中。
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}
// k8s.io/client-go/tools/cache/controller.go
func processDeltas( handler ResourceEventHandler, clientState Store, transformer TransformFunc, deltas Deltas) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
2、Indexer 结构体定义
Indexer 接口继承了一个Store接口(实现本地缓存)
type Indexer interface {
Store
// indexName索引类,obj 是对象
// 通过计算 obj 在 indexName 索引类中的索引键,通过索引键获取所有的对象
Index(indexName string, obj interface{}) ([]interface{}, error)
// indexKey 是 indexName 索引类中的⼀个索引键,函数返回 indexKey 指定的所有对象键
IndexKeys(indexName, indexedValue string) ([]string, error)
// 获取 indexName 索引类中的所有索引键
ListIndexFuncValues(indexName string) []string
// 返回所有对象
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexers return the indexers
GetIndexers() Indexers
// 就是增加更多的索引分类
AddIndexers(newIndexers Indexers) error
}
// store 接口,staging/src/k8s.io/client-go/tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
2.1、Store 结构体:
cache struct包含一个ThreadSafeStore接口的实现和计算object key的函数KeyFunc。cache struct会根据keyFunc生成某个obj对象对应的一个唯一key, 然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象。
type cache struct {
//
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
2.2、ThreadSafeStore
ThreadSafeStore 接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法。
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
Resync() error
}
2.3、threadSafeMap
threadSafeMap struct 是ThreadSafeStore接口的一个实现
type threadSafeMap struct {
lock sync.RWMutex
// items 就是存放资源对象,key根据资源对象来算出,value为资源对象本身
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
3、Indexer 索引功能
与索引功能相关定义在 threadSafeMap 结构体中
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
type IndexFunc func(obj interface{}) ([]string, error)
type Index map[string]sets.String
// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index
3.1、 MetaNamespaceIndexFunc 函数
MetaNamespaceIndexFunc 函数在 Kubernetes 中使用的比较多的索引函数,是一个默认索引函数,它基于对象的命名空间进行索引。
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
3.2 、索引的实现 ByIndex() 函数
该方法传入索引器名称 indexName 和索引键名称indexedValue,方法寻找该索引器下,索引键对应的对象键列表,然后根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。
大致逻辑:
(1)首先根据索引器名称查找指定的索引器函数;
(2)然后根据索引器名称查找相应的缓存器函数;
(3)再根据索引 Key (indexedValue)从缓存中进行数据查询, 并返回查询结果;
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 1、首先根据索引器名称查找指定的索引器函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 2、然后根据索引器名称查找相应的缓存器函数
index := c.indices[indexName]
// 3、根据索引 Key (indexedValue)从缓存中进行数据查询, 并返回查询结果
set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
使用示例:
pods, err := index.ByIndex("namespace", "default")
if err != nil {
panic(err)
}
for _, pod := range pods {
fmt.Println(pod.(*v1.Pod).Name)
}
fmt.Println("------")
pods, err := index.ByIndex("nodename", "node1")
if err != nil {
panic(err)
}
for _, pod := range pods {
fmt.Println(pod.(*v1.Pod).Name)
}
输出:
pod-1
pod-2
------
pod-1
总结:
Indexer 具有维护本地缓存的功能,还有一个更重要的功能就是索引功能了,这个索引的目的就是实现快速查找。如要查某个 Node 下的所有 Pod,或查找某个命名空间下的所有pod等等。使用索引就能实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性。
Informer