k8s-golang获取健康状态ip
测试
package k8s
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
// TestGetServicePods verifies that the pods behind a known service can be
// listed through an agent built from the local ./config kubeconfig file.
// NOTE(review): presumably needs a reachable cluster containing this
// namespace/service — confirm before running in CI.
func TestGetServicePods(t *testing.T) {
	const (
		namespace   = "sit-xchat"
		serviceName = "xchat-controller-go-sit"
	)

	agent, err := NewK8sAgent("./config")
	require.NoError(t, err)
	require.NotEmpty(t, agent)

	pods, listErr := agent.GetServicePods(namespace, serviceName)
	require.NoError(t, listErr)
	require.NotEmpty(t, pods)
}
// TestRunningIPList verifies that GetRunningIP returns at least one IP for a
// known service, printing every returned IP for manual inspection.
// NOTE(review): presumably needs a reachable cluster containing this
// namespace/service — confirm before running in CI.
func TestRunningIPList(t *testing.T) {
	const (
		namespace   = "uat-xchat"
		serviceName = "triton-ensemble-dssm-uat-lb"
	)

	agent, err := NewK8sAgent("./config")
	require.NoError(t, err)
	require.NotEmpty(t, agent)

	ips, ipErr := agent.GetRunningIP(namespace, serviceName)
	// Dump whatever came back before asserting, so the output is visible even
	// when one of the assertions below fails.
	for idx := range ips {
		fmt.Printf("item:%v\n", ips[idx])
	}
	require.NoError(t, ipErr)
	require.NotEmpty(t, ips)
}
k8s访问
package k8s
import (
"context"
"fmt"
"logger"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// K8sAgent wraps a Kubernetes clientset and exposes helper methods for
// reading and updating deployments, services and pods.
type K8sAgent struct {
// k8sClientset is the client used for every API call made by the agent;
// it is populated by init.
k8sClientset *kubernetes.Clientset
}
// NewK8sAgent creates a K8sAgent and initializes its clientset.
//
// kubeconfig is the path to a kubeconfig file; an empty string selects the
// in-cluster configuration. On initialization failure it returns a nil agent
// together with the error.
func NewK8sAgent(kubeconfig string) (*K8sAgent, error) {
	agent := new(K8sAgent)
	err := agent.init(kubeconfig)
	if err != nil {
		return nil, err
	}
	return agent, nil
}
// init resolves the REST configuration and creates the clientset.
//
// An empty kubeconfig uses rest.InClusterConfig; any other value is passed to
// clientcmd.BuildConfigFromFlags as the kubeconfig path. Configuration errors
// are logged and returned; the error from kubernetes.NewForConfig is returned
// as-is.
func (agent *K8sAgent) init(kubeconfig string) error {
	logger.Info(fmt.Sprintf("kubeconfig: %s", kubeconfig))

	var (
		restConfig *rest.Config
		err        error
	)
	switch kubeconfig {
	case "":
		restConfig, err = rest.InClusterConfig()
		if err != nil {
			logger.Errorf("rest.InClusterConfig error: %s", err)
			return err
		}
	default:
		restConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			logger.Errorf("clientcmd.BuildConfigFromFlags error: %s", err)
			return err
		}
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	agent.k8sClientset = clientset
	return err
}
// UpdateDeploymentReplicas sets the desired replica count of the deployment
// deploymentName in namespace ns to replicas.
//
// It fetches the deployment, overwrites Spec.Replicas and writes the object
// back. Any error from the Get or Update call is logged and returned
// unchanged; on success the old and new replica counts are logged.
func (agent *K8sAgent) UpdateDeploymentReplicas(ns, deploymentName string, replicas int32) error {
	deployments := agent.k8sClientset.AppsV1().Deployments(ns)

	deployment, err := deployments.Get(context.TODO(), deploymentName, metaV1.GetOptions{})
	if err != nil {
		logger.Errorf("k8s agent get deployment(%s/%s) error: %s", ns, deploymentName, err)
		return err
	}

	// Spec.Replicas is an optional pointer. Copy the value out behind a nil
	// check so the log line below cannot panic; an unset field is reported as
	// 1, the default documented by the Kubernetes API.
	oldReplicas := int32(1)
	if deployment.Spec.Replicas != nil {
		oldReplicas = *deployment.Spec.Replicas
	}

	deployment.Spec.Replicas = &replicas
	if _, err = deployments.Update(context.TODO(), deployment, metaV1.UpdateOptions{}); err != nil {
		logger.Errorf("k8s agent update deployment(%s/%s) error: %s", ns, deploymentName, err)
		return err
	}

	logger.Infof("k8s agent update deployment(%s, %s) replicas(%d -> %d)", ns, deploymentName, oldReplicas, replicas)
	return nil
}
// GetDeploymentReplicas reports the replica status of the deployment
// deploymentName in namespace ns.
//
// It returns Status.ReadyReplicas and Status.Replicas, in that order. When the
// deployment cannot be fetched, the error is logged and (-1, -1, err) is
// returned.
func (agent *K8sAgent) GetDeploymentReplicas(ns, deploymentName string) (int32, int32, error) {
	deployments := agent.k8sClientset.AppsV1().Deployments(ns)

	deployment, err := deployments.Get(context.TODO(), deploymentName, metaV1.GetOptions{})
	if err == nil {
		status := deployment.Status
		return status.ReadyReplicas, status.Replicas, nil
	}

	logger.Errorf("k8s agent get deployment(%s/%s) error: %s", ns, deploymentName, err)
	return -1, -1, err
}
// GetDeploymentPods lists the pods in namespace that match the label selector
// of the deployment deploymentName.
//
// Errors from fetching the deployment or listing the pods are logged and
// returned with a nil slice. On success the number of pods found is logged.
func (agent *K8sAgent) GetDeploymentPods(namespace string, deploymentName string) ([]coreV1.Pod, error) {
	ctx := context.TODO()

	deployment, err := agent.k8sClientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, metaV1.GetOptions{})
	if err != nil {
		logger.Errorf("k8s agent get deployment(%s) error: %s", deploymentName, err)
		return nil, err
	}

	// Render the deployment's selector as the string form used for the pod query.
	labelSelector := metaV1.FormatLabelSelector(deployment.Spec.Selector)
	listOptions := metaV1.ListOptions{LabelSelector: labelSelector}

	pods, err := agent.k8sClientset.CoreV1().Pods(namespace).List(ctx, listOptions)
	if err != nil {
		logger.Errorf("k8s agent get pods(deployment labelselector: %s) error: %s", labelSelector, err)
		return nil, err
	}

	logger.Infof("k8s agent GetDeploymentPods(%s/%s), pods: %d", namespace, deploymentName, len(pods.Items))
	return pods.Items, nil
}
// GetServicePods lists the pods in namespace that are selected by the service
// serviceName.
//
// A service without a selector selects no pods, so a nil slice and a nil
// error are returned for it. Errors from fetching the service, converting its
// selector, or listing the pods are logged and returned with a nil slice.
func (agent *K8sAgent) GetServicePods(namespace string, serviceName string) ([]coreV1.Pod, error) {
	service, err := agent.k8sClientset.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metaV1.GetOptions{})
	if err != nil {
		logger.Errorf("k8s agent get service(%s) error: %s", serviceName, err)
		return nil, err
	}

	// An empty selector must not reach the pod query: formatted for display it
	// becomes the placeholder "<none>" (an invalid query), and as a real
	// selector it would match every pod in the namespace.
	if len(service.Spec.Selector) == 0 {
		logger.Infof("k8s agent service(%s/%s) has no selector, no pods selected", namespace, serviceName)
		return nil, nil
	}

	// Convert through LabelSelectorAsSelector rather than FormatLabelSelector
	// so an invalid selector surfaces as an error instead of the display
	// placeholder "<error>".
	selector, err := metaV1.LabelSelectorAsSelector(&metaV1.LabelSelector{
		MatchLabels: service.Spec.Selector,
	})
	if err != nil {
		logger.Errorf("k8s agent build selector for service(%s) error: %s", serviceName, err)
		return nil, err
	}
	labelSelector := selector.String()

	podList, err := agent.k8sClientset.CoreV1().Pods(namespace).List(context.TODO(), metaV1.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		logger.Errorf("k8s agent get pods(service labelselector: %s) error: %s", labelSelector, err)
		return nil, err
	}
	return podList.Items, nil
}
// GetRunningIP returns the IPs of all pods behind the service serviceName in
// namespace that are currently able to serve.
//
// A pod is included only when all of the following hold:
//   - its phase is Running,
//   - it has not been marked for deletion (DeletionTimestamp is nil),
//   - it has a non-empty PodIP,
//   - its Ready condition is present with status True.
//
// The returned slice is non-nil (possibly empty) on success. An error from
// GetServicePods is returned unchanged with a nil slice.
func (agent *K8sAgent) GetRunningIP(namespace string, serviceName string) ([]string, error) {
	podList, err := agent.GetServicePods(namespace, serviceName)
	if err != nil {
		return nil, err
	}

	runningIP := make([]string, 0, len(podList))
	for i := range podList {
		// Index instead of ranging by value to avoid copying each Pod struct.
		pod := &podList[i]

		// Must be running and not in the middle of being deleted.
		if pod.Status.Phase != coreV1.PodRunning || pod.DeletionTimestamp != nil {
			continue
		}
		// A pod without an assigned IP cannot be addressed.
		if pod.Status.PodIP == "" {
			continue
		}

		// Look for the Ready condition explicitly. Requiring "every condition
		// is True" would accept a pod that reports no conditions at all and
		// would depend on conditions that are unrelated to readiness.
		ready := false
		for _, condition := range pod.Status.Conditions {
			if condition.Type == coreV1.PodReady {
				ready = condition.Status == coreV1.ConditionTrue
				break
			}
		}
		if ready {
			runningIP = append(runningIP, pod.Status.PodIP)
		}
	}
	return runningIP, nil
}
// GetK8sClient returns the agent's clientset by value, i.e. a copy of the
// struct the agent points to. The agent's clientset pointer is dereferenced,
// so it must have been set (see init) before this is called.
func (agent *K8sAgent) GetK8sClient() kubernetes.Clientset {
	clientset := agent.k8sClientset
	return *clientset
}