当前位置: 首页 > article >正文

k8s-Informer之Indexer的解析(4)

1、Indexer 介绍

k8s-Informer之Indexer的解析(4)

Indexer 是 Client-go 中实现的一个本地存储,它可以建立索引并存储 Resource 的对象。Indexer 中的数据始终要是与 ETCD 中数据一致的,当 client-go 需要数据时,可直接通过该本地缓存获取资源对象,不需要每次都从 APIServer中获取,这样就减轻了请求过多造成对 APIServer 、etcd的压力。

从 DeltaFIFO 中 Pop 出来的资源对象,交给了 HandlerDeltas 进行处理,在 HandleDeltas 中将资源对象同步到了 Indexer 中。


// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {  
   s.blockDeltas.Lock()  
   defer s.blockDeltas.Unlock()  
  
   if deltas, ok := obj.(Deltas); ok {  
      return processDeltas(s, s.indexer, s.transform, deltas)  
   }  
   return errors.New("object given as Process argument is not Deltas")  
}

  // k8s.io/client-go/tools/cache/controller.go
func processDeltas(  handler ResourceEventHandler,  clientState Store,  transformer TransformFunc,  deltas Deltas) error {  
   // from oldest to newest  
   for _, d := range deltas {  
      obj := d.Object  
      if transformer != nil {  
         var err error  
         obj, err = transformer(obj)  
         if err != nil {  
            return err  
         }  
      }  
  
      switch d.Type {  
      case Sync, Replaced, Added, Updated:  
         if old, exists, err := clientState.Get(obj); err == nil && exists {  
            if err := clientState.Update(obj); err != nil {  
               return err  
            }  
            handler.OnUpdate(old, obj)  
         } else {  
            if err := clientState.Add(obj); err != nil {  
               return err  
            }  
            handler.OnAdd(obj)  
         }  
      case Deleted:  
         if err := clientState.Delete(obj); err != nil {  
            return err  
         }  
         handler.OnDelete(obj)  
      }  
   }  
   return nil  
}

2、Indexer 结构体定义

Indexer 接口继承了一个Store接口(实现本地缓存)

type Indexer interface {  
   Store  
   // indexName索引类,obj 是对象 
   // 通过计算 obj 在 indexName 索引类中的索引键,通过索引键获取所有的对象
   Index(indexName string, obj interface{}) ([]interface{}, error) 
    
   // indexKey 是 indexName 索引类中的⼀个索引键,函数返回 indexKey 指定的所有对象键
   IndexKeys(indexName, indexedValue string) ([]string, error) 
    
   // 获取 indexName 索引类中的所有索引键  
   ListIndexFuncValues(indexName string) []string  
    
   // 返回所有对象 
   ByIndex(indexName, indexedValue string) ([]interface{}, error)  
   
   // GetIndexers return the indexers   
   GetIndexers() Indexers  
  
   // 就是增加更多的索引分类
   AddIndexers(newIndexers Indexers) error  
}

// store 接口,staging/src/k8s.io/client-go/tools/cache/store.go
type Store interface {
   Add(obj interface{}) error
   Update(obj interface{}) error
   Delete(obj interface{}) error
   List() []interface{}
   ListKeys() []string
   Get(obj interface{}) (item interface{}, exists bool, err error)
   GetByKey(key string) (item interface{}, exists bool, err error)
   Replace([]interface{}, string) error
   Resync() error
}

2.1、Store 结构体:

cache struct包含一个ThreadSafeStore接口的实现和计算object key的函数KeyFunc。cache struct会根据keyFunc生成某个obj对象对应的一个唯一key, 然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象。

type cache struct {  
   // 
   cacheStorage ThreadSafeStore  
   keyFunc KeyFunc  
}

2.2、ThreadSafeStore

ThreadSafeStore 接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法。


type ThreadSafeStore interface {  
   Add(key string, obj interface{})  
   Update(key string, obj interface{})  
   Delete(key string)  
   Get(key string) (item interface{}, exists bool)  
   List() []interface{}  
   ListKeys() []string  
   Replace(map[string]interface{}, string)  
   Index(indexName string, obj interface{}) ([]interface{}, error)  
   IndexKeys(indexName, indexedValue string) ([]string, error)  
   ListIndexFuncValues(name string) []string  
   ByIndex(indexName, indexedValue string) ([]interface{}, error)  
   GetIndexers() Indexers  
   Resync() error  
}

2.3、threadSafeMap

threadSafeMap struct 是ThreadSafeStore接口的一个实现


type threadSafeMap struct {  
   lock  sync.RWMutex  

   // items 就是存放资源对象,key根据资源对象来算出,value为资源对象本身
   items map[string]interface{}  
  
   // indexers maps a name to an IndexFunc  
   indexers Indexers  
   // indices maps a name to an Index  
   indices Indices  
}

3、Indexer 索引功能

与索引功能相关定义在 threadSafeMap 结构体中


// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {  
   lock  sync.RWMutex  
   items map[string]interface{}  
  
   // indexers maps a name to an IndexFunc  
   indexers Indexers  
   // indices maps a name to an Index  
   indices Indices  
}

type IndexFunc func(obj interface{}) ([]string, error)

type Index map[string]sets.String  
  
// Indexers maps a name to an IndexFunc  
type Indexers map[string]IndexFunc  
  
// Indices maps a name to an Index  
type Indices map[string]Index

3.1、 MetaNamespaceIndexFunc 函数

MetaNamespaceIndexFunc 函数在 Kubernetes 中使用的比较多的索引函数,是一个默认索引函数,它基于对象的命名空间进行索引。


func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {  
   meta, err := meta.Accessor(obj)  
   if err != nil {  
      return []string{""}, fmt.Errorf("object has no meta: %v", err)  
   }  
   return []string{meta.GetNamespace()}, nil  
}

3.2 、索引的实现 ByIndex() 函数

该方法传入索引器名称 indexName 和索引键名称indexedValue,方法寻找该索引器下,索引键对应的对象键列表,然后根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。

大致逻辑:

(1)首先根据索引器名称查找指定的索引器函数;

(2)然后根据索引器名称查找相应的缓存器函数;

(3)再根据索引 Key (indexedValue)从缓存中进行数据查询, 并返回查询结果;


func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {  
   c.lock.RLock()  
   defer c.lock.RUnlock()  

   // 1、首先根据索引器名称查找指定的索引器函数
   indexFunc := c.indexers[indexName]  
   if indexFunc == nil {  
      return nil, fmt.Errorf("Index with name %s does not exist", indexName)  
   }  
  
   // 2、然后根据索引器名称查找相应的缓存器函数
   index := c.indices[indexName]  
  
   // 3、根据索引 Key (indexedValue)从缓存中进行数据查询, 并返回查询结果
   set := index[indexedValue]  
   list := make([]interface{}, 0, set.Len())  
   for key := range set {  
      list = append(list, c.items[key])  
   }  
  
   return list, nil  
}

使用示例:


pods, err := index.ByIndex("namespace", "default")
if err != nil {
    panic(err)
}
for _, pod := range pods {
    fmt.Println(pod.(*v1.Pod).Name)
}

fmt.Println("------")

pods, err := index.ByIndex("nodename", "node1")
if err != nil {
    panic(err)
}
for _, pod := range pods {
    fmt.Println(pod.(*v1.Pod).Name)
}

输出:

pod-1
pod-2
------
pod-1

总结:

Indexer 具有维护本地缓存的功能,还有一个更重要的功能就是索引功能了,这个索引的目的就是实现快速查找。如要查某个 Node 下的所有 Pod,或查找某个命名空间下的所有pod等等。使用索引就能实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性。

Informer


http://www.kler.cn/a/427487.html

相关文章:

  • 循序渐进kubenetes Service(Cluster ip、Nodeport、Loadbalancer)
  • 简易图书管理系统
  • 16 Java(junit)测试+Assert(断言测试)、枚举类、注解、javac编译和javap反编译命令、常量
  • 夏普MX-4608N复印机维修模式进入方法及载体初始化方法
  • 基于STM32的Wi-Fi无人机项目
  • 手写电子签名并保存到当前项目下
  • 【Spark】 groupByKey与reduceByKey的区别
  • Conda-Pack打包:高效管理Python环境
  • 安全生产培训题库200道;免费题库;大风车题库
  • ArrayList 底层结构和源码分析/注意事项
  • 基于Java Springboot环境保护生活App且微信小程序
  • 代码随想录-算法训练营day42(动态规划05:最后一块石头的重量2,目标和,一和零)
  • AttributeError: module numpy has no attribute int .报错解决
  • API设计指南:详解HTTP状态码错误解析、HTTP方法及参数命名规则
  • Uniapp的vue、nvue、uvue后缀名区别
  • CSS 实现视差滚动:详解 background-attachment 与 transform:translate3D 及应用
  • matlab Delaunay三角剖分提取平面点云的边界
  • 【随笔笔记】将mysql数据迁移到群晖NAS
  • 阿拉丁论文助手:一键点亮学术之路
  • 仿真键盘输入遇到Edge环境不识别 回车符如何处理