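8.5 Source walkthrough: podManager, the kubelet's in-memory pod manager
Key points of this section: what podManager does
- podManager maintains the kubelet's in-memory record of pods and mirror pods
- Every pod created or deleted on this node is synchronized into podManager
- podManager can therefore be treated as the store of information about the pods running on this node
- We walk through the source of AddPod
What is a mirror pod
- Mirror pods exist because of static pods, which the kubelet can run even in standalone mode without an API server
- A pod that the kubelet obtains from a file or over HTTP is called a static pod
- The API server did not create such a pod, so the cluster has no way to see or manage it; the kubelet therefore creates a matching mirror pod in the API server to represent the static pod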
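The distinction between a static pod and its mirror pod can be sketched with annotations. The block below is a minimal illustration, not the kubelet's code: the `pod` struct is a hypothetical stand-in for `v1.Pod`, and the annotation keys and the `api` source value are written as they appear, to the best of my knowledge, in `pkg/kubelet/types`; verify them against your Kubernetes version.

```go
package main

import "fmt"

// Annotation keys and the API-server source value (assumed to match
// pkg/kubelet/types; check your Kubernetes version).
const (
	configSourceAnnotationKey = "kubernetes.io/config.source"
	configMirrorAnnotationKey = "kubernetes.io/config.mirror"
	apiserverSource           = "api"
)

// pod is a hypothetical, stripped-down stand-in for v1.Pod.
type pod struct {
	Name        string
	Namespace   string
	Annotations map[string]string
}

// isMirrorPod reports whether the pod carries the mirror annotation.
func isMirrorPod(p pod) bool {
	_, ok := p.Annotations[configMirrorAnnotationKey]
	return ok
}

// isStaticPod reports whether the pod has a known source that is not
// the API server (for example "file" or "http").
func isStaticPod(p pod) bool {
	source, ok := p.Annotations[configSourceAnnotationKey]
	return ok && source != apiserverSource
}

func main() {
	static := pod{
		Name:        "etcd-node1",
		Namespace:   "kube-system",
		Annotations: map[string]string{configSourceAnnotationKey: "file"},
	}
	mirror := pod{
		Name:        "etcd-node1",
		Namespace:   "kube-system",
		Annotations: map[string]string{configMirrorAnnotationKey: "example-hash"},
	}
	fmt.Println("static:", isStaticPod(static), isMirrorPod(static))
	fmt.Println("mirror:", isStaticPod(mirror), isMirrorPod(mirror))
}
```

Note that the static pod and its mirror pod share the same name and namespace; that shared full name is what lets podManager pair them later.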
The podManager interface methods
- Location: D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\pod\pod_manager.go
Core methods
type Manager interface {
	// GetPods returns the pods that belong to this kubelet.
	GetPods() []*v1.Pod
	// GetPodByFullName returns the pod matching the full name
	// (built as name_namespace) and whether it was found.
	GetPodByFullName(podFullName string) (*v1.Pod, bool)
	// GetPodByName returns the pod matching the namespace and name.
	GetPodByName(namespace, name string) (*v1.Pod, bool)
	// GetPodByUID returns the pod matching the UID.
	GetPodByUID(types.UID) (*v1.Pod, bool)
	// GetPodByMirrorPod returns the static pod for the given mirror pod.
	GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
	// GetMirrorPodByPod returns the mirror pod for the given static pod and
	// whether it was known to the pod manager.
	GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
	// GetPodsAndMirrorPods returns both the regular and the mirror pods.
	GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
	// SetPods replaces the internal pods with the new pods.
	// It is currently only used for testing.
	SetPods(pods []*v1.Pod)
	// AddPod adds the given pod to the manager.
	AddPod(pod *v1.Pod)
	// UpdatePod updates the given pod in the manager.
	UpdatePod(pod *v1.Pod)
	// DeletePod deletes the given pod from the manager. For mirror pods,
	// this means deleting the mappings related to mirror pods. For non-
	// mirror pods, this means deleting from indexes for all non-mirror pods.
	DeletePod(pod *v1.Pod)
	// GetOrphanedMirrorPodNames returns names of orphaned mirror pods.
	GetOrphanedMirrorPodNames() []string
	// TranslatePodUID returns the actual UID of a pod. If the UID belongs to
	// a mirror pod, returns the UID of its static pod. Otherwise, returns the
	// original UID.
	//
	// All public-facing functions should perform this translation for UIDs
	// because user may provide a mirror pod UID, which is not recognized by
	// internal Kubelet functions.
	TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
	// GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
	// UIDs and mirror pod UIDs to static pod UIDs.
	GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
	// IsMirrorPodOf returns true if mirrorPod is a correct representation of
	// pod; false otherwise.
	IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool
	MirrorClient
}
The underlying data structure of podManager
- Location: D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\pod\pod_manager.go
// All fields are updated by calling SetPods, AddPod, UpdatePod, or DeletePod.
type basicManager struct {
	// Protects all internal maps.
	lock sync.RWMutex
	// Regular pods indexed by UID.
	podByUID map[kubetypes.ResolvedPodUID]*v1.Pod
	// Mirror pods indexed by UID.
	mirrorPodByUID map[kubetypes.MirrorPodUID]*v1.Pod
	// Pods indexed by full name for easy access.
	podByFullName       map[string]*v1.Pod
	mirrorPodByFullName map[string]*v1.Pod
	// Mirror pod UID to pod UID map.
	translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID
	// basicManager is keeping secretManager and configMapManager up-to-date.
	secretManager    secret.Manager
	configMapManager configmap.Manager
	// A mirror pod client to create/delete mirror pods.
	MirrorClient
}
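Field descriptions
- podByUID: maps a pod UID to the pod (regular pods, which includes static pods);
- mirrorPodByUID: maps a mirror pod UID to the mirror pod;
- podByFullName: maps a full name to the pod;
- mirrorPodByFullName: maps a full name to the mirror pod;
- translationByUID: maps a mirror pod UID to the UID of the pod it mirrors;
- MirrorClient: the client used to create and delete mirror pods.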
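To make the role of translationByUID concrete, here is a minimal sketch of the lookup that TranslatePodUID is documented to perform. It assumes plain strings in place of the `kubetypes` UID types, and `indexes`, `fullName`, and `translatePodUID` are hypothetical names used only for illustration.

```go
package main

import "fmt"

// indexes is a hypothetical, simplified model of two of basicManager's
// maps, with strings standing in for the UID types and for *v1.Pod.
type indexes struct {
	podByFullName    map[string]string // full name -> pod UID
	translationByUID map[string]string // mirror pod UID -> static pod UID
}

// fullName builds the name_namespace key used by the full-name indexes.
func fullName(name, namespace string) string {
	return name + "_" + namespace
}

// translatePodUID returns the static pod UID when given a mirror pod
// UID, and returns the input unchanged for any other UID.
func (ix *indexes) translatePodUID(uid string) string {
	if podUID, ok := ix.translationByUID[uid]; ok {
		return podUID
	}
	return uid
}

func main() {
	ix := &indexes{
		podByFullName:    map[string]string{fullName("etcd", "kube-system"): "static-1"},
		translationByUID: map[string]string{"mirror-1": "static-1"},
	}
	fmt.Println(ix.translatePodUID("mirror-1"))  // mirror UID resolves to the static pod
	fmt.Println(ix.translatePodUID("regular-9")) // other UIDs pass through
}
```

Keeping the translation in its own map means callers that only hold a mirror pod UID (for example, one supplied by a user) can still reach the pod that the kubelet actually runs.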
AddPod logic
- Location: D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\pod\pod_manager.go
- AddPod simply delegates to UpdatePod
func (pm *basicManager) AddPod(pod *v1.Pod) {
	pm.UpdatePod(pod)
}
func (pm *basicManager) UpdatePod(pod *v1.Pod) {
	pm.lock.Lock()
	defer pm.lock.Unlock()
	pm.updatePodsInternal(pod)
}
updatePodsInternal walkthrough
- It iterates over the pods passed in and checks each pod's state; a Terminated pod is unregistered from secretManager so that its secrets are no longer tracked, and any other pod is registered
- Terminated means the pod phase is PodFailed or PodSucceeded
for _, pod := range pods {
	if pm.secretManager != nil {
		if isPodInTerminatedState(pod) {
			// Pods that are in terminated state and no longer running can be
			// ignored as they no longer require access to secrets.
			// It is especially important in watch-based manager, to avoid
			// unnecessary watches for terminated pods waiting for GC.
			pm.secretManager.UnregisterPod(pod)
		} else {
			// TODO: Consider detecting only status update and in such case do
			// not register pod, as it doesn't really matter.
			pm.secretManager.RegisterPod(pod)
		}
	}
	// (excerpt: the loop body continues)
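The terminated check that drives the branch above can be sketched as follows. This is an illustration based on the description in this section (Terminated means PodFailed or PodSucceeded); `podPhase` and `secretAction` are hypothetical names, and the phase strings are assumed to match the `v1.PodPhase` values.

```go
package main

import "fmt"

// podPhase is a hypothetical stand-in for v1.PodPhase.
type podPhase string

const (
	podPending   podPhase = "Pending"
	podRunning   podPhase = "Running"
	podSucceeded podPhase = "Succeeded"
	podFailed    podPhase = "Failed"
)

// isPodInTerminatedState reports whether the phase is final, meaning
// the pod will not run again and no longer needs its secrets.
func isPodInTerminatedState(phase podPhase) bool {
	return phase == podFailed || phase == podSucceeded
}

// secretAction names the call the manager would make for a pod in
// the given phase.
func secretAction(phase podPhase) string {
	if isPodInTerminatedState(phase) {
		return "UnregisterPod"
	}
	return "RegisterPod"
}

func main() {
	for _, phase := range []podPhase{podPending, podRunning, podSucceeded, podFailed} {
		fmt.Printf("%-9s -> %s\n", phase, secretAction(phase))
	}
}
```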
- Underneath, this calls the UnregisterPod method of cacheBasedManager, located at D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\util\manager\cache_based_manager.go
- It drops the pod's secret references from the cache's object store
func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
	var prev *v1.Pod
	key := objectKey{namespace: pod.Namespace, name: pod.Name}
	c.lock.Lock()
	defer c.lock.Unlock()
	prev = c.registeredPods[key]
	delete(c.registeredPods, key)
	if prev != nil {
		for name := range c.getReferencedObjects(prev) {
			c.objectStore.DeleteReference(prev.Namespace, name)
		}
	}
}
The secretManager and configMapManager of podManager
- Both are initialized in NewMainKubelet, in D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\kubelet.go
- The default ConfigMapAndSecretChangeDetectionStrategy is Watch, so the managers are created with NewWatchingSecretManager and NewWatchingConfigMapManager
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
	secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
	configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
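The excerpt shows only the Watch case. As a rough sketch of the whole switch, assuming the three values documented for the kubelet configuration field `configMapAndSecretChangeDetectionStrategy` (Get, Cache, Watch), the selection could look like the block below. `describeManager` and its return strings are hypothetical and only describe each flavor; they are not the constructors the kubelet calls.

```go
package main

import (
	"errors"
	"fmt"
)

// Strategy values as documented for the kubelet configuration field
// configMapAndSecretChangeDetectionStrategy (assumed; check your version).
const (
	strategyGet   = "Get"
	strategyCache = "Cache"
	strategyWatch = "Watch"
)

// describeManager is a hypothetical helper that returns a short
// description of the manager flavor a strategy selects.
func describeManager(strategy string) (string, error) {
	switch strategy {
	case strategyWatch:
		return "watch-based: watch referenced objects and serve them from a local cache", nil
	case strategyCache:
		return "TTL cache: serve referenced objects from a cache that expires after a TTL", nil
	case strategyGet:
		return "simple: fetch the object from the API server on every access", nil
	default:
		return "", errors.New("unknown configmap and secret manager mode: " + strategy)
	}
}

func main() {
	for _, s := range []string{strategyWatch, strategyCache, strategyGet, "Poll"} {
		desc, err := describeManager(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %s\n", s, desc)
	}
}
```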
NewWatchingSecretManager walkthrough
- It builds the listSecret, watchSecret, newSecret, and isImmutable closures and passes them to manager.NewWatchBasedManager
func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
	listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
		return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
	}
	watchSecret := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
		return kubeClient.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
	}
	newSecret := func() runtime.Object {
		return &v1.Secret{}
	}
	isImmutable := func(object runtime.Object) bool {
		if secret, ok := object.(*v1.Secret); ok {
			return secret.Immutable != nil && *secret.Immutable
		}
		return false
	}
	gr := corev1.Resource("secret")
	return &secretManager{
		manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
	}
}
- Underneath, the store that gets created is an objectCache
type objectCache struct {
	listObject    listObjectFunc
	watchObject   watchObjectFunc
	newObject     newObjectFunc
	isImmutable   isImmutableFunc
	groupResource schema.GroupResource
	clock         clock.Clock
	maxIdleTime   time.Duration
	lock          sync.RWMutex
	items         map[objectKey]*objectCacheItem
}
- The getSecretNames function passed in walks the pod object and collects the name of every Secret the pod references; these names tell the cache which objects to track for that pod
func getSecretNames(pod *v1.Pod) sets.String {
	result := sets.NewString()
	podutil.VisitPodSecretNames(pod, func(name string) bool {
		result.Insert(name)
		return true
	})
	return result
}
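The visitor pattern used by getSecretNames can be sketched without the Kubernetes types. The block below is an assumption-laden illustration: `podSpec` and `container` are hypothetical, heavily reduced structs that model only three of the places where a pod can reference a secret, and `visitSecretNames` is a stand-in for `podutil.VisitPodSecretNames`, not its implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// container is a hypothetical, reduced container spec.
type container struct {
	EnvSecretRefs []string // secrets referenced from env or envFrom
}

// podSpec is a hypothetical, reduced pod spec.
type podSpec struct {
	ImagePullSecrets []string
	Containers       []container
	SecretVolumes    []string
}

// visitSecretNames calls visit for every referenced secret name and
// stops early if visit returns false.
func visitSecretNames(spec podSpec, visit func(name string) bool) bool {
	for _, name := range spec.ImagePullSecrets {
		if !visit(name) {
			return false
		}
	}
	for _, c := range spec.Containers {
		for _, name := range c.EnvSecretRefs {
			if !visit(name) {
				return false
			}
		}
	}
	for _, name := range spec.SecretVolumes {
		if !visit(name) {
			return false
		}
	}
	return true
}

// secretNames collects the distinct secret names a pod references.
func secretNames(spec podSpec) map[string]struct{} {
	result := map[string]struct{}{}
	visitSecretNames(spec, func(name string) bool {
		result[name] = struct{}{}
		return true
	})
	return result
}

func main() {
	spec := podSpec{
		ImagePullSecrets: []string{"registry"},
		Containers:       []container{{EnvSecretRefs: []string{"db"}}},
		SecretVolumes:    []string{"db", "tls"},
	}
	names := make([]string, 0)
	for name := range secretNames(spec) {
		names = append(names, name)
	}
	sort.Strings(names)
	fmt.Println(names)
}
```

Collecting into a set matters because the same secret is often referenced from several places in one pod, and the store should count it once per pod.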
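Back to where secretManager registers and unregisters a pod's secrets
The shared registration method, RegisterPod
- It calls getReferencedObjects (getSecretNames in this case) to get the names of all secrets the pod references, then adds a reference for each name to objectStore
names := c.getReferencedObjects(pod)
c.lock.Lock()
defer c.lock.Unlock()
for name := range names {
	c.objectStore.AddReference(pod.Namespace, name)
}
- It also updates the registeredPods map, keyed by namespace + name
var prev *v1.Pod
key := objectKey{namespace: pod.Namespace, name: pod.Name}
prev = c.registeredPods[key]
c.registeredPods[key] = pod
- If a pod object prev was already stored under that key, the references held by prev are then deleted from objectStore. Adding the new references first and deleting the old ones afterwards replaces the entry without a separate update path: objects that are still referenced keep their reference, and objects that only prev referenced are released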
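The add-then-delete order is easiest to see with a reference-counted store. The sketch below assumes objectStore behaves like a simple per-name counter; `refStore` and `registry` are hypothetical types written for this illustration, not the kubelet's `objectCache` or `cacheBasedManager`.

```go
package main

import (
	"fmt"
	"sync"
)

// refStore is a hypothetical reference-counted store: an entry lives
// as long as at least one registered pod references it.
type refStore struct {
	refs map[string]int
}

func (s *refStore) addReference(name string) { s.refs[name]++ }

func (s *refStore) deleteReference(name string) {
	if s.refs[name] > 0 {
		s.refs[name]--
	}
	if s.refs[name] == 0 {
		delete(s.refs, name)
	}
}

// registry is a hypothetical model of the register/unregister logic.
type registry struct {
	lock       sync.Mutex
	store      *refStore
	registered map[string][]string // pod key -> secret names it referenced
}

func newRegistry() *registry {
	return &registry{
		store:      &refStore{refs: map[string]int{}},
		registered: map[string][]string{},
	}
}

// registerPod adds the new references first, then releases the ones
// held by the previously registered version of the same pod.
func (r *registry) registerPod(key string, names []string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	for _, name := range names {
		r.store.addReference(name)
	}
	prev, had := r.registered[key]
	r.registered[key] = names
	if had {
		for _, name := range prev {
			r.store.deleteReference(name)
		}
	}
}

// unregisterPod forgets the pod and releases its references.
func (r *registry) unregisterPod(key string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	prev, had := r.registered[key]
	delete(r.registered, key)
	if had {
		for _, name := range prev {
			r.store.deleteReference(name)
		}
	}
}

func main() {
	r := newRegistry()
	r.registerPod("default/web", []string{"tls", "db"})
	r.registerPod("default/web", []string{"tls"}) // pod updated: db no longer used
	fmt.Println(r.store.refs)
	r.unregisterPod("default/web")
	fmt.Println(len(r.store.refs))
}
```

Because the new references are added before the old ones are released, the count for a secret that both versions reference never drops to zero in between, so the store has no reason to discard and refetch it.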
The shared unregistration method
- It computes the key, removes the pod from registeredPods, and deletes the pod's references from objectStore
func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
	var prev *v1.Pod
	key := objectKey{namespace: pod.Namespace, name: pod.Name}
	c.lock.Lock()
	defer c.lock.Unlock()
	prev = c.registeredPods[key]
	delete(c.registeredPods, key)
	if prev != nil {
		for name := range c.getReferencedObjects(prev) {
			c.objectStore.DeleteReference(prev.Namespace, name)
		}
	}
}
Back at the top of updatePodsInternal
- If the pod is Terminated, configMapManager and secretManager unregister its configMaps and secrets
- If the pod is not Terminated, configMapManager and secretManager register its configMaps and secrets
- configMapManager and secretManager are both backed by cacheBasedManager, so they share the same register/unregister methods
Checking whether the pod is a mirror pod
- If it is, the mirror-related maps of pm are updated
if kubetypes.IsMirrorPod(pod) {
	mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
	pm.mirrorPodByUID[mirrorPodUID] = pod
	pm.mirrorPodByFullName[podFullName] = pod
	if p, ok := pm.podByFullName[podFullName]; ok {
		pm.translationByUID[mirrorPodUID] = kubetypes.ResolvedPodUID(p.UID)
	}
}
Updating the maps for a regular pod
resolvedPodUID := kubetypes.ResolvedPodUID(pod.UID)
updateMetrics(pm.podByUID[resolvedPodUID], pod)
pm.podByUID[resolvedPodUID] = pod
pm.podByFullName[podFullName] = pod
if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
	pm.translationByUID[kubetypes.MirrorPodUID(mirror.UID)] = resolvedPodUID
}
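Both branches look up the other pod by full name, which means the translation entry is created no matter which of the two pods arrives first. The sketch below illustrates that property with hypothetical, simplified types: `pod`, `manager`, and the `Mirror` flag (standing in for `kubetypes.IsMirrorPod`) are assumptions for this example.

```go
package main

import "fmt"

// pod is a hypothetical stand-in for v1.Pod.
type pod struct {
	UID, Name, Namespace string
	Mirror               bool // stands in for kubetypes.IsMirrorPod(pod)
}

// manager is a hypothetical, lock-free model of basicManager's maps.
type manager struct {
	podByUID            map[string]pod
	mirrorPodByUID      map[string]pod
	podByFullName       map[string]pod
	mirrorPodByFullName map[string]pod
	translationByUID    map[string]string // mirror UID -> static UID
}

func newManager() *manager {
	return &manager{
		podByUID:            map[string]pod{},
		mirrorPodByUID:      map[string]pod{},
		podByFullName:       map[string]pod{},
		mirrorPodByFullName: map[string]pod{},
		translationByUID:    map[string]string{},
	}
}

// update indexes the pod and links it to its counterpart, if the
// counterpart with the same full name is already known.
func (m *manager) update(p pod) {
	fullName := p.Name + "_" + p.Namespace
	if p.Mirror {
		m.mirrorPodByUID[p.UID] = p
		m.mirrorPodByFullName[fullName] = p
		if static, ok := m.podByFullName[fullName]; ok {
			m.translationByUID[p.UID] = static.UID
		}
		return
	}
	m.podByUID[p.UID] = p
	m.podByFullName[fullName] = p
	if mirror, ok := m.mirrorPodByFullName[fullName]; ok {
		m.translationByUID[mirror.UID] = p.UID
	}
}

func main() {
	static := pod{UID: "static-1", Name: "etcd", Namespace: "kube-system"}
	mirror := pod{UID: "mirror-1", Name: "etcd", Namespace: "kube-system", Mirror: true}

	staticFirst := newManager()
	staticFirst.update(static)
	staticFirst.update(mirror)

	mirrorFirst := newManager()
	mirrorFirst.update(mirror)
	mirrorFirst.update(static)

	fmt.Println(staticFirst.translationByUID["mirror-1"], mirrorFirst.translationByUID["mirror-1"])
}
```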
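UpdatePod: updating a pod
- UpdatePod also ends up in updatePodsInternal. It takes the write lock before touching the maps; because AddPod simply calls UpdatePod, both paths run under the same lock
func (pm *basicManager) UpdatePod(pod *v1.Pod) {
	pm.lock.Lock()
	defer pm.lock.Unlock()
	pm.updatePodsInternal(pod)
}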
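The locking pattern can be sketched in isolation. The block below assumes that readers take the read side of the `sync.RWMutex`, which is the usual reason to declare the field as an RWMutex; the getters are not shown in this section, so treat that part as an assumption. `store` and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical, reduced version of basicManager with a
// single map guarded by a read-write lock.
type store struct {
	lock sync.RWMutex
	pods map[string]string // pod UID -> pod name
}

// updatePod takes the write lock because it mutates the map.
func (s *store) updatePod(uid, name string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.pods[uid] = name
}

// addPod delegates to updatePod and therefore runs under the same lock.
func (s *store) addPod(uid, name string) {
	s.updatePod(uid, name)
}

// getPodByUID only reads, so the shared read lock is enough.
func (s *store) getPodByUID(uid string) (string, bool) {
	s.lock.RLock()
	defer s.lock.RUnlock()
	name, ok := s.pods[uid]
	return name, ok
}

func main() {
	s := &store{pods: map[string]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.addPod(fmt.Sprintf("uid-%d", i), "pod")
		}(i)
	}
	wg.Wait()
	_, ok := s.getPodByUID("uid-7")
	fmt.Println(len(s.pods), ok)
}
```

Putting the lock in the exported method and keeping the internal helper lock-free lets several exported methods share one helper without taking the lock twice.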
DeletePod: deleting a pod
- The flow is familiar by now: secretManager and configMapManager unregister the objects the pod referenced
- The pod's keys are then computed and removed from the corresponding maps
func (pm *basicManager) DeletePod(pod *v1.Pod) {
	updateMetrics(pod, nil)
	pm.lock.Lock()
	defer pm.lock.Unlock()
	if pm.secretManager != nil {
		pm.secretManager.UnregisterPod(pod)
	}
	if pm.configMapManager != nil {
		pm.configMapManager.UnregisterPod(pod)
	}
	podFullName := kubecontainer.GetPodFullName(pod)
	// It is safe to type convert here due to the IsMirrorPod guard.
	if kubetypes.IsMirrorPod(pod) {
		mirrorPodUID := kubetypes.MirrorPodUID(pod.UID)
		delete(pm.mirrorPodByUID, mirrorPodUID)
		delete(pm.mirrorPodByFullName, podFullName)
		delete(pm.translationByUID, mirrorPodUID)
	} else {
		delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID))
		delete(pm.podByFullName, podFullName)
	}
}
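One consequence of the two branches above is that deleting a static pod leaves its mirror pod in the mirror maps. The interface lists GetOrphanedMirrorPodNames for finding such leftovers; its implementation is not shown in this section, so the sketch below assumes it reports every mirror pod whose full name has no matching regular pod. `manager`, `deletePod`, and `orphanedMirrorPodNames` are hypothetical, simplified names.

```go
package main

import "fmt"

// manager is a hypothetical model holding only the maps needed here.
type manager struct {
	podByFullName       map[string]string // full name -> pod UID
	mirrorPodByFullName map[string]string // full name -> mirror pod UID
	translationByUID    map[string]string // mirror pod UID -> static pod UID
}

// deletePod removes the pod from the maps that match its kind.
func (m *manager) deletePod(fullName, uid string, mirror bool) {
	if mirror {
		delete(m.mirrorPodByFullName, fullName)
		delete(m.translationByUID, uid)
		return
	}
	delete(m.podByFullName, fullName)
}

// orphanedMirrorPodNames returns the full names of mirror pods that
// have no regular pod with the same full name (assumed behavior).
func (m *manager) orphanedMirrorPodNames() []string {
	var names []string
	for fullName := range m.mirrorPodByFullName {
		if _, ok := m.podByFullName[fullName]; !ok {
			names = append(names, fullName)
		}
	}
	return names
}

func main() {
	m := &manager{
		podByFullName:       map[string]string{"etcd_kube-system": "static-1"},
		mirrorPodByFullName: map[string]string{"etcd_kube-system": "mirror-1"},
		translationByUID:    map[string]string{"mirror-1": "static-1"},
	}
	fmt.Println(len(m.orphanedMirrorPodNames())) // both pods present

	m.deletePod("etcd_kube-system", "static-1", false)
	fmt.Println(m.orphanedMirrorPodNames()) // mirror pod is now orphaned

	m.deletePod("etcd_kube-system", "mirror-1", true)
	fmt.Println(len(m.orphanedMirrorPodNames()))
}
```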