10.3 CgroupManager in kubelet and creating the node's top-level QoS cgroup directories
Key points of this section:
- CgroupManager handles cgroup operations: it supports creating, deleting, and updating cgroups
- One application of CgroupManager is creating the node's top-level QoS cgroup directories
- There are two driver types, cgroupfs and systemd; both are wrappers around native cgroups
- systemd provides its own interface for using and managing cgroups
What CgroupManager does
- CgroupManager handles cgroup operations: it supports creating, deleting, and updating cgroups
CgroupManager interface methods
- Location: D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\types.go
// CgroupManager allows for cgroup management.
// Supports Cgroup Creation, Deletion and Updates.
type CgroupManager interface {
    // Create creates the leaf cgroup and applies its configuration; the parent cgroups must already exist
    Create(*CgroupConfig) error
    // Destroy the cgroup.
    Destroy(*CgroupConfig) error
    // Update cgroup configuration.
    Update(*CgroupConfig) error
    // Exists checks if the cgroup already exists
    Exists(name CgroupName) bool
    // Name returns the literal cgroupfs name on the host after any driver-specific conversion,
    // e.g. with the systemd driver it returns something like foo.slice/foo-bar.slice
    Name(name CgroupName) string
    // CgroupName converts the literal cgroupfs name on the host to an internal identifier.
    CgroupName(name string) CgroupName
    // Pids returns all pids associated with the given cgroup
    Pids(name CgroupName) []int
    // ReduceCPULimits reduces the CPU CFS values to the minimum amount of shares.
    ReduceCPULimits(cgroupName CgroupName) error
    // MemoryUsage returns the memory usage of the given cgroup
    MemoryUsage(name CgroupName) (int64, error)
}
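To make the two naming schemes concrete, here is a minimal, self-contained sketch of how the same internal name renders under each driver. CgroupName, toCgroupfs, and toSystemd are simplified re-implementations of kubelet's unexported logic (the real systemd conversion also escapes special characters), so treat this as illustration only:
package main

import (
    "fmt"
    "path"
    "strings"
)

// CgroupName mirrors kubelet's internal representation: a slice of
// path segments, e.g. ["kubepods", "burstable"].
type CgroupName []string

// toCgroupfs joins the segments into a literal cgroupfs path.
func (c CgroupName) toCgroupfs() string {
    return "/" + path.Join(c...)
}

// toSystemd renders the nested slice form, where each level's unit name
// repeats its ancestors: ["foo", "bar"] -> /foo.slice/foo-bar.slice.
func (c CgroupName) toSystemd() string {
    if len(c) == 0 {
        return "/"
    }
    var units []string
    var accu []string
    for _, seg := range c {
        accu = append(accu, seg)
        units = append(units, strings.Join(accu, "-")+".slice")
    }
    return "/" + path.Join(units...)
}

func main() {
    name := CgroupName{"kubepods", "burstable"}
    fmt.Println(name.toCgroupfs()) // /kubepods/burstable
    fmt.Println(name.toSystemd())  // /kubepods.slice/kubepods-burstable.slice
}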
The cgroupManagerImpl struct
- Location: D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\cgroup_manager_linux.go
type cgroupManagerImpl struct {
    // subsystems holds information about all the
    // mounted cgroup subsystems on the node
    subsystems *CgroupSubsystems
    // adapter selects the driver implementation, systemd or cgroupfs
    adapter *libcontainerAdapter
}
libcontainerAdapter carries the driver type, systemd or cgroupfs
// libcontainerAdapter provides a simplified interface to libcontainer based on libcontainer type.
type libcontainerAdapter struct {
    // cgroupManagerType defines how to interface with libcontainer
    cgroupManagerType libcontainerCgroupManagerType
}

const (
    // libcontainerCgroupfs means use libcontainer with cgroupfs
    libcontainerCgroupfs libcontainerCgroupManagerType = "cgroupfs"
    // libcontainerSystemd means use libcontainer with systemd
    libcontainerSystemd libcontainerCgroupManagerType = "systemd"
)
CgroupSubsystems caches information about the mounted cgroup subsystems
// CgroupSubsystems holds information about the mounted cgroup subsystems
type CgroupSubsystems struct {
    // Cgroup subsystem mounts.
    // e.g.: "/sys/fs/cgroup/cpu" -> ["cpu", "cpuacct"]
    Mounts []libcontainercgroups.Mount
    // Cgroup subsystem to their mount location.
    // e.g.: "cpu" -> "/sys/fs/cgroup/cpu"
    MountPoints map[string]string
}
- MountPoints maps each cgroup subsystem name to its mount path
- Mounts maps each mount path to the subsystems mounted there
Initializing the CgroupManager
- The call site is kubelet's NewContainerManager, located at D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager_linux.go
cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
Analyzing the input parameters
nodeConfig.CgroupDriver is the driver type
- It corresponds to the --cgroup-driver flag, which selects either the cgroupfs or the systemd driver; both are wrappers around native cgroups
- Checking the kubelet config file shows that systemd is in use:
grep -i driver /var/lib/kubelet/config.yaml
cgroupDriver: systemd
subsystems carries the subsystem information
- It is supplied by GetCgroupSubsystems, which dispatches on the cgroups version:
// GetCgroupSubsystems returns information about the mounted cgroup subsystems
func GetCgroupSubsystems() (*CgroupSubsystems, error) {
    if libcontainercgroups.IsCgroup2UnifiedMode() {
        return getCgroupSubsystemsV2()
    }
    return getCgroupSubsystemsV1()
}
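IsCgroup2UnifiedMode boils down to a statfs check on /sys/fs/cgroup. A rough stand-alone equivalent (assuming golang.org/x/sys/unix is available; Linux only):
package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

// On a cgroup v2 host, /sys/fs/cgroup itself is mounted with the
// cgroup2 filesystem magic; on v1 it is a tmpfs holding per-subsystem mounts.
func main() {
    var st unix.Statfs_t
    if err := unix.Statfs("/sys/fs/cgroup", &st); err != nil {
        panic(err)
    }
    fmt.Println("cgroup v2 unified mode:", st.Type == unix.CGROUP2_SUPER_MAGIC)
}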
- On cgroup v1, getCgroupSubsystemsV1 collects the subsystem names and their mount points
// getCgroupSubsystemsV1 returns information about the mounted cgroup v1 subsystems
func getCgroupSubsystemsV1() (*CgroupSubsystems, error) {
    // get all cgroup mounts.
    allCgroups, err := libcontainercgroups.GetCgroupMounts(true)
    if err != nil {
        return &CgroupSubsystems{}, err
    }
    if len(allCgroups) == 0 {
        return &CgroupSubsystems{}, fmt.Errorf("failed to find cgroup mounts")
    }
    mountPoints := make(map[string]string, len(allCgroups))
    for _, mount := range allCgroups {
        // BEFORE kubelet used a random mount point per cgroups subsystem;
        // NOW more deterministic: kubelet use mount point with shortest path;
        // FUTURE is bright with clear expectation determined in doc.
        // ref. issue: https://github.com/kubernetes/kubernetes/issues/95488
        for _, subsystem := range mount.Subsystems {
            previous := mountPoints[subsystem]
            if previous == "" || len(mount.Mountpoint) < len(previous) {
                mountPoints[subsystem] = mount.Mountpoint
            }
        }
    }
    return &CgroupSubsystems{
        Mounts:      allCgroups,
        MountPoints: mountPoints,
    }, nil
}
- getCgroupMountsV1 in turn discovers all cgroup mount points:
func getCgroupMountsV1(all bool) ([]Mount, error) {
    mi, err := readCgroupMountinfo()
    if err != nil {
        return nil, err
    }
    allSubsystems, err := ParseCgroupFile("/proc/self/cgroup")
    if err != nil {
        return nil, err
    }
    allMap := make(map[string]bool)
    for s := range allSubsystems {
        allMap[s] = false
    }
    return getCgroupMountsHelper(allMap, mi, all)
}
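ParseCgroupFile reads /proc/self/cgroup, where each line has the form hierarchy-ID:controller-list:cgroup-path. A minimal sketch of that parsing (written from the file format, not copied from libcontainer):
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"
)

// Map each controller to the cgroup path of the current process,
// e.g. "memory" -> "/user.slice". The v2 entry uses an empty
// controller list ("0::/path") and ends up under the "" key here.
func main() {
    f, err := os.Open("/proc/self/cgroup")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    paths := map[string]string{}
    sc := bufio.NewScanner(f)
    for sc.Scan() {
        parts := strings.SplitN(sc.Text(), ":", 3)
        if len(parts) != 3 {
            continue
        }
        for _, ctrl := range strings.Split(parts[1], ",") {
            paths[ctrl] = parts[2]
        }
    }
    fmt.Println(paths)
}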
- The mount points come from /proc/self/mountinfo:
cat /proc/self/mountinfo |grep cgroup
25 18 0:21 / /sys/fs/cgroup ro,nosuid,nodev,noexec shared:9 - tmpfs tmpfs ro,mode=755
26 25 0:22 / /sys/fs/cgroup/systemd rw,nosuid,nodev,noexec,relatime shared:10 - cgroup cgroup rw,xattr,release_agent=/usr/lib/systemd/systemd-cgroups-agent,name=systemd
28 25 0:24 / /sys/fs/cgroup/freezer rw,nosuid,nodev,noexec,relatime shared:11 - cgroup cgroup rw,freezer
29 25 0:25 / /sys/fs/cgroup/memory rw,nosuid,nodev,noexec,relatime shared:12 - cgroup cgroup rw,memory
30 25 0:26 / /sys/fs/cgroup/blkio rw,nosuid,nodev,noexec,relatime shared:13 - cgroup cgroup rw,blkio
31 25 0:27 / /sys/fs/cgroup/net_cls,net_prio rw,nosuid,nodev,noexec,relatime shared:14 - cgroup cgroup rw,net_prio,net_cls
32 25 0:28 / /sys/fs/cgroup/cpuset rw,nosuid,nodev,noexec,relatime shared:15 - cgroup cgroup rw,cpuset
33 25 0:29 / /sys/fs/cgroup/devices rw,nosuid,nodev,noexec,relatime shared:16 - cgroup cgroup rw,devices
34 25 0:30 / /sys/fs/cgroup/hugetlb rw,nosuid,nodev,noexec,relatime shared:17 - cgroup cgroup rw,hugetlb
35 25 0:31 / /sys/fs/cgroup/perf_event rw,nosuid,nodev,noexec,relatime shared:18 - cgroup cgroup rw,perf_event
36 25 0:32 / /sys/fs/cgroup/cpu,cpuacct rw,nosuid,nodev,noexec,relatime shared:19 - cgroup cgroup rw,cpuacct,cpu
37 25 0:33 / /sys/fs/cgroup/pids rw,nosuid,nodev,noexec,relatime shared:20 - cgroup cgroup rw,pids
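In each mountinfo line, field 5 is the mount point, and the fields after the " - " separator are the filesystem type, source, and super options; for v1 cgroup mounts the super options carry the controller names. A simplified parser, written from this format rather than taken from libcontainer:
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"
)

// Print cgroup v1 mounts, e.g. "/sys/fs/cgroup/cpu,cpuacct -> rw,cpuacct,cpu".
func main() {
    f, err := os.Open("/proc/self/mountinfo")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    sc := bufio.NewScanner(f)
    for sc.Scan() {
        fields := strings.Fields(sc.Text())
        // Locate the " - " separator; fstype, source, and super options follow it.
        sep := -1
        for i, fld := range fields {
            if fld == "-" {
                sep = i
                break
            }
        }
        if sep < 0 || sep+3 >= len(fields) {
            continue
        }
        if fields[sep+1] != "cgroup" { // filesystem type
            continue
        }
        // fields[4] is the mount point; fields[sep+3] holds the super
        // options, which include the mounted controllers plus rw/ro flags.
        fmt.Printf("%s -> %s\n", fields[4], fields[sep+3])
    }
}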
Analyzing NewCgroupManager
- It stores the given CgroupSubsystems, maps cgroupDriver to a manager type, and wraps that type in a libcontainerAdapter:
// NewCgroupManager is a factory method that returns a CgroupManager
func NewCgroupManager(cs *CgroupSubsystems, cgroupDriver string) CgroupManager {
    managerType := libcontainerCgroupfs
    if cgroupDriver == string(libcontainerSystemd) {
        managerType = libcontainerSystemd
    }
    return &cgroupManagerImpl{
        subsystems: cs,
        adapter:    newLibcontainerAdapter(managerType),
    }
}
The underlying newManager
// newManager returns an implementation of cgroups.Manager
func (l *libcontainerAdapter) newManager(cgroups *libcontainerconfigs.Cgroup, paths map[string]string) (libcontainercgroups.Manager, error) {
    switch l.cgroupManagerType {
    case libcontainerCgroupfs:
        if libcontainercgroups.IsCgroup2UnifiedMode() {
            return cgroupfs2.NewManager(cgroups, paths["memory"], false)
        }
        return cgroupfs.NewManager(cgroups, paths, false), nil
    case libcontainerSystemd:
        // this means you asked systemd to manage cgroups, but systemd was not on the host, so all you can do is panic...
        if !cgroupsystemd.IsRunningSystemd() {
            panic("systemd cgroup manager not available")
        }
        if libcontainercgroups.IsCgroup2UnifiedMode() {
            return cgroupsystemd.NewUnifiedManager(cgroups, paths["memory"], false), nil
        }
        return cgroupsystemd.NewLegacyManager(cgroups, paths), nil
    }
    return nil, fmt.Errorf("invalid cgroup manager configuration")
}
- It switches on cgroupManagerType: for cgroupfs it uses cgroupfs.NewManager
- Source: D:\go_path\src\github.com\kubernetes\kubernetes\vendor\github.com\opencontainers\runc\libcontainer\cgroups\fs\fs.go
- For systemd it uses the systemd managers (NewUnifiedManager on cgroup v2, NewLegacyManager on v1)
- Source: D:\go_path\src\github.com\kubernetes\kubernetes\vendor\github.com\opencontainers\runc\libcontainer\cgroups\systemd\v1.go
type legacyManager struct {
    mu      sync.Mutex
    cgroups *configs.Cgroup
    paths   map[string]string
    dbus    *dbusConnManager
}

func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager {
    return &legacyManager{
        cgroups: cg,
        paths:   paths,
        dbus:    newDbusConnManager(false),
    }
}
Applying the cgroupManager
01 Creating the node allocatable cgroup
- When containerManager starts, it sets up the node; location D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager_linux.go
func (cm *containerManagerImpl) Start(node *v1.Node,
    activePods ActivePodsFunc,
    sourcesReady config.SourcesReady,
    podStatusProvider status.PodStatusProvider,
    runtimeService internalapi.RuntimeService) error {
    // Setup the node
    if err := cm.setupNode(activePods); err != nil {
        return err
    }
    // ...
}
- In setupNode, the node cgroups are created according to the --cgroups-per-qos flag:
func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error {
    // Setup top level qos containers only if CgroupsPerQOS flag is specified as true
    if cm.NodeConfig.CgroupsPerQOS {
        if err := cm.createNodeAllocatableCgroups(); err != nil {
            return err
        }
        err := cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods)
        if err != nil {
            return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
        }
    }
    // ...
}
- The --cgroups-per-qos flag: if true, the top-level pod QoS cgroups are created; it defaults to true
- createNodeAllocatableCgroups lives at D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\node_container_manager_linux.go
- It first takes the node's resource capacity, obtained via cadvisor:
nodeAllocatable := cm.internalCapacity
- internalCapacity is a map whose keys are resource names and whose values are resource quantities; the ResourceList type is defined in D:\go_path\src\github.com\kubernetes\kubernetes\staging\src\k8s.io\api\core\v1\types.go
const (
    // CPU, in cores. (500m = .5 cores)
    ResourceCPU ResourceName = "cpu"
    // Memory, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
    ResourceMemory ResourceName = "memory"
    // Volume size, in bytes (e.g. 5Gi = 5GiB = 5 * 1024 * 1024 * 1024)
    ResourceStorage ResourceName = "storage"
    // Local ephemeral storage, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
    // The resource name for ResourceEphemeralStorage is alpha and it can change across releases.
    ResourceEphemeralStorage ResourceName = "ephemeral-storage"
)

// ResourceList is a set of (resource name, quantity) pairs.
type ResourceList map[ResourceName]resource.Quantity
- Example resource list:
cpu=200m,memory=150G,pid=100
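That example, rendered as a ResourceList literal (assuming the k8s.io/api and k8s.io/apimachinery modules are on the module path):
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // A hypothetical node capacity, keyed by resource name.
    capacity := v1.ResourceList{
        v1.ResourceCPU:    resource.MustParse("200m"),
        v1.ResourceMemory: resource.MustParse("150G"),
    }
    for name, qty := range capacity {
        fmt.Printf("%s = %s\n", name, qty.String())
    }
}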
- Back in createNodeAllocatableCgroups, the next step subtracts kubelet's reservations from the node total:
if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
    nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
}

func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList {
    return cm.getNodeAllocatableAbsoluteImpl(cm.capacity)
}

func (cm *containerManagerImpl) getNodeAllocatableAbsoluteImpl(capacity v1.ResourceList) v1.ResourceList {
    result := make(v1.ResourceList)
    for k, v := range capacity {
        value := v.DeepCopy()
        if cm.NodeConfig.SystemReserved != nil {
            value.Sub(cm.NodeConfig.SystemReserved[k])
        }
        if cm.NodeConfig.KubeReserved != nil {
            value.Sub(cm.NodeConfig.KubeReserved[k])
        }
        if value.Sign() < 0 {
            // Negative Allocatable resources don't make sense.
            value.Set(0)
        }
        result[k] = value
    }
    return result
}
- As the code shows, it iterates over the key/value pairs in capacity and subtracts cm.NodeConfig.SystemReserved (resources reserved for the system) and cm.NodeConfig.KubeReserved (resources reserved for kubelet and other k8s components)
- value.Sub does the subtraction: what pods may use is the total minus what the machine's base services reserve (a runnable sketch of this arithmetic follows below)
- cm.NodeConfig.SystemReserved tracks the system reservation; the corresponding flag is --system-reserved, default none, e.g.
--system-reserved=cpu=200m,memory=500Mi,ephemeral-storage=1Gi
- The reservation for kubelet and other k8s components comes from the --kube-reserved flag, default none, e.g.
--kube-reserved=cpu=200m,memory=500Mi,ephemeral-storage=1Gi
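The subtraction itself is ordinary resource.Quantity arithmetic. A runnable sketch with made-up numbers (16Gi of memory capacity, 500Mi each for --system-reserved and --kube-reserved; assumes k8s.io/apimachinery):
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    capacity := resource.MustParse("16Gi")
    systemReserved := resource.MustParse("500Mi")
    kubeReserved := resource.MustParse("500Mi")

    // Same steps as getNodeAllocatableAbsoluteImpl, for a single resource.
    allocatable := capacity.DeepCopy()
    allocatable.Sub(systemReserved)
    allocatable.Sub(kubeReserved)
    if allocatable.Sign() < 0 {
        // Negative allocatable resources don't make sense.
        allocatable.Set(0)
    }
    fmt.Println(allocatable.String()) // 15384Mi
}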
- See the official documentation on reserving compute resources for system daemons
- The cgroupConfig built after subtracting the reservations is then checked via the cgroupManager: if the name does not exist, it is created:
// createNodeAllocatableCgroups creates Node Allocatable Cgroup when CgroupsPerQOS flag is specified as true
func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
    cgroupConfig := &CgroupConfig{
        Name: cm.cgroupRoot,
        // The default limits for cpu shares can be very low which can lead to CPU starvation for pods.
        ResourceParameters: getCgroupConfig(nodeAllocatable),
    }
    if cm.cgroupManager.Exists(cgroupConfig.Name) {
        return nil
    }
    if err := cm.cgroupManager.Create(cgroupConfig); err != nil {
        klog.ErrorS(err, "Failed to create cgroup", "cgroupName", cm.cgroupRoot)
        return err
    }
    return nil
}
Analyzing Create
- Location: D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\cgroup_manager_linux.go
- It converts the passed-in parameters into libcontainer resources:
func (m *cgroupManagerImpl) Create(cgroupConfig *CgroupConfig) error {
    start := time.Now()
    defer func() {
        metrics.CgroupManagerDuration.WithLabelValues("create").Observe(metrics.SinceInSeconds(start))
    }()
    resources := m.toResources(cgroupConfig.ResourceParameters)
    libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
        Resources: resources,
    }
- If the systemd driver is in use, the cgroup name is adapted to systemd's conventions:
    // libcontainer consumes a different field and expects a different syntax
    // depending on the cgroup driver in use, so we need this conditional here.
    if m.adapter.cgroupManagerType == libcontainerSystemd {
        updateSystemdCgroupInfo(libcontainerCgroupConfig, cgroupConfig.Name)
    } else {
        libcontainerCgroupConfig.Path = cgroupConfig.Name.ToCgroupfs()
    }
- It then obtains the underlying manager:
    // get the manager with the specified cgroup configuration
    manager, err := m.adapter.newManager(libcontainerCgroupConfig, nil)
    if err != nil {
        return err
    }
- Then the new manager's Apply method is called; passing pid -1 means only the top-level subsystem directories are created, without placing any process in them (an illustration follows the snippet):
    if err := manager.Apply(-1); err != nil {
        return err
    }
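For intuition only: with the cgroupfs driver on v1, Apply(-1) essentially creates the cgroup directory under every mounted subsystem without writing any pid to cgroup.procs (the real driver also applies the resource parameters afterwards). A toy illustration with a hypothetical mount map and cgroup name:
package main

import (
    "fmt"
    "os"
    "path/filepath"
)

func main() {
    // subsystem -> mount point, as CgroupSubsystems.MountPoints would hold.
    mountPoints := map[string]string{
        "cpu":    "/sys/fs/cgroup/cpu",
        "memory": "/sys/fs/cgroup/memory",
    }
    name := "mytest" // hypothetical cgroup name
    for subsys, mp := range mountPoints {
        dir := filepath.Join(mp, name)
        // Creating the directory is what makes the kernel instantiate
        // the cgroup and populate its control files.
        if err := os.MkdirAll(dir, 0o755); err != nil {
            fmt.Println("create failed:", subsys, err)
            continue
        }
        fmt.Println("created:", dir)
    }
}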
- Since we are using the systemd driver on cgroup v1, the Apply that runs is the systemd v1 (legacy) implementation, which does the corresponding adaptation work
- Location: D:\go_path\src\github.com\kubernetes\kubernetes\vendor\github.com\opencontainers\runc\libcontainer\cgroups\systemd\v1.go
func (m *legacyManager) Apply(pid int) error {
    var (
        c          = m.cgroups
        unitName   = getUnitName(c)
        slice      = "system.slice"
        properties []systemdDbus.Property
    )

    if c.Resources.Unified != nil {
        return cgroups.ErrV1NoUnified
    }
    m.mu.Lock()
    defer m.mu.Unlock()
    if c.Paths != nil {
        paths := make(map[string]string)
        cgMap, err := cgroups.ParseCgroupFile("/proc/self/cgroup")
        if err != nil {
            return err
        }
        // XXX(kolyshkin@): why is this check needed?
        for name, path := range c.Paths {
            if _, ok := cgMap[name]; ok {
                paths[name] = path
            }
        }
        m.paths = paths
        return cgroups.EnterPid(m.paths, pid)
    }

    if c.Parent != "" {
        slice = c.Parent
    }

    properties = append(properties, systemdDbus.PropDescription("libcontainer container "+c.Name))

    // if we create a slice, the parent is defined via a Wants=
    if strings.HasSuffix(unitName, ".slice") {
        properties = append(properties, systemdDbus.PropWants(slice))
    } else {
        // otherwise, we use Slice=
        properties = append(properties, systemdDbus.PropSlice(slice))
    }

    // only add pid if it's valid; -1 is used w/ general slice creation.
    if pid != -1 {
        properties = append(properties, newProp("PIDs", []uint32{uint32(pid)}))
    }

    // Check if we can delegate. This is only supported on systemd versions 218 and above.
    if !strings.HasSuffix(unitName, ".slice") {
        // Assume scopes always support delegation.
        properties = append(properties, newProp("Delegate", true))
    }

    // Always enable accounting, this gets us the same behaviour as the fs implementation,
    // plus the kernel has some problems with joining the memory cgroup at a later time.
    properties = append(properties,
        newProp("MemoryAccounting", true),
        newProp("CPUAccounting", true),
        newProp("BlockIOAccounting", true),
        newProp("TasksAccounting", true),
    )

    // Assume DefaultDependencies= will always work (the check for it was previously broken.)
    properties = append(properties,
        newProp("DefaultDependencies", false))

    properties = append(properties, c.SystemdProps...)

    if err := startUnit(m.dbus, unitName, properties); err != nil {
        return err
    }

    paths := make(map[string]string)
    for _, s := range legacySubsystems {
        subsystemPath, err := getSubsystemPath(m.cgroups, s.Name())
        if err != nil {
            // Even if it's `not found` error, we'll return err
            // because devices cgroup is hard requirement for
            // container security.
            if s.Name() == "devices" {
                return err
            }
            // Don't fail if a cgroup hierarchy was not found, just skip this subsystem
            if cgroups.IsNotFound(err) {
                continue
            }
            return err
        }
        paths[s.Name()] = subsystemPath
    }
    m.paths = paths

    if err := m.joinCgroups(pid); err != nil {
        return err
    }

    return nil
}
The top-level QoS directory that finally gets created is
- /sys/fs/cgroup/systemd/kubepods.slice
[root@k8s-node01 kubepods.slice]# ll
total 0
-rw-r--r-- 1 root root 0 Aug 20 11:05 cgroup.clone_children
--w--w--w- 1 root root 0 Aug 20 11:05 cgroup.event_control
-rw-r--r-- 1 root root 0 Aug 20 11:05 cgroup.procs
drwxr-xr-x 21 root root 0 Sep 10 16:34 kubepods-besteffort.slice
drwxr-xr-x 21 root root 0 Sep 6 11:51 kubepods-burstable.slice
drwxr-xr-x 4 root root 0 Sep 27 11:38 kubepods-pod4115089e_088b_4fb1_8884_b3f163efa48c.slice
-rw-r--r-- 1 root root 0 Aug 20 11:05 notify_on_release
-rw-r--r-- 1 root root 0 Aug 20 11:05 tasks
- Alongside it, kubepods-besteffort.slice is created for BestEffort pods
- and kubepods-burstable.slice for Burstable pods; the sketch below checks both
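To double-check on a node, a small program can stat the expected directories (paths assume the systemd driver on cgroup v1; adjust for your host):
package main

import (
    "fmt"
    "os"
)

func main() {
    for _, p := range []string{
        "/sys/fs/cgroup/systemd/kubepods.slice",
        "/sys/fs/cgroup/systemd/kubepods.slice/kubepods-besteffort.slice",
        "/sys/fs/cgroup/systemd/kubepods.slice/kubepods-burstable.slice",
    } {
        if st, err := os.Stat(p); err == nil && st.IsDir() {
            fmt.Println("exists:", p)
        } else {
            fmt.Println("missing:", p)
        }
    }
}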
Key points of this section:
- CgroupManager handles cgroup operations: it supports creating, deleting, and updating cgroups
- One application of CgroupManager is creating the node's top-level QoS cgroup directories
- There are two driver types, cgroupfs and systemd; both are wrappers around native cgroups
- systemd provides its own interface for using and managing cgroups