【 Kubernetes 风云录 】- MutatingWebhook 实现自动注入
文章目录
- 启发
- 流程
- 任务
- webhook服务
- MutatingWebhookConfiguration
- 验证
⚡️目的为了将链路上的所有服务在不调整控制器的前提下,为其运行的 Pod 自动添加特定的 label,目前考虑的方案是通过
MutatingAdmissionWebhook
实现,在 Pod 创建时动态注入指定的 label,从而实现统一标识和管理的目的。
启发
⏰初期的技术构思来源于 Istio 的自动 Sidecar 注入机制。Istio 通过在命名空间上添加 istio-injection=enabled
的 label,使 Istio 能在 Pod 创建时自动注入 Sidecar。这种方式注入机制灵活、解耦,并且无需更改控制器。
流程
✅ MutatingWebhookConfiguration
是 Kubernetes 中的一个 准入控制机制配置资源,主要作用是:
在资源(如 Pod)被创建或更新时,拦截请求并对其“修改”(Mutate)内容。
📌 简单理解可以把它理解为兄弟部门有一个需求过来,得先让老板先确认一下,看看是否要做什么调整,然后再去处理
任务
我想让 MutatingWebhook
帮忙做的事情很简单,当控制器有对应某个label,这样 WasmPlugin
就能自定义作用到哪些链路哪些实例上。
webhook服务
⚡️: CA 证书自己去创建。
⚡️: 服务部署到集群上。
❌: 注意使用的client-go版本和实际集群版本,集群旧版本使用最新的client-go资源类型可能存在问题。
package main
import (
"context"
"encoding/json"
"log"
"net/http"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type patchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value"`
}
func mutatePod(w http.ResponseWriter, r *http.Request) {
log.Println("➡️ Received mutation request")
var admissionReview admissionv1.AdmissionReview
if err := json.NewDecoder(r.Body).Decode(&admissionReview); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
uid := admissionReview.Request.UID
log.Printf("✅ AdmissionReview UID: %s", uid)
var pod corev1.Pod
if err := json.Unmarshal(admissionReview.Request.Object.Raw, &pod); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
log.Printf("📝 Pod Name: %s | Namespace: %s", pod.Name, pod.Namespace)
var patches []patchOperation
if len(pod.OwnerReferences) > 0 {
owner := pod.OwnerReferences[0]
log.Printf("🔗 Pod controlled by: Kind=%s, Name=%s", owner.Kind, owner.Name)
config, err := rest.InClusterConfig()
if err != nil {
log.Printf("❌ Error building in-cluster config: %v", err)
http.Error(w, "Internal config error", http.StatusInternalServerError)
return
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Printf("❌ Error creating clientset: %v", err)
http.Error(w, "Internal clientset error", http.StatusInternalServerError)
return
}
controllerNamespace := pod.Namespace
var controllerLabels map[string]string
switch owner.Kind {
case "StatefulSet":
sts, err := clientset.AppsV1().StatefulSets(controllerNamespace).Get(context.TODO(), owner.Name, metav1.GetOptions{})
if err != nil {
log.Printf("❌ Failed to get StatefulSet: %v", err)
} else {
controllerLabels = sts.Labels
}
case "ReplicaSet":
rs, err := clientset.AppsV1().ReplicaSets(controllerNamespace).Get(context.TODO(), owner.Name, metav1.GetOptions{})
if err != nil {
log.Printf("❌ Failed to get ReplicaSet: %v", err)
} else {
controllerLabels = rs.Labels
// 🔍 尝试进一步获取 Deployment
if len(rs.OwnerReferences) > 0 && rs.OwnerReferences[0].Kind == "Deployment" {
deployName := rs.OwnerReferences[0].Name
log.Printf("🔍 Found Deployment owner: %s", deployName)
deploy, err := clientset.AppsV1().Deployments(controllerNamespace).Get(context.TODO(), deployName, metav1.GetOptions{})
if err != nil {
log.Printf("❌ Failed to get Deployment: %v", err)
} else {
controllerLabels = deploy.Labels
}
}
}
default:
log.Printf("ℹ️ Unsupported controller kind: %s", owner.Kind)
}
// 注入 controller 上的 proxy label
if val, exists := controllerLabels["proxy"]; exists {
log.Printf("✅ Injecting label proxy=%s into Pod", val)
patch := patchOperation{
Op: "add",
Path: "/metadata/labels/proxy",
Value: val,
}
patches = append(patches, patch)
} else {
log.Printf("ℹ️ Controller has no 'proxy' label, skipping injection")
}
} else {
log.Println("ℹ️ No ownerReferences found on Pod")
}
var admissionResponse admissionv1.AdmissionResponse
admissionResponse.Allowed = true
if len(patches) > 0 {
patchBytes, _ := json.Marshal(patches)
pt := admissionv1.PatchTypeJSONPatch
admissionResponse.Patch = patchBytes
admissionResponse.PatchType = &pt
} else {
log.Println("✅ No patch needed, allowing request")
}
admissionResponse.UID = uid
admissionReview.Response = &admissionResponse
if err := json.NewEncoder(w).Encode(admissionReview); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Println("🚀 Mutation response sent successfully")
}
func main() {
http.HandleFunc("/mutate", mutatePod)
server := &http.Server{
Addr: ":8080",
}
log.Println("🚀 Starting webhook server on port 8080...")
if err := server.ListenAndServeTLS("/root/ycloud/MutatingWebhook/tls.crt", "/root/ycloud/MutatingWebhook/tls.key"); err != nil {
log.Fatalf("❌ Failed to start webhook server: %v", err)
}
}
MutatingWebhookConfiguration
当前的配置 仅在 Pod 创建(CREATE)阶段 触发。并且作用域在 devel
这个namespace
下
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: proxy-label-injector
webhooks:
- name: proxy-label-injector.example.com
clientConfig:
service:
name: label-injector-webhook
namespace: default
path: /mutate
port: 8080
caBundle: <CA-Bundle> #这里需要替换为你的 CA 证书内容 cat ca.crt | base64 | tr -d '\n'
rules:
- operations: ["CREATE"]
apiGroups: [""]
apiVersions: ["v1"]
resources: ["pods"]
admissionReviewVersions: ["v1"]
sideEffects: None
timeoutSeconds: 5
namespaceSelector:
matchExpressions:
- key: kubernetes.io/metadata.name
operator: In
values:
- "devel"
验证
在控制器中加入
proxy: wasm
这个 Label 后,删除对应的 Pod,让控制器自动重新创建。由于控制器创建的新 Pod 会触发MutatingAdmissionWebhook
,此时Webhook
会根据所属控制器的 Label 自动为 Pod 注入proxy: wasm
标签。可以通过查看Webhook
日志确认注入是否生效,同时也可以通过kubectl describe pod
或kubectl get pod --show-labels
等命令验证 Pod 是否已正确携带该标签。