Autoscaling with the Flink Kubernetes Operator
Official documentation:
1. Autoscaler | Apache Flink Kubernetes Operator
2. Configuration | Apache Flink Kubernetes Operator
1. Deploy a Kubernetes cluster
You can follow my earlier article on setting up a k8s cluster.
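Once the cluster is up, a quick sanity check before installing anything else (assuming kubectl is already pointed at the new cluster):
# All nodes should be Ready
kubectl get nodes -o wide
# Core components in kube-system should be Running
kubectl get pods -n kube-system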
2. Install the Flink Operator with Helm
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm repo update
# --create-namespace below creates the flink-operator namespace if it does not exist yet
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--namespace=flink-operator \
--create-namespace \
--set webhook.create=false \
--version 1.10.0
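Before moving on, confirm the release installed and the operator pod is running:
# Check the Helm release and the operator pod
helm list -n flink-operator
kubectl get pods -n flink-operator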
3. Install Prometheus
The operator's autoscaling is driven by job metrics, and Prometheus is installed here to monitor them; I plan to switch this to a Helm-based install in a couple of days.
Prometheus can be installed either with Helm or with plain YAML. Since the Helm install did not work for me, I used YAML.
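For reference, the Helm route usually goes through the community kube-prometheus-stack chart (just a sketch of the alternative, not what is used in the rest of this post):
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm install prometheus prometheus-community/kube-prometheus-stack -n monitoring --create-namespace
The plain-YAML manifest used here follows: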
# prometheus-basic.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: monitoring
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    scrape_configs:
      - job_name: 'flink'
        static_configs:
          - targets: ['flink-metrics.flink-apps.svc.cluster.local:9249']
        metrics_path: /metrics
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: prometheus
  replicas: 1
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
        - name: prometheus
          image: prom/prometheus:v2.30.3
          args:
            - "--config.file=/etc/prometheus/prometheus.yml"
            - "--storage.tsdb.path=/prometheus"
            - "--web.enable-lifecycle"
          ports:
            - containerPort: 9090
          volumeMounts:
            - name: config-volume
              mountPath: /etc/prometheus/
            - name: storage-volume
              mountPath: /prometheus
      volumes:
        - name: config-volume
          configMap:
            name: prometheus-config
        - name: storage-volume
          emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: prometheus
  namespace: monitoring
spec:
  type: NodePort
  ports:
    - port: 9090
      targetPort: 9090
      nodePort: 30090
  selector:
    app: prometheus
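Save the manifest as prometheus-basic.yaml, apply it, and confirm the pod and NodePort service come up:
kubectl apply -f prometheus-basic.yaml
kubectl -n monitoring get pods,svc
# The Prometheus UI is then reachable on any node at port 30090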
4. Build the image
Dockerfile contents (flink-test-1.0-SNAPSHOT.jar is the test job):
ARG FLINK_VERSION=1.18.1
FROM flink:${FLINK_VERSION}-scala_2.12
RUN mkdir -p /opt/flink/usrlib
COPY flink-test-1.0-SNAPSHOT.jar /opt/flink/usrlib/
COPY flink-metrics-prometheus-1.18.1.jar /opt/flink/lib/
COPY flink-statebackend-rocksdb-1.18.1.jar /opt/flink/lib/
COPY flink-connector-files-1.18.1.jar /opt/flink/lib/
WORKDIR /opt/flink
# 1. Build the Docker image
# -t: image name and tag
# .: use the Dockerfile in the current directory
# --no-cache: build from scratch without using the build cache
docker build -t zht-flink:1.18.1 . --no-cache
# 2. Tag the local image for the remote registry
# Format: registry-address/namespace/image-name:tag
docker tag zht-flink:1.18.1 registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1
# 3. Push the image to the Alibaba Cloud registry
# Pushes the tagged image to the remote repository
docker push registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1
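Before referencing the image in the FlinkDeployment, it is worth confirming that the job jar and the extra libraries actually landed where the Dockerfile put them:
# List the user and lib directories baked into the image
docker run --rm --entrypoint ls zht-flink:1.18.1 /opt/flink/usrlib /opt/flink/lib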
5. Create the namespace, service account, and registry secret
kubectl create namespace flink-apps
kubectl -n flink-apps create serviceaccount flink-serviceaccount
kubectl -n flink-apps create clusterrolebinding flink-role-binding --clusterrole=edit --serviceaccount=flink-apps:flink-serviceaccount
kubectl create secret docker-registry flink-apps-secret \
--docker-server=registry.cn-hangzhou.aliyuncs.com \
--docker-username=xx \
--docker-password=xxxx \
-n flink-apps
kubectl patch serviceaccount flink-serviceaccount -p '{"imagePullSecrets": [{"name": "flink-apps-secret"}]}' -n flink-apps
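A quick check that the service account exists and picked up the image pull secret from the patch:
kubectl -n flink-apps get serviceaccount flink-serviceaccount -o yaml
# The output should contain an imagePullSecrets entry referencing flink-apps-secret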
6. Job and autoscaling configuration
The manifest below defines the FlinkDeployment itself, a one-off Job that prepares the checkpoint directory on the host, and two NodePort Services exposing the Web UI and the metrics port.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-autoscaling-sum-job
  namespace: flink-apps
spec:
  image: registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1
  flinkVersion: v1_18
  mode: native
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    parallelism.default: "2"
    state.backend: rocksdb
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints
    metrics.reporters: prometheus
    metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prometheus.port: "9249"
    execution.checkpointing.interval: "10000"
    execution.checkpointing.mode: "EXACTLY_ONCE"
    execution.checkpointing.timeout: "600000"
    execution.checkpointing.min.pause: "10000"
    execution.checkpointing.max.concurrent.checkpoints: "1"
    # Enable source metric collection
    metrics.source.enable: "true"
    metrics.source.records.in.enable: "true"
    metrics.source.records.out.enable: "true"
    metrics.source.records.lag.enable: "true"
    # Enable all operator metrics
    metrics.operator.enable: "true"
    metrics.operator.records.in.enable: "true"
    metrics.operator.records.out.enable: "true"
    # Enable task metrics
    metrics.task.enable: "true"
    metrics.task.records.in.enable: "true"
    metrics.task.records.out.enable: "true"
    # Metrics collection intervals
    metrics.fetcher.update-interval: "1000"
    metrics.latency.interval: "1000"
    # Enable IO metrics
    metrics.io.enable: "true"
    jobmanager.scheduler: "adaptive"
    # Autoscaler configuration
    job.autoscaler.enabled: "true"
    job.autoscaler.metrics.window: "20s"
    job.autoscaler.target.utilization: "0.30"
    job.autoscaler.scale.up.threshold: "0.05"
    job.autoscaler.scale.down.threshold: "0.1"
    job.autoscaler.metrics.memory.average: "1.0"
    job.autoscaler.metrics.memory.window: "5s"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.cooldown.period: "5s"
    job.autoscaler.scale.up.max.factor: "1.5"
    job.autoscaler.scale.down.max.factor: "0.5"
    # Metrics-related settings
    job.autoscaler.backpressure.enabled: "true"
    metrics.latency.granularity: "operator"
    web.backpressure.refresh-interval: "1000"
    metrics.backpressure.enabled: "true"
    metrics.backpressure.interval: "1000"
    metrics.backpressure.timeout: "60000"
    # Adjust the job status metrics configuration
    metrics.job.status.enable: "STATE"
    # Additional CPU metrics configuration
    metrics.system.cpu: "true"
    metrics.system.cpu.load: "true"
    metrics.system.resource: "true"
  serviceAccount: flink-serviceaccount
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
    replicas: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/flink-test-1.0-SNAPSHOT.jar
    entryClass: com.zht.sumJob
    args: []
    parallelism: 1
    upgradeMode: stateless
  podTemplate:
    spec:
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: checkpoint-data
              mountPath: /flink-data
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9249"
---
apiVersion: batch/v1
kind: Job
metadata:
  name: init-checkpoint-dir
  namespace: flink-apps
spec:
  template:
    spec:
      serviceAccountName: flink-serviceaccount
      containers:
        - name: init-dir
          image: busybox
          command: ["/bin/sh", "-c"]
          args:
            - |
              mkdir -p /data/flink-checkpoints/checkpoints
              mkdir -p /data/flink-checkpoints/savepoints
              chmod -R 777 /data/flink-checkpoints
          volumeMounts:
            - name: checkpoint-data
              mountPath: /data/flink-checkpoints
          resources:
            limits:
              cpu: "0.1"
              memory: "64Mi"
            requests:
              cpu: "0.1"
              memory: "64Mi"
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      restartPolicy: Never
  backoffLimit: 4
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-ui
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: webui
      port: 8081
      targetPort: 8081
      nodePort: 30081
  selector:
    component: jobmanager
    app: flink-autoscaling-sum-job
---
apiVersion: v1
kind: Service
metadata:
  name: flink-metrics
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: metrics
      port: 9249
      targetPort: 9249
      nodePort: 30249
  selector:
    component: taskmanager
    app: flink-autoscaling-sum-job
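After applying the manifest, the deployment status and the autoscaler's decisions can be followed from the CR and the operator logs (the file name below is an assumption; the resource and release names match the steps above):
kubectl apply -f flink-autoscaling-sum-job.yaml
kubectl -n flink-apps get flinkdeployment flink-autoscaling-sum-job -w
# Autoscaler evaluations show up in the operator pod's log
kubectl -n flink-operator logs deploy/flink-kubernetes-operator | grep -i autoscaler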
Notes:
1. Add flink-metrics-prometheus-1.18.1.jar to /opt/flink/lib, otherwise the metrics reporter cannot start.
2. First check whether metrics are actually being exposed: run curl http://localhost:9249/metrics and see whether values come back (a port-forward variant is sketched below).
3. Then check the Targets page in the Prometheus UI to confirm the flink job is listed and being scraped.
4. Make sure the monitoring-related configuration is enabled in the YAML manifest or in the Flink job configuration.
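A convenient way to run the check from note 2 without exec-ing into a pod, assuming the flink-metrics Service defined above:
# Forward the metrics Service locally and fetch a sample of the exposed metrics
kubectl -n flink-apps port-forward svc/flink-metrics 9249:9249 &
curl -s http://localhost:9249/metrics | head -n 20
# Alternatively, hit the NodePort directly: curl http://<node-ip>:30249/metrics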