当前位置: 首页 > article >正文

Flink operator实现自动扩缩容

官网文档位置:

1.Autoscaler | Apache Flink Kubernetes Operator

2.Configuration | Apache Flink Kubernetes Operator

1.部署K8S集群

可参照我之前的文章k8s集群搭建

2.Helm安装Flink-Operator

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

helm repo update

--如果没有这个命名空间就创建
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--namespace=flink-operator \
--create-namespace \
--set webhook.create=false \
--version 1.10.0

3.安装prometheus

operator通过监控prometheus实现自动扩缩容,过两天调整为helm

可以采用helm安装也可采用yaml,由于helm没安装成功我就采用yaml安装了

# prometheus-basic.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: monitoring
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    scrape_configs:
      - job_name: 'flink'
        static_configs:
          - targets: ['flink-metrics.flink-apps.svc.cluster.local:9249']
        metrics_path: /metrics
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: prometheus
  replicas: 1
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
      - name: prometheus
        image: prom/prometheus:v2.30.3
        args:
        - "--config.file=/etc/prometheus/prometheus.yml"
        - "--storage.tsdb.path=/prometheus"
        - "--web.enable-lifecycle"
        ports:
        - containerPort: 9090
        volumeMounts:
        - name: config-volume
          mountPath: /etc/prometheus/
        - name: storage-volume
          mountPath: /prometheus
      volumes:
      - name: config-volume
        configMap:
          name: prometheus-config
      - name: storage-volume
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: prometheus
  namespace: monitoring
spec:
  type: NodePort
  ports:
  - port: 9090
    targetPort: 9090
    nodePort: 30090
  selector:
    app: prometheus

4.制作镜像包

Dockerfile内容,flink-test-1.0-SNAPSHOT.jar为测试代码

ARG FLINK_VERSION=1.18.1
FROM flink:${FLINK_VERSION}-scala_2.12
RUN mkdir -p /opt/flink/usrlib
COPY flink-test-1.0-SNAPSHOT.jar /opt/flink/usrlib/
COPY flink-metrics-prometheus-1.18.1.jar  /opt/flink/lib/
COPY flink-statebackend-rocksdb-1.18.1.jar  /opt/flink/lib/
COPY flink-connector-files-1.18.1.jar  /opt/flink/lib/
WORKDIR /opt/flink




# 1. 构建 Docker 镜像
# -t: 指定镜像名称和标签
# .: 使用当前目录的 Dockerfile
# --no-cache: 不使用缓存,从头构建
docker build -t zht-flink:1.18.1 . --no-cache

# 2. 为本地镜像添加远程仓库标签
# 格式: registry地址/命名空间/镜像名:标签
docker tag zht-flink:1.18.1 registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1

# 3. 推送镜像到阿里云镜像仓库
# 将标记的镜像推送到远程仓库
docker push registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1

5.创建命名空间和serviceaccount等

kubectl create namespace  flink-apps

kubectl -n flink-apps create serviceaccount flink-serviceaccount

kubectl -n flink-apps create clusterrolebinding flink-role-binding --clusterrole=edit --serviceaccount=flink-apps:flink-serviceaccount


kubectl create secret docker-registry flink-apps-secret \
--docker-server=registry.cn-hangzhou.aliyuncs.com \
--docker-username=xx \
--docker-password=xxxx \
-n flink-apps

kubectl patch serviceaccount flink-serviceaccount -p '{"imagePullSecrets": [{"name": "flink-apps-secret"}]}' -n  flink-apps

6.任务和扩缩容配置

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-autoscaling-sum-job
  namespace: flink-apps
spec:
  image: registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1
  flinkVersion: v1_18
  mode: native

  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    parallelism.default: "2"
    state.backend: rocksdb
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints
    metrics.reporters: prometheus
    metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prometheus.port: "9249"
    execution.checkpointing.interval: "10000"
    execution.checkpointing.mode: "EXACTLY_ONCE"
    execution.checkpointing.timeout: "600000"
    execution.checkpointing.min.pause: "10000"
    execution.checkpointing.max.concurrent.checkpoints: "1"
    # 启用 Source 指标收集
    metrics.source.enable: "true"
    metrics.source.records.in.enable: "true"
    metrics.source.records.out.enable: "true"
    metrics.source.records.lag.enable: "true"
    # 启用所有算子指标
    metrics.operator.enable: "true"
    metrics.operator.records.in.enable: "true"
    metrics.operator.records.out.enable: "true"
    # 启用任务指标
    metrics.task.enable: "true"
    metrics.task.records.in.enable: "true"
    metrics.task.records.out.enable: "true"
    # 设置指标收集间隔
    metrics.fetcher.update-interval: "1000"
    metrics.latency.interval: "1000"
    # 启用 IO 指标
    metrics.io.enable: "true" 
    jobmanager.scheduler: "adaptive"
    # 自动扩缩容配置
    job.autoscaler.enabled: "true"
    job.autoscaler.metrics.window: "20s"
    job.autoscaler.target.utilization: "0.30"
    job.autoscaler.scale.up.threshold: "0.05"
    job.autoscaler.scale.down.threshold: "0.1"
    job.autoscaler.metrics.memory.average: "1.0"
    job.autoscaler.metrics.memory.window: "5s"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.cooldown.period: "5s"
    job.autoscaler.scale.up.max.factor: "1.5"
    job.autoscaler.scale.down.max.factor: "0.5"    
    # 指标相关配置
    job.autoscaler.backpressure.enabled: "true"
    metrics.latency.granularity: "operator"
    web.backpressure.refresh-interval: "1000"
    metrics.backpressure.enabled: "true"
    metrics.backpressure.interval: "1000"
    metrics.backpressure.timeout: "60000"
    # 修改 job status metrics 配置
    metrics.job.status.enable: "STATE"
    # 新增 CPU 指标配置
    metrics.system.cpu: "true"
    metrics.system.cpu.load: "true"
    metrics.system.resource: "true"
    
  serviceAccount: flink-serviceaccount

  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
    replicas: 1

  taskManager:
    resource:
      memory: "1024m"
      cpu: 1

  job:
    jarURI: local:///opt/flink/usrlib/flink-test-1.0-SNAPSHOT.jar
    entryClass: com.zht.sumJob
    args: []
    parallelism: 1
    upgradeMode: stateless

  podTemplate:
    spec:
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: checkpoint-data
              mountPath: /flink-data
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9249"

---
apiVersion: batch/v1
kind: Job
metadata:
  name: init-checkpoint-dir
  namespace: flink-apps
spec:
  template:
    spec:
      serviceAccountName: flink-serviceaccount
      containers:
      - name: init-dir
        image: busybox
        command: ["/bin/sh", "-c"]
        args:
          - |
            mkdir -p /data/flink-checkpoints/checkpoints
            mkdir -p /data/flink-checkpoints/savepoints
            chmod -R 777 /data/flink-checkpoints
        volumeMounts:
          - name: checkpoint-data
            mountPath: /data/flink-checkpoints
        resources:
          limits:
            cpu: "0.1"
            memory: "64Mi"
          requests:
            cpu: "0.1"
            memory: "64Mi"
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      restartPolicy: Never
  backoffLimit: 4

---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-ui
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: webui
      port: 8081
      targetPort: 8081
      nodePort: 30081
  selector:
    component: jobmanager
    app: flink-autoscaling-sum-job

---
apiVersion: v1
kind: Service
metadata:
  name: flink-metrics
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: metrics
      port: 9249
      targetPort: 9249
      nodePort: 30249
  selector:
    component: taskmanager
    app: flink-autoscaling-sum-job
注意点:

1.添加 flink-metrics-prometheus-1.18.1.jar 不然启动不了metrics
2.注意先排查metrics是否启用成功。curl http://localhost:9249/metrics查看是否有值
3.之后查看prometheus页面的target是否有flink metrics
4.yaml或者flink任务配置好启用监控的配置


http://www.kler.cn/a/463271.html

相关文章:

  • 在Ubuntu 18.04.6 LTS安装OpenFace流程
  • 【保姆级】sql注入之堆叠注入
  • kafka开机自启失败问题处理
  • 【pytorch】现代循环神经网络-2
  • 网关的主要作用
  • 【Multisim用74ls92和90做六十进制】2022-6-12
  • GAN对抗生成网络(一)——基本原理及数学推导
  • 结合长短期记忆网络(LSTM)和无迹卡尔曼滤波器(UKF)的技术在机器人导航和状态估计中的应用前景
  • 力扣-数据结构-9【算法学习day.80】
  • 30分钟搭建 Typecho 个人博客教程
  • 使用exe4j将jar转成exe、java打包exe
  • vue v-for 数据增加页面不刷新
  • EasyExcel自定义动态下拉框(附加业务对象转换功能)
  • chatglm3如何进行微调
  • 在pytest钩子函数中判断Android和iOS设备(方法二)
  • libmodbus主机通信主要函数分析
  • 2021年国家公考《申论》题(地市级)
  • [工业 4.0] 机器学习如何推动智能制造升级
  • 【从零开始入门unity游戏开发之——C#篇40】C#特性(Attributes)和自定义特性
  • HarmonyOS Next ArkUI ListListItem笔记
  • 【SQL server】教材数据库(5)
  • github
  • 在 Alpine Linux 下通过 Docker 部署 Nginx 服务器
  • 【Pytorch实用教程】深入了解 torchvision.models.resnet18 新旧版本的区别
  • 智能边缘计算×软硬件一体化:开启全场景效能革命新征程(独立开发者作品)
  • 【置顶】测试学习笔记整理