kubelet组件的启动流程源码分析
概述
摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。
正文
kubelet的作用
这里对kubelet的作用做一个简单总结。
-
节点管理
-
节点的注册
-
节点状态更新
-
-
容器管理(pod生命周期管理)
-
监听apiserver的容器事件
-
容器的创建、删除(CRI)
-
容器的网络的创建与删除(CNI)
-
容器状态监控
-
容器的驱逐
-
-
监控
- cadvisor
- healthz
kubelet的原理
(图片来源于网络,如有侵权请联系作者)
如下 kubelet 内部组件结构图所示,Kubelet 由许多内部组件构成
Kubelet API
,包括 10250 端口的认证 API、4194 端口的 cAdvisor API、10255 端口的只读 API 以及 10248 端口的健康检查 APIsyncLoop
:从 API 或者 manifest 目录接收 Pod 更新,发送到 podWorkers 处理,大量使用 channel 处理来处理异步请求- 辅助的
manager
,如 cAdvisor、PLEG、Volume Manager 等,处理 syncLoop 以外的其他工作 CRI
:容器执行引擎接口,负责与 container runtime shim 通信- 容器执行引擎,如 dockershim、rkt 等
网络插件
,目前支持 CNI 和 kubenet
kubelet的启动参数
/usr/local/bin/kubelet
--address=<node-name>
# 指定 kubelet 与 apiserver 通信的端口
--port=10250
--healthz-bind-address=0.0.0.0
# 指定 只读api 的端口
--read-only-port=10255
# 注册 node 节点使用的hostname
--hostname_override=<node-name>
# 重要!!! 指定kubeletconfig配置文件的路径
--config=/home/kube/kubernetes/conf/config.yaml
# 指定 pause 容器的image下载路径
--pod-infra-container-image=mirrors.myoas.com/nebula-docker/seg/pod/pause:3.1
# 指定 kubelet 在节点上存储数据和文件的根目录。
--root-dir=/home/kube/kubernetes/lib/kubelet
# 每个宿主最大能创建多少个POD
--max-pods=60
# 指定日志输出记录级别,4表示记录含有调试信息的所有信息
--v=4
# 指定cni插件
--network-plugin=cni
--cni-conf-dir=/etc/cni/net.d
--cni-bin-dir=/opt/cni/bin
# 指定了 kubelet 从 Kubernetes API Server 同步更新的时间间隔(以秒为单位),默认是60秒
--sync-frequency=5s
# 指定kubeconfig的路径,用于节点注册的认证
--kubeconfig=/home/kube/kubernetes/conf/kubelet.kubeconfig
# 存放认证文件的目录
--cert-dir=/home/kube/ssl/pkc
# 设置系统保留资源
--system-reserved=cpu=2000m,memory=20000Mi
# 支持宿主节点使用swap
--fail-swap-on=false
kubelet监听的端口
kubelet 默认监听三个端口,分别为 10250 、10255、10248(有些k8s版本的kubelet也包括cadvisor的4194端口)
-
10250: kubelet server 与 apiserver 通信的端口,定期请求 apiserver 获取自己所应当处理的任务,通过该端口可以访问获取 node 资源以及状态。
-
10255: 提供了 pod 、 node、metric和cadvisor 的信息,接口以只读形式暴露出去,访问该端口不需要认证和鉴权。
root@ubuntu:~# curl http://10.234.12.78:10255/stats/summary { "node": { "nodeName": "10.234.12.78", "systemContainers": [ { "name": "pods", "startTime": "2024-09-07T06:00:48Z", "cpu": { "time": "2024-09-07T11:49:09Z", "usageNanoCores": 12571621, "usageCoreNanoSeconds": 261530338504 }, // 输出略 }
root@ubuntu:~# curl http://10.234.12.77:10255/metrics |head -10
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0# HELP apiserver_audit_event_total [ALPHA] Counter of audit events generated and sent to the audit backend.
# TYPE apiserver_audit_event_total counter
apiserver_audit_event_total 0
# HELP apiserver_audit_requests_rejected_total [ALPHA] Counter of apiserver requests rejected due to an error in audit logging backend.
# TYPE apiserver_audit_requests_rejected_total counter
apiserver_audit_requests_rejected_total 0
# HELP apiserver_client_certificate_expiration_seconds [ALPHA] Distribution of the remaining lifetime on the certificate used to authenticate a request.
# TYPE apiserver_client_certificate_expiration_seconds histogram
apiserver_client_certificate_expiration_seconds_bucket{le="0"} 0
apiserver_client_certificate_expiration_seconds_bucket{le="1800"} 0
// 输出略
root@ubuntu:~# curl http://10.234.12.77:4194/metrics |head -10
# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="511ec9ef",cadvisorVersion="v0.33.0",dockerVersion="18.09.7",kernelVersion="4.15.0-147-generic",osVersion="Alpine Linux v3.8"} 1
# HELP container_cpu_cfs_periods_total Number of elapsed enforcement period intervals.
# TYPE container_cpu_cfs_periods_total counter
container_cpu_cfs_periods_total{container_label_annotation_io_kubernetes_container_hash="",container_label_annotation_io_kubernetes_container_ports="",container_label_annotation_io_kubernetes_container_restartCount="",container_label_annotation_io_kubernetes_container_terminationMessagePath="",container_label_annotation_io_kubernetes_container_terminationMessagePolicy="",container_label_annotation_io_kubernetes_pod_terminationGracePeriod="",container_label_annotation_kubernetes_io_config_seen="",container_label_annotation_kubernetes_io_config_source="",container_label_app="",container_label_controller_revision_hash="",container_label_io_kubernetes_container_logpath="",container_label_io_kubernetes_container_name="",container_label_io_kubernetes_docker_type="",container_label_io_kubernetes_pod_name="",container_label_io_kubernetes_pod_namespace="",container_label_io_kubernetes_pod_uid="",container_label_io_kubernetes_sandbox_id="",container_label_pod_template_generation="",id="/kubepods/burstable/podc387ad42-a56c-4e29-bc78-0264f44614b7",image="",name=""} 212064 1725710124084
// 输出略
-
10248: 通过访问该端口可以判断 kubelet 是否正常工作, 通过 kubelet 的启动参数
--healthz-port
和--healthz-bind-address
来指定监听的地址和端口。root@ubuntu:~# curl http://10.234.12.78:10248/healthz ok
查询 Node 汇总信息
- 在集群内部可以直接访问 kubelet 的 10255 端口
curl http://<node-name>:10255/stats/summary
- 查询kubelet健康状态
curl http://<node-name>:10248/healthz
- 查询kubelet的metrics
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics
- 查询cadvisor
curl http://<master>:58201/api/v1/nodes/<node-name>/proxy/metrics/cadvisor
pod创建流程
(图片来源于网络,如有侵权请联系作者)
上图是一个典型的pod创建流程图,kubelet通过syncloop 监听到 apiserver 将 pod调度到本机上,之后kubelet通过与dockershim交互,创建container,准备cni,准备image.创建完成container后,kubelet将container的状态信息反馈给apiserver.
kubelet启动源码解析
说明:基于 kubernetes
v1.18.0
源码分析
再对kubelet的基础知识有一定了解后,我们下面正式进入kubelet启动流程的源码分析。源码位于k8s.io/kubernetes/cmd/kubelet/kubelet.go
kubeletconfig
在进行源码分析之前,我们分析Kubelet使用的配置文件kubeletconfig,kubeletconfig包括了kubelet程序运行的重要参数信息。
查看kubeletconfig的内容
- 方法一: 直接访问
root@ubuntu:~# curl -X GET https://10.234.12.78:10250/configz -k |jq
方法二:通过kubectl proxy 间接访问
root@ubuntu:~# kubectl proxy
Starting to serve on 127.0.0.1:8001
查看kubeletconfig的内容
curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .
kubeletconfig的内容如下
root@ubuntu:~# curl -X GET http://127.0.0.1:8001/api/v1/nodes/<node-name>/proxy/configz | jq .
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1985 100 1985 0 0 351k 0 --:--:-- --:--:-- --:--:-- 387k
{
"kubeletconfig": {
"staticPodPath": "/etc/kubernetes/manifests",
"syncFrequency": "5s",
"fileCheckFrequency": "20s",
"httpCheckFrequency": "20s",
"address": "<node-name>",
"port": 10250,
"tlsCertFile": "/home/kube/ssl/pkc/kubelet.crt",
"tlsPrivateKeyFile": "/home/kube/ssl/pkc/kubelet.key",
"rotateCertificates": true,
"authentication": {
"x509": {
"clientCAFile": "/home/kube/ssl/pkc/ca.crt"
},
"webhook": {
"enabled": true,
"cacheTTL": "2m0s"
},
"anonymous": {
"enabled": true
}
},
"authorization": {
"mode": "Webhook",
"webhook": {
"cacheAuthorizedTTL": "5m0s",
"cacheUnauthorizedTTL": "30s"
}
},
"registryPullQPS": 5,
"registryBurst": 10,
"eventRecordQPS": 5,
"eventBurst": 10,
"enableDebuggingHandlers": true,
"healthzPort": 10248,
"healthzBindAddress": "0.0.0.0",
"oomScoreAdj": -999,
"clusterDomain": "cluster.local",
"clusterDNS": [
"10.96.0.10"
],
"streamingConnectionIdleTimeout": "4h0m0s",
"nodeStatusUpdateFrequency": "10s",
"nodeStatusReportFrequency": "1m0s",
"nodeLeaseDurationSeconds": 40,
"imageMinimumGCAge": "2m0s",
"imageGCHighThresholdPercent": 85,
"imageGCLowThresholdPercent": 80,
"volumeStatsAggPeriod": "1m0s",
"cgroupsPerQOS": true,
"cgroupDriver": "cgroupfs",
"cpuManagerPolicy": "none",
"cpuManagerReconcilePeriod": "10s",
"topologyManagerPolicy": "none",
"runtimeRequestTimeout": "2m0s",
"hairpinMode": "promiscuous-bridge",
"maxPods": 60,
"podPidsLimit": -1,
"resolvConf": "/etc/resolv.conf",
"cpuCFSQuota": true,
"cpuCFSQuotaPeriod": "100ms",
"maxOpenFiles": 1000000,
"contentType": "application/vnd.kubernetes.protobuf",
"kubeAPIQPS": 5,
"kubeAPIBurst": 10,
"serializeImagePulls": true,
"evictionHard": {
"imagefs.available": "15%",
"memory.available": "100Mi",
"nodefs.available": "10%",
"nodefs.inodesFree": "5%"
},
"evictionPressureTransitionPeriod": "5m0s",
"enableControllerAttachDetach": true,
"makeIPTablesUtilChains": true,
"iptablesMasqueradeBit": 14,
"iptablesDropBit": 15,
"failSwapOn": false,
"containerLogMaxSize": "10Mi",
"containerLogMaxFiles": 5,
"configMapAndSecretChangeDetectionStrategy": "Watch",
"systemReserved": {
"cpu": "2000m",
"memory": "20000Mi"
},
"enforceNodeAllocatable": [
"pods"
]
}
}
main
kubelet组件的启动入口main(),在kubernetes/cmd/kubelet/kubelet.go
中
kubelet与kubernetes其他组件一样还是使用corba框架,进行命令行参数解析。
func main() {
rand.Seed(time.Now().UnixNano())
// 调用 app.NewKubeletCommand()
command := app.NewKubeletCommand()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
NewKubeletCommand
NewKubeletCommand 的执行逻辑包括:
-
从命令行参数与
kubeletconfig
中,读取参数 -
参数校验,配置文件校验
-
初始化默认的 featureGate 配置。
具体有哪些featuregate可以参考kubernete官方文档feature-gates
-
使用命令行参数和配置文件的参数,构建
KubeletServer
-
用
KubeletServer
来 构建kubeletDeps
,kubeletDeps
包含 kubelet 运行所必须的配置 -
将前面创建的
featureGate
,KubeletServer
,kubeletDeps
传入Run()函数,进行启动kubelet
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// 从命令行参数中读取配置
kubeletFlags := options.NewKubeletFlags()
// 从 Kubeletconfig 中读取配置, Kubeletconfig配置文件由 --config=xxx 指定
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.Fatal(err)
}
cmd := &cobra.Command{
Use: componentKubelet,
Long: `The kubelet is the primary "node agent" that runs on each
node. It can register the node with the apiserver using one of: the hostname; `
DisableFlagParsing: true,
Run: func(cmd *cobra.Command, args []string) {
// initial flag parse, since we disable cobra's flag parsing
// Parse 解析命令行参数
if err := cleanFlagSet.Parse(args); err != nil {
cmd.Usage()
klog.Fatal(err)
}
// check if there are non-flag arguments in the command line
cmds := cleanFlagSet.Args()
if len(cmds) > 0 {
cmd.Usage()
klog.Fatalf("unknown command: %s", cmds[0])
}
// short-circuit on help
help, err := cleanFlagSet.GetBool("help")
if err != nil {
klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
}
if help {
cmd.Help()
return
}
// short-circuit on verflag
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cleanFlagSet)
// set feature gates from initial flags-based config
// 初始化 featureGate 配置
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
// validate the initial KubeletFlags
// 校验命令行参数
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
klog.Fatal(err)
}
// pause 容器下载的路径必须指定。--pod-infra-container-image== 指定的pause容器的image下载路径
if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
}
// load kubelet config file, if provided
// 加载 kubeconfig 配置文件
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
if err != nil {
klog.Fatal(err)
}
// We must enforce flag precedence by re-parsing the command line into the new object.
// This is necessary to preserve backwards-compatibility across binary upgrades.
// See issue #56171 for more details.
if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
klog.Fatal(err)
}
// update feature gates based on new config
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
// We always validate the local configuration (command line + config file).
// This is the default "last-known-good" config for dynamic config, and must always remain valid.
// 校验 kubeletconfig 配置文件
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
klog.Fatal(err)
}
// use dynamic kubelet config, if enabled
// 处理动态配置
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
// construct a KubeletServer from kubeletFlags and kubeletConfig
// 用上面的命令行参数与kubeletconfig来构建一个 KubeletServer 对象
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// use kubeletServer to construct the default KubeletDeps
// 使用 kubeletServer 构建一个的 kubeletDeps, kubeletDeps是 kubelet启动所依赖的条件
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
klog.Fatal(err)
}
// add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController
// set up stopCh here in order to be reused by kubelet and docker shim
stopCh := genericapiserver.SetupSignalHandler()
// start the experimental docker shim, if enabled
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
klog.Fatal(err)
}
return
}
// run the kubelet
// 启动 kubelet
klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
klog.Fatal(err)
}
},
}
// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
kubeletFlags.AddFlags(cleanFlagSet)
options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
options.AddGlobalFlags(cleanFlagSet)
cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
cmd.SetUsageFunc(func(cmd *cobra.Command) error {
fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
return nil
})
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
})
return cmd
}
Run
Run使用给定的Dependencies,来运行指定的KubeletServer,而且它永远不应该退出。kubeDeps参数可以是nil,如果是nil的话,它将从KubeletServer上的设置初始化。否则,将假定调用者已经设置了Dependencies对象,并且不会生成默认对象。
从源码可以看到,Run对OS类型是windows时做一些处理,之后就马上进入run方法
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
// 如果 OS 是 Windows 时,这做一些处理
if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
// 调用run方法进一步处理
if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}
run
run() 函数很长,总结下它的做的工作包括:
- 为 kubelet 设置默认的 FeatureGates,kubelet 所有的 FeatureGates 可以通过命令参数查看,k8s 中处于
Alpha
状态的 FeatureGates 在组件启动时默认关闭,处于Beta
和 GA 状态的默认开启; - 校验 kubelet 的参数;
- 将当前的配置文件注册到 http server
/configz
URL 中;这样就可以通过curl -X GET https://127.0.0.1:10250/configz -k
查看kubeletconfig的配置信息 - 检查 kubelet 启动模式是否为 standalone 模式,此模式下不会和 apiserver 交互,主要用于 kubelet 的调试;
- 初始化 kubeDeps,kubeDeps 中包含 kubelet 的一些依赖,主要有
KubeClient
、EventClient
、HeartbeatClient
、Auth
、cadvisor
、ContainerManager
; - 配置cgroupRoot目录。通过参数指定 --cgroup-root=。kubeletCgroup 用于指定 kubelet 进程自身的 cgroup 目录。runtimeCgroup 用于指定 kubelet 所管理的所有容器的 cgroup 目录。SystemCgroups 用于指定 kubelet 挂载 cgroup 文件系统时的根目录。
- 检查是否以 root 用户启动;
- 为进程设置 oom 分数,默认为 -999,分数范围为 [-1000, 1000],越小越不容易被 kill 掉;
- 调用
RunKubelet
方法; - 检查 kubelet 是否启动了动态配置功能;
- 启动 Healthz http server;默认是10248端口
- 如果使用 systemd 启动,通知 systemd kubelet 已经启动;
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
// 设置 全局的 feature
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
// 对初始化的 KubeletServer 进行校验
if err := options.ValidateKubeletServer(s); err != nil {
return err
}
// Obtain Kubelet Lock File
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
done := make(chan struct{})
if s.LockFilePath != "" {
klog.Infof("acquiring file lock on %q", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
}
if s.ExitOnLockContention {
klog.Infof("watching for inotify events for: %v", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
}
}
// Register current configuration with /configz endpoint
// 注册当前配置文到 /configz http访问点, 可以通过访问 ”curl -X GET https://<nodename>:10250/configz -k“
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}
if len(s.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
// About to get clients and such, detect standaloneMode
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}
// 初始化 kubeDeps
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s, featureGate)
if err != nil {
return err
}
}
if kubeDeps.Cloud == nil {
if !cloudprovider.IsExternal(s.CloudProvider) {
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
if cloud == nil {
klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
} else {
klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
}
kubeDeps.Cloud = cloud
}
}
// 初始 hostName, nodeName
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}
// if in standalone mode, indicate as much by setting all clients to nil
// 如果是 standalone 模式,则设置 KubeClient ,EventClient,HeartbeatClient 为空
switch {
case standaloneMode:
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.Warningf("standalone mode, no API client")
// 初始化 kubeClient , EventClient, HearbeatClient
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
if err != nil {
return err
}
if closeAllConns == nil {
return errors.New("closeAllConns must be a valid function other than nil")
}
kubeDeps.OnHeartbeatFailure = closeAllConns
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet client: %v", err)
}
// make a separate client for events
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet event client: %v", err)
}
// make a separate client for heartbeat with throttling disabled and a timeout attached
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
// The timeout is the minimum of the lease duration and status update frequency
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
heartbeatClientConfig.QPS = float32(-1)
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
}
}
// 出还是 auth 模块
if kubeDeps.Auth == nil {
auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
runAuthenticatorCAReload(stopCh)
}
// 这种 cgroupRoot (linux通过cgroup现在容器的资源使用,cgroupRoot就是 cgroup 对应的根目录,默认是/sys/fs/cgroup/systemd/ 目录)
// kubeletCgroup 用于指定 kubelet 进程自身的 cgroup 目录。
// runtimeCgroup 用于指定 kubelet 所管理的所有容器的 cgroup 目录。 --cgroup-root=/my/cgroup/path
// SystemCgroups 用于指定 kubelet 挂载 cgroup 文件系统时的根目录。
// 这些配置项允许对 kubelet 自身和所管理的容器在 cgroup 层面进行资源限制和隔离。它们可以根据需求进行自定义配置,以满足特定的部署要求。
var cgroupRoots []string
// 在 systemd 就是一个 cgroupDriver 的一种
cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}
runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
} else if runtimeCgroup != "" {
// RuntimeCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, runtimeCgroup)
}
if s.SystemCgroups != "" {
// SystemCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}
// 配置 cavdisor
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}
// Setup event recorder if required.
// 配置 事件记录器
makeEventRecorder(kubeDeps, nodeName)
// 初始化 ContainerManager
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
var reservedSystemCPUs cpuset.CPUSet
var errParse error
if s.ReservedSystemCPUs != "" {
reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
if errParse != nil {
// invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
return errParse
}
// is it safe do use CAdvisor here ??
machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
if err != nil {
// if can't use CAdvisor here, fall back to non-explicit cpu list behavor
klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
reservedSystemCPUs = cpuset.NewCPUSet()
} else {
reservedList := reservedSystemCPUs.ToSlice()
first := reservedList[0]
last := reservedList[len(reservedList)-1]
if first < 0 || last >= machineInfo.NumCores {
// the specified cpuset is outside of the range of what the machine has
klog.Infof("Invalid cpuset specified by --reserved-cpus")
return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
}
}
} else {
reservedSystemCPUs = cpuset.NewCPUSet()
}
if reservedSystemCPUs.Size() > 0 {
// at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
if s.SystemReserved == nil {
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
ContainerRuntime: s.ContainerRuntime,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
ReservedSystemCPUs: reservedSystemCPUs,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {
return err
}
}
// 检查是否以 root 权限启动
if err := checkPermissions(); err != nil {
klog.Error(err)
}
utilruntime.ReallyCrash = s.ReallyCrashForTesting
// TODO(vmarmol): Do this through container config.
// 配置 OOMScoreAdj 分数
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.Warning(err)
}
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
kubeDeps, &s.ContainerRuntimeOptions,
s.ContainerRuntime,
s.RuntimeCgroups,
s.RemoteRuntimeEndpoint,
s.RemoteImageEndpoint,
s.NonMasqueradeCIDR)
if err != nil {
return err
}
// 调用 RunKubelet 方法执行后续的启动操作
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
return err
}
}
// 启动 healthz 监控检查的http端口
if s.HealthzPort > 0 {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.Errorf("Starting healthz server failed: %v", err)
}
}, 5*time.Second, wait.NeverStop)
}
if s.RunOnce {
return nil
}
// If systemd is used, notify it that we have started
// 如果使用的是systemd,则向 systemd 发送启动ready 的信号
go daemon.SdNotify(false, "READY=1")
select {
case <-done:
break
case <-stopCh:
break
}
return nil
}
RunKubelet
run函数调用 RunKubelet 方法执行后续的启动操作,RunKubelet的工作包括:
- 设置默认启动特性模式
- 调用 createAndInitKubelet ,执行 kubelet 组件的初始化
- 检查 kubeDeps.PodConfig. kubeDeps.PodConfig 在 kubelet 中起到管理和存储单个 Pod 配置的作用,为 kubelet 提供了访问和操作 Pod 配置信息的便利性,以支持 Kubernetes 中的容器管理和调度任务
- 设置 MaxOpenFiles 。 可以通过kubelet启动参数进行调整。–max-open-files=
- 调用 startKubelet,启动 kubelet 中的组件
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
}
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)
// 1.默认使用特权模式
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: true,
})
credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
}
// 2. 调用 createAndInitKubelet
k, err := createAndInitKubelet(.....)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
// 3. 检查 kubeDeps.PodConfig
// kubeDeps.PodConfig 在 kubelet 中起到管理和存储单个 Pod 配置的作用,为 kubelet 提供了访问和操作 Pod 配置信息的便利性,
// 以支持 Kubernetes 中的容器管理和调度任务
if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig
// 4. 设置 MaxOpenFiles
rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
klog.Info("Started kubelet as runonce")
} else {
// 5. 调用 startKubelet
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
klog.Info("Started kubelet")
}
return nil
}
createAndInitKubelet
createAndInitKubelet
中主要调用了三个方法来完成 kubelet 的初始化:
kubelet.NewMainKubelet
函数使用提供的配置参数创建一个新的kubelet实例。它接收一些输入参数,例如节点名称、配置对象、主节点的网络地址等,以及其他一些选项。它会初始化kubelet的各种子系统和依赖项,并返回一个指向kubelet实例的指针k.BirthCry()
向 apiserver 发送一条 kubelet 启动了的 event;k.StartGarbageCollection()
启动 垃圾回收,回收 container 和 images;
func createAndInitKubelet(....) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// 实例化 kubelet 对象,并对 kubelet 依赖的所有模块进行初始化;
k, err = kubelet.NewMainKubelet(....)
if err != nil {
return nil, err
}
// 向 apiserver 发送一条 kubelet 启动了的 event;
k.BirthCry()
// 启动 垃圾回收,回收 container 和 images;
k.StartGarbageCollection()
return k, nil
}
startKubelet
k.Run(()
,启动 kubelet 中的所有模块以及主流程- 启动 kubelet server,默认http监听端口是10250
- 启动只读状态http服务,默认是10255
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
// start the kubelet
// 启动 kubelet 中的所有模块以及主流程
go k.Run(podCfg.Updates())
// start the kubelet server
// 启动 kubelet server,默认http监听端口是10250
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}
// 只读状态api,默认是10255
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
}
// 启动DefaultFeatureGate中对应http服务的相关端口
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
Kubelet.Run
Run
方法是启动 kubelet 的核心方法,其中会启动 kubelet 的依赖模块以及主循环逻辑,该方法的主要逻辑为:
- 1、注册 logServer;
- 2、判断是否需要启动 cloud provider sync manager;
- 3、调用
kl.initializeModules
首先启动不依赖 container runtime 的一些模块; - 4、启动
volume manager
; - 5、执行
kl.syncNodeStatus
定时同步 Node 状态; - 6、调用
kl.fastStatusUpdateOnce
更新容器运行时启动时间以及执行首次状态同步; - 7、判断是否启用
NodeLease
机制; - 8、执行
kl.updateRuntimeUp
定时更新 Runtime 状态; - 9、执行
kl.syncNetworkUtil
定时同步 iptables 规则; - 10、执行
kl.podKiller
定时清理异常 pod,当 pod 没有被 podworker 正确处理的时候,启动一个goroutine 负责 kill 掉 pod; - 11、启动
statusManager
; - 12、启动
probeManager
; - 13、启动
runtimeClassManager
; - 14、启动
pleg
; - 15、调用
kl.syncLoop
监听 pod 变化;
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// 配置 logServer
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
// 检查 kubeClient
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
}
// Start the cloud provider sync manager
// 启动 cloudResourceSyncManager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
// 在initializeModules()中会启动 imageManager,serverCertificateManager,oomWatcher,resourceAnalyzer
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
}
// Start volume manager
// 启动 volumeManager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
// 同步 node 信息
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
// 调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步
go kl.fastStatusUpdateOnce()
// start syncing lease
// 启动 nodeLease控制器,nodeLease是一种节点健康检查机制
go kl.nodeLeaseController.Run(wait.NeverStop)
}
// 执行 kl.updateRuntimeUp 定时更新 Runtime 状态,如果runtime (例如docker) 状态检查返回false,kubelet 将处于not ready状态
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Set up iptables util rules
// 同步 iptables 规则
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
// 定时清理异常 pod
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)
// Start component sync loops.
// 启动 statusManager、probeManager、runtimeClassManager
kl.statusManager.Start()
kl.probeManager.Start()
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
// Start the pod lifecycle event generator.
// 启动 pleg 即Pod生命周期实际管理器
kl.pleg.Start()
// 调用 kl.syncLoop 监听 pod 变化
kl.syncLoop(updates, kl)
}
nodeLease 机制是 kubelet 与控制平面交互的机制之一,用于保持节点的活性以及更新节点的状态。是Kubernetes 引入了节点健康检查机制。nodeLease 机制允许 kubelet 周期性地向控制平面报告自己的“租赁”,以证明它仍然在运行。如果 kubelet 停止发送此报告,则控制平面将认为该节点已离线,并在控制平面上更新该节点的状态为“离线”。
initializeModules
initializeModules
函数负责初始化kubelet的各个子系统和功能模块,以便kubelet能够正常运行和管理容器化工作负载。具体而言,它的工作包括:
-
1、创建kubelet工作的文件目录,由参数–root-dir指定
-
2、创建 ContainerLogsDir
-
3、启动 imageManager
-
4、启动 certificate manager
-
5、启动 oomWatcher.
-
6、启动 resource analyzer
func (kl *Kubelet) initializeModules() error {
metrics.Register(
kl.runtimeCache,
collectors.NewVolumeStatsCollector(kl),
collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
)
metrics.SetNodeName(kl.nodeName)
servermetrics.Register()
// 1、创建文件目录,由参数--root-dir指定
if err := kl.setupDataDirs(); err != nil {
return err
}
// 2、创建 ContainerLogsDir
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
}
}
// 3、启动 imageManager
kl.imageManager.Start()
// 4、启动 certificate manager
if kl.serverCertificateManager != nil {
kl.serverCertificateManager.Start()
}
// 5、启动 oomWatcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("failed to start OOM watcher %v", err)
}
// 6、启动 resource analyzer
kl.resourceAnalyzer.Start()
return nil
}
Kubelet.synloop
syncLoop是处理pod变化的主循环。它监视来自三个通道(file、apisserver和http)的更改,并创建它们的联合。如果发现到任何新更改,将针对期望状态和运行状态运行同步处理。如果配置没有变化,将每同步频率秒同步最后一个已知的所需状态。
Kubelet.synloop 启动for{}循环调用Kubelet.syncLoopIteration
对应监听的通道的变化事件进行处理
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.Info("Starting kubelet main sync loop.")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
// 循环处理 syncLoopIteration
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.Errorf("skipping pod synchronization - %v", err)
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
Kubelet.syncLoopIteration
kubelet 的 pods 同步逻辑都在 syncLoopIteration
这里. syncLoopIteration
同时监听下面的 chan, 根据事件做不同的处理.
- configCh: 监听 file, http, apiserver 的事件更新
- syncCh: 定时器管道, 每隔一秒去同步最新保存的 pod 状态
- houseKeepingCh: housekeeping 事件的管道,做 pod 清理工作
- plegCh: 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态.
- livenessManager.Updates: 健康检查发现某个 pod 不可用, kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
// 监听 file, http, apiserver 的事件更新
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.Errorf("Kubelet does not support snapshot update")
}
if u.Op != kubetypes.RESTORE {
// If the update type is RESTORE, it means that the update is from
// the pod checkpoints and may be incomplete. Do not mark the
// source as ready.
// Mark the source ready after receiving at least one update from the
// source. Once all the sources are marked ready, various cleanup
// routines will start reclaiming resources. It is important that this
// takes place only after kubelet calls the update handler to process
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source)
}
// 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态.
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
// 每隔一秒去同步最新保存的 pod 状态
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
handler.HandlePodSyncs(podsToSync)
// 健康检查发现某个 pod 不可用, kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
// housekeeping 事件的管道,做 pod 清理工作
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
klog.V(4).Infof("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
}
}
}
return true
}
kubelet工作原理示意图:
(图片来源于网络,如有侵权请联系作者)
此处总结一下 kubelet 启动逻辑中的调用关系如下所示:
|--> NewMainKubelet
|
|--> createAndInitKubelet --|--> BirthCry
| |
|--> RunKubelet --| |--> StartGarbageCollection
| |
| |--> startKubelet --> k.Run --> kl.syncLoop --> kl.syncLoopIteration
|
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe
|
|--> daemon.SdNotify
结论
本文主要介绍了 kublet的作用以及工作原理。之后从源码分析了kubelet 的启动流程,从分析过程中可以看到 kubelet 启动流程中的环节非常多,kubelet 中也包含了非常多的模块,如果要深入掌握kubelet的工作机制,则后面得对各个模板进行单独详细展开分析。
参考文档
https://feisky.gitbooks.io/kubernetes/content/components/kubelet.html
https://juejin.cn/post/6844903694618607623
https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kubelet_init.html