当前位置: 首页 > article >正文

8.5 kubelet维护pod的内存管理器podManager源码解析

本节重点总结 : podManager的作用

  • podManager在kubelet中负责内存中pod及mirrorPod的维护
  • 任何在本节点上创建或删除操作都会同步更新podManager
  • 即可以认为podManager中存储了本节点上运行的pod的信息
  • 分析AddPod的源码

什么是mirror pod

  • mirrorpod主要与kubelet运行于standalone模式有关
  • 假如pod是通过file或http的形式获得的,这个pod被称为static pod,k8s会在集群中创建一个对应的mirror pod
  • 这时系统很难管理这部分pod;所以系统会在kubelet中创建一个static pod对应的mirrorPod,来表示static pod。

podManger的接口方法

  • 位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\pod\pod_manager.go

核心方法说明

type Manager interface {
	// GetPods 获取属于这个kubelet的pods
	GetPods() []*v1.Pod
	// GetPodByFullName 根据pod的全名: namespace+name 返回pod
	GetPodByFullName(podFullName string) (*v1.Pod, bool)
	// GetPodByName  根据pod的name 返回pod
	GetPodByName(namespace, name string) (*v1.Pod, bool)
	// GetPodByUID   根据pod的UID 返回pod
	GetPodByUID(types.UID) (*v1.Pod, bool)
	// GetPodByMirrorPod 返回mirror pod
	GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
	// GetMirrorPodByPod returns the mirror pod for the given static pod and
	// whether it was known to the pod manager.
	GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
	// GetPodsAndMirrorPods returns the both regular and mirror pods.
	GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
	// SetPods replaces the internal pods with the new pods.
	// It is currently only used for testing.
	SetPods(pods []*v1.Pod)
	// AddPod 添加新的pod到manager中
	AddPod(pod *v1.Pod)
	// UpdatePod updates the given pod in the manager.
	UpdatePod(pod *v1.Pod)
	// DeletePod deletes the given pod from the manager.  For mirror pods,
	// this means deleting the mappings related to mirror pods.  For non-
	// mirror pods, this means deleting from indexes for all non-mirror pods.
	DeletePod(pod *v1.Pod)
	// GetOrphanedMirrorPodNames returns names of orphaned mirror pods
	GetOrphanedMirrorPodNames() []string
	// TranslatePodUID returns the actual UID of a pod. If the UID belongs to
	// a mirror pod, returns the UID of its static pod. Otherwise, returns the
	// original UID.
	//
	// All public-facing functions should perform this translation for UIDs
	// because user may provide a mirror pod UID, which is not recognized by
	// internal Kubelet functions.
	TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
	// GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
	// UIDs and mirror pod UIDs to static pod UIDs.
	GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
	// IsMirrorPodOf returns true if mirrorPod is a correct representation of
	// pod; false otherwise.
	IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool

	MirrorClient
}

podManager的底层数据接口

  • 位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\pod\pod_manager.go
// AddPod, UpdatePod, or DeletePod.
type basicManager struct {
	// Protects all internal maps.
	lock sync.RWMutex

	// Regular pods indexed by UID.
	podByUID map[kubetypes.ResolvedPodUID]*v1.Pod
	// Mirror pods indexed by UID.
	mirrorPodByUID map[kubetypes.MirrorPodUID]*v1.Pod

	// Pods indexed by full name for easy access.
	podByFullName       map[string]*v1.Pod
	mirrorPodByFullName map[string]*v1.Pod

	// Mirror pod UID to pod UID map.
	translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID

	// basicManager is keeping secretManager and configMapManager up-to-date.
	secretManager    secret.Manager
	configMapManager configmap.Manager

	// A mirror pod client to create/delete mirror pods.
	MirrorClient
}

字段说明

  • podByUID: 记录uid和pod的map关系;
  • mirrorPodByUID: 记录mirrorPod的uid和mirrorPod的map关系;
  • podByFullName: 记录fullName和pod的map关系;
  • mirrorPodByFullName: 记录fullName和mirrorPod的map关系;
  • translationByUID: 记录mirrorPod的uid和pod的uid的map关系;
  • MirrorClient: 可以管理mirrorPod。

AddPod逻辑

  • 位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\pod\pod_manager.go
  • 可以看到AddPod底层调用的是 UpdatePod
func (pm *basicManager) AddPod(pod *v1.Pod) {
	pm.UpdatePod(pod)
}

func (pm *basicManager) UpdatePod(pod *v1.Pod) {
	pm.lock.Lock()
	defer pm.lock.Unlock()
	pm.updatePodsInternal(pod)
}

updatePodsInternal解析

  • 遍历传入的pod,判断pod状态,如果是Terminated就调用secretManager注销相关的secret
  • Terminated代表pod状态为PodFailed或者是PodSucceeded

	for _, pod := range pods {
		if pm.secretManager != nil {
			if isPodInTerminatedState(pod) {
				// Pods that are in terminated state and no longer running can be
				// ignored as they no longer require access to secrets.
				// It is especially important in watch-based manager, to avoid
				// unnecessary watches for terminated pods waiting for GC.
				pm.secretManager.UnregisterPod(pod)
			} else {
				// TODO: Consider detecting only status update and in such case do
				// not register pod, as it doesn't really matter.
				pm.secretManager.RegisterPod(pod)
			}
		}
  • 底层调用cacheBasedManager的UnregisterPod方法,位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\util\manager\cache_based_manager.go
  • 对应就是从cache的store中将pod的secret删掉
func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
	var prev *v1.Pod
	key := objectKey{namespace: pod.Namespace, name: pod.Name}
	c.lock.Lock()
	defer c.lock.Unlock()
	prev = c.registeredPods[key]
	delete(c.registeredPods, key)
	if prev != nil {
		for name := range c.getReferencedObjects(prev) {
			c.objectStore.DeleteReference(prev.Namespace, name)
		}
	}
}

podManager的secretManager和configMapManager解析

  • 初始化的入口NewMainKubelet中, 在 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\kubelet.go
  • 默认的ConfigMapAndSecretChangeDetectionStrategy策略为 Watch,所以使用NewWatchingSecretManager初始化

	var secretManager secret.Manager
	var configMapManager configmap.Manager
	switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
	case kubeletconfiginternal.WatchChangeDetectionStrategy:
		secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
		configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
NewWatchingSecretManager解析
  • 分别构造listSecret 、watchSecret、newSecret方法
func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
	listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
		return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
	}
	watchSecret := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
		return kubeClient.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
	}
	newSecret := func() runtime.Object {
		return &v1.Secret{}
	}
	isImmutable := func(object runtime.Object) bool {
		if secret, ok := object.(*v1.Secret); ok {
			return secret.Immutable != nil && *secret.Immutable
		}
		return false
	}
	gr := corev1.Resource("secret")
	return &secretManager{
		manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
	}
}

  • 底层创建的是objectCache
type objectCache struct {
	listObject    listObjectFunc
	watchObject   watchObjectFunc
	newObject     newObjectFunc
	isImmutable   isImmutableFunc
	groupResource schema.GroupResource
	clock         clock.Clock
	maxIdleTime   time.Duration

	lock  sync.RWMutex
	items map[objectKey]*objectCacheItem
}

  • 传入的getSecretNames代表拿到pod对象之后去secret cache中查询,将pod对应的Secret全部找到
func getSecretNames(pod *v1.Pod) sets.String {
	result := sets.NewString()
	podutil.VisitPodSecretNames(pod, func(name string) bool {
		result.Insert(name)
		return true
	})
	return result
}

回到secretManager 注册和注销pod secret的地方

通用注册方法 RegisterPod

  • 调用 getReferencedObjects也就是getSecretNames获取pod的所有secret名字得到names,遍历names添加到objectStore中
	names := c.getReferencedObjects(pod)
	c.lock.Lock()
	defer c.lock.Unlock()
	for name := range names {
		c.objectStore.AddReference(pod.Namespace, name)
	}
  • 同时以namespace+name为key,更新一下registeredPods的map
	var prev *v1.Pod
	key := objectKey{namespace: pod.Namespace, name: pod.Name}
	prev = c.registeredPods[key]
	c.registeredPods[key] = pod
  • 用如果之前存在这个key的对象prev,就在objectStore删除,等于增加新的删除旧的,避免update

通用的注销方法

  • 计算key然后删除registeredPods和objectStore的内容
func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
	var prev *v1.Pod
	key := objectKey{namespace: pod.Namespace, name: pod.Name}
	c.lock.Lock()
	defer c.lock.Unlock()
	prev = c.registeredPods[key]
	delete(c.registeredPods, key)
	if prev != nil {
		for name := range c.getReferencedObjects(prev) {
			c.objectStore.DeleteReference(prev.Namespace, name)
		}
	}
}

回到updatePodsInternal入口这里

  • 如果pod 是Terminated状态就 用configMapManager和secretManager注销configMap和secret
  • 如果pod 非Terminated状态就 用configMapManager和secretManager注册configMap和secret
  • configMapManager和secretManager底层都是cacheBasedManager享有通用的注册/注销方法

判断 pod是否是mirror pod

  • 如果是的话在pm的mirror缓存字段中更新值
		if kubetypes.IsMirrorPod(pod) {
			mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
			pm.mirrorPodByUID[mirrorPodUID] = pod
			pm.mirrorPodByFullName[podFullName] = pod
			if p, ok := pm.podByFullName[podFullName]; ok {
				pm.translationByUID[mirrorPodUID] = kubetypes.ResolvedPodUID(p.UID)
			}

正常的pod更新缓存

			resolvedPodUID := kubetypes.ResolvedPodUID(pod.UID)
			updateMetrics(pm.podByUID[resolvedPodUID], pod)
			pm.podByUID[resolvedPodUID] = pod
			pm.podByFullName[podFullName] = pod
			if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
				pm.translationByUID[kubetypes.MirrorPodUID(mirror.UID)] = resolvedPodUID
			}

UpdatePod更新pod

  • 可以看到底层也调用的是updatePodsInternal,和AddPod不同的是需要加锁,因为更新操作
func (pm *basicManager) UpdatePod(pod *v1.Pod) {
	pm.lock.Lock()
	defer pm.lock.Unlock()
	pm.updatePodsInternal(pod)
}

DeletePod 删除pod

  • 流程我们已经很熟悉了,就是调用 secretManager和 configMapManager注销相关的对象
  • 然后计算pod的key,从各个缓存map中删除即可

func (pm *basicManager) DeletePod(pod *v1.Pod) {
	updateMetrics(pod, nil)
	pm.lock.Lock()
	defer pm.lock.Unlock()
	if pm.secretManager != nil {
		pm.secretManager.UnregisterPod(pod)
	}
	if pm.configMapManager != nil {
		pm.configMapManager.UnregisterPod(pod)
	}
	podFullName := kubecontainer.GetPodFullName(pod)
	// It is safe to type convert here due to the IsMirrorPod guard.
	if kubetypes.IsMirrorPod(pod) {
		mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
		delete(pm.mirrorPodByUID, mirrorPodUID)
		delete(pm.mirrorPodByFullName, podFullName)
		delete(pm.translationByUID, mirrorPodUID)
	} else {
		delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID))
		delete(pm.podByFullName, podFullName)
	}
}

本节重点总结 : podManager的作用

  • podManager在kubelet中负责内存中pod及mirrorPod的维护
  • 任何在本节点上创建或删除操作都会同步更新podManager
  • 即可以认为podManager中存储了本节点上运行的pod的信息
  • 分析AddPod的源码

什么是mirror pod

  • mirrorpod主要与kubelet运行于standalone模式有关
  • 假如pod是通过file或http的形式获得的,这个pod被称为static pod,k8s会在集群中创建一个对应的mirror pod
  • 这时系统很难管理这部分pod;所以系统会在kubelet中创建一个static pod对应的mirrorPod,来表示static pod。

http://www.kler.cn/a/568207.html

相关文章:

  • 测试金蝶云的OpenAPI
  • 使用 Kubeflow 和 Ray 构建机器学习平台
  • 机器学习:监督学习、无监督学习和强化学习
  • 达梦数据库阻塞死锁及解锁
  • Excel工作圈小工具一个集合了大量Excel功能的绿色工具软件
  • C#中使用Newtonsoft.Json多态正反序列化
  • 基于 MetaGPT 自部署一个类似 MGX 的多智能体协作框架
  • 内容中台的企业CMS架构是什么?
  • 对话式AI引擎:DeepSeek技术引领多模态交互新篇章
  • RabbitMq延时队列的实现
  • 【漫话机器学习系列】107.线性组合(Linear Combination)
  • 【论文阅读笔记】SL-YOLO(2025/1/13) | 小目标检测 | HEPAN、C2fDCB轻量化模块
  • 检索增强生成(RAG)技术解析:大模型时代的“知识导航系统”
  • 基于Selenium的Python淘宝评论爬取教程
  • 【AI+智造】基于SKF IMAX-16+PT1000与Odoo18工业物联网架构智慧生产诊断系统集成方案
  • ubuntu 20.04 安装labelmg
  • C# Unity 唐老狮 No.1 模拟面试题
  • 【论文阅读笔记】FcaNet: Frequency Channel Attention Networks(2021/7/23)
  • Deepseek开源周第四天:从 DualPipe 到 EPLB
  • 查找Excel包含关键字的行(の几种简单快速方法)