Submitting Flink Operator Jobs Quickly with Dinky
Official docs: K8s Integration | Dinky
1. Versions used
Dinky 1.2.0, Flink 1.18.1, Flink Operator 0.10.0
2. Building the image
2.1 Create the Dockerfile
ARG FLINK_VERSION=1.18.1
FROM flink:${FLINK_VERSION}-scala_2.12
RUN mkdir -p /opt/flink/usrlib
COPY commons-cli-1.3.1.jar /opt/flink/lib/
COPY dinky-app-1.18-1.2.0-jar-with-dependencies.jar /opt/flink/usrlib/
COPY flink-metrics-prometheus-1.18.1.jar /opt/flink/lib/
COPY flink-table-planner_2.12-1.18.1.jar /opt/flink/lib/
COPY mysql-connector-java-8.0.30.jar /opt/flink/lib/
COPY flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar /opt/flink/lib/
COPY commons-math3-3.6.1.jar /opt/flink/lib/
# Remove the planner loader so the full flink-table-planner copied above takes effect
RUN rm -rf ${FLINK_HOME}/lib/flink-table-planner-loader-*.jar
2.2 Build the image and push it to a private registry
docker build -t dinky-flink:1.18.1 . --no-cache
docker tag dinky-flink:1.18.1 registry.cn-hangzhou.aliyuncs.com/dinkyhub/dinky-flink:1.18.1
docker push registry.cn-hangzhou.aliyuncs.com/dinkyhub/dinky-flink:1.18.1
2.3 Create the namespace, ServiceAccount, and image pull secret
kubectl create namespace flink-apps
kubectl -n flink-apps create serviceaccount flink-serviceaccount
kubectl -n flink-apps create clusterrolebinding flink-role-binding --clusterrole=cluster-admin --serviceaccount=flink-apps:flink-serviceaccount
-- Note: --clusterrole=cluster-admin grants very broad privileges; the built-in edit ClusterRole is usually sufficient.
kubectl create secret docker-registry flink-apps-secret \
--docker-server=registry.cn-hangzhou.aliyuncs.com \
--docker-username=xx \
--docker-password=xxxx \
-n flink-apps
kubectl patch serviceaccount flink-serviceaccount -p '{"imagePullSecrets": [{"name": "flink-apps-secret"}]}' -n flink-apps
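As the note above suggests, the cluster-admin binding can be replaced with the built-in edit ClusterRole. A sketch, scoped to the flink-apps namespace (the binding name is an example; adjust to your cluster's RBAC policy):

```shell
# Bind the built-in "edit" ClusterRole to the service account within the
# flink-apps namespace only, instead of the cluster-wide cluster-admin binding.
kubectl -n flink-apps create rolebinding flink-edit-binding \
  --clusterrole=edit \
  --serviceaccount=flink-apps:flink-serviceaccount
```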
3. Configuration in Dinky
3.1 Configuration in the UI
3.2 Flink SQL task
set 'taskmanager.numberOfTaskSlots' = '2';
set 'parallelism.default' = '2';
set 'kubernetes.container.image' = 'registry.cn-hangzhou.aliyuncs.com/dinkyhub/dinky-flink:1.18.1';
set 'kubernetes.service-account' = 'flink-serviceaccount';
set 'job.autoscaler.enabled' = 'true';
set 'job.autoscaler.metrics.window' = '20s';
set 'job.autoscaler.target.utilization' = '0.30';
set 'job.autoscaler.scale.up.threshold' = '0.05';
set 'job.autoscaler.scale.down.threshold' = '0.1';
set 'job.autoscaler.stabilization.interval' = '5s';
set 'job.autoscaler.cooldown.period' = '5s';
set 'job.autoscaler.scale.up.max.factor' = '1.5';
set 'job.autoscaler.scale.down.max.factor' = '0.5';
set 'metrics.reporters' = 'prometheus';
set 'metrics.reporter.prometheus.factory.class' = 'org.apache.flink.metrics.prometheus.PrometheusReporterFactory';
set 'metrics.reporter.prometheus.port' = '9249';
set 'jobmanager.scheduler' = 'adaptive';
set 'state.backend' = 'rocksdb';
set 'jobmanager.archive.fs.dir'='file:///tmp';
set 'state.checkpoints.dir' = 'file:///tmp/checkpoints';
set 'state.savepoints.dir' = 'file:///tmp/savepoints';
set 'execution.checkpointing.interval' = '10000';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.timeout' = '600000';
set 'execution.checkpointing.min.pause' = '10000';
set 'execution.checkpointing.max.concurrent.checkpoints' = '1';
set 'metrics.latency.granularity' = 'operator';
set 'web.backpressure.refresh-interval' = '1000';
set 'metrics.backpressure.enabled' = 'true';
set 'metrics.backpressure.interval' = '1000';
set 'metrics.backpressure.timeout' = '60000';
set 'kubernetes.service.exposed.type' = 'NodePort';
set 'kubernetes.rest-service.exposed.type' = 'NodePort';
set 'kubernetes.jobmanager.service-account' = 'flink-serviceaccount';
--Create the source table datagen_source
CREATE TABLE datagen_source
(
id BIGINT,
name STRING
)
WITH ( 'connector' = 'datagen');
--Create the sink table blackhole_sink
CREATE TABLE blackhole_sink
(
id BIGINT,
name STRING
)
WITH ( 'connector' = 'blackhole');
--Insert data from the source table into the sink table
INSERT INTO blackhole_sink
SELECT id,
name
FROM datagen_source;
3.3 JAR task
Upload the task JAR, then right-click it to copy its address, e.g. rs:/flink-test-1.0-SNAPSHOT.jar.
set 'taskmanager.numberOfTaskSlots' = '2';
set 'parallelism.default' = '2';
set 'kubernetes.container.image' = 'registry.cn-hangzhou.aliyuncs.com/dinkyhub/dinky-flink:1.18.1';
set 'kubernetes.service-account' = 'flink-serviceaccount';
set 'job.autoscaler.enabled' = 'true';
set 'job.autoscaler.metrics.window' = '20s';
set 'job.autoscaler.target.utilization' = '0.30';
set 'job.autoscaler.scale.up.threshold' = '0.05';
set 'job.autoscaler.scale.down.threshold' = '0.1';
set 'job.autoscaler.stabilization.interval' = '5s';
set 'job.autoscaler.cooldown.period' = '5s';
set 'job.autoscaler.scale.up.max.factor' = '1.5';
set 'job.autoscaler.scale.down.max.factor' = '0.5';
set 'jobmanager.scheduler' = 'adaptive';
set 'metrics.reporters' = 'prometheus';
set 'metrics.reporter.prometheus.port' = '9249';
set 'metrics.reporter.prometheus.factory.class' = 'org.apache.flink.metrics.prometheus.PrometheusReporterFactory';
set 'state.checkpoints.dir' = 'file:///tmp/checkpoints';
set 'state.savepoints.dir' = 'file:///tmp/savepoints';
set 'execution.checkpointing.interval' = '100000';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.timeout' = '600000';
set 'execution.checkpointing.min.pause' = '10000';
set 'execution.checkpointing.max.concurrent.checkpoints' = '1';
-- REST service configuration
SET 'kubernetes.rest-service.exposed.type' = 'NodePort';
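After the SET block, the copied jar address is submitted with Dinky's EXECUTE JAR statement. A sketch (the main class below is a placeholder for your job's entry point; check the Dinky docs for the full option list):

```sql
EXECUTE JAR WITH (
    'uri' = 'rs:/flink-test-1.0-SNAPSHOT.jar',
    -- hypothetical main class; replace with your job's actual entry point
    'main-class' = 'com.example.FlinkTest',
    'args' = ''
);
```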
Notes:
1. Update the Dinky address here, otherwise downloading the jar will fail.
2. When setting the jar address in the cluster configuration, use:
local:///opt/flink/usrlib/dinky-app-1.18-1.2.0-jar-with-dependencies.jar
Note that local is followed by three slashes; writing one fewer causes an error.
3. Use a static IP in the database configuration rather than 127.0.0.1, otherwise it will fail.
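Once the task is submitted from Dinky, the job can be verified on the cluster. A sketch, assuming the operator creates its resources in the flink-apps namespace (`<job-name>` is a placeholder for the job name configured in Dinky):

```shell
# List operator-managed deployments and their pods.
kubectl -n flink-apps get flinkdeployments
kubectl -n flink-apps get pods
# Tail the JobManager logs for the submitted job (replace <job-name>).
kubectl -n flink-apps logs deploy/<job-name> --tail=100
```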