A Real-Time Multi-Dimensional Metrics Analysis Model Based on Flink SQL
Data Flow Overview
1. Create the source table kafkaTable over the Kafka message queue and define the field-mapping rules;
2. Create the sink table es_sink that writes to Elasticsearch;
3. Clean, deduplicate, and derive status flags through a chain of views (tmp → tmp_dedup → tmp1/tmp2 → tmp3 → tmp_groupby);
4. Aggregate the statistics across all dimension combinations with ROLLUP;
5. Write the final results to ES, including derived metrics such as the success rate (see the worked example below).
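The derived rates in step 5 follow directly from the counters, as computed in the final INSERT statement: cancel_rate = cancel * 100 / init, succ_rate = succ * 100 / (init - cancel), fail_rate = fail * 100 / (init - cancel). As a worked example with hypothetical numbers, init = 100, cancel = 10, succ = 80, fail = 10 yields cancel_rate = 10.0, succ_rate ≈ 88.89, and fail_rate ≈ 11.11.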
Flink SQL Logic
SET table.exec.state.ttl=2592000s; -- 30 days (default: 0 ms)
-- MiniBatch aggregation
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
-- Local-Global aggregation
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- Unit: ms, 10 days
--SET table.exec.state.ttl = 864000000
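-- Note: MiniBatch buffers input records (here up to 1s or 10000 rows) so the
-- aggregations below touch state far less often, at the cost of a little latency.
-- TWO_PHASE additionally splits each aggregation into a local pre-aggregation and
-- a global aggregation, which reduces shuffled traffic and hot-key skew; for
-- unbounded streaming aggregations it only takes effect with MiniBatch enabled.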
CREATE TABLE kafkaTable (
mid bigint,
db string,
sch string,
tab string,
opt string,
ts bigint,
ddl string,
err string,
src map<string,string>,
cur map<string,string>,
cus map<string,string>,
id AS IF(cur['id'] IS NOT NULL, cur['id'], src['id']),
task_id AS IF(cur['task_id'] IS NOT NULL, cur['task_id'], src['task_id']),
account_id AS IF(cur['account_id'] IS NOT NULL, cur['account_id'], src['account_id']),
publish_time AS IF(cur['publish_time'] IS NOT NULL, cur['publish_time'], src['publish_time']),
msg_status AS IF(cur['msg_status'] IS NOT NULL, cur['msg_status'], src['msg_status']),
send_type AS IF(cur['send_type'] IS NOT NULL, cur['send_type'], src['send_type']),
retry_status AS IF(cur['retry_status'] IS NOT NULL, cur['retry_status'], src['retry_status']),
update_time AS IF(cur['update_time'] IS NOT NULL, cur['update_time'], src['update_time']),
event_time AS CAST(IF(cur['update_time'] IS NOT NULL, cur['update_time'], src['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3) or TIMESTAMP_LTZ(3)
proctime AS PROCTIME()
-- WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE -- or: SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'jdq.client.id' = 'xxx',
'jdq.password' = 'xxx',
'jdq.domain' = 'xxx',
'scan.startup.mode' = 'group-offsets', -- default: group-offsets; alternatives: latest-offset, earliest-offset
-- 'properties.enable.auto.commit' = 'true', -- default: false; if false, offsets are committed when a checkpoint completes
'format' = 'binlog'
);
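-- A hypothetical binlog record, to illustrate the computed columns above
-- (assuming the custom 'binlog' format populates src/cur as CDC before/after
-- images): for an INSERT event only cur is populated (e.g. cur['msg_status'] = '0'),
-- while for an UPDATE event src carries the before-image and cur the after-image,
-- so IF(cur[...] IS NOT NULL, cur[...], src[...]) always prefers the newest value.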
CREATE TABLE es_sink(
send_type STRING
,task_id STRING
,month_dim STRING
,day_dim STRING
,grouping_id INTEGER
,init INTEGER
,cancel INTEGER
,succ INTEGER
,fail INTEGER
,cancel_rate FLOAT
,succ_rate FLOAT
,fail_rate FLOAT
,update_date STRING
,PRIMARY KEY (grouping_id,send_type,month_dim,day_dim,task_id) NOT ENFORCED
)
with (
'connector' = 'elasticsearch-6',
'index' = 'index01',
'document-type' = 'type01',
'hosts' = 'xx',
'format' = 'json',
'filter.null-value'='true',
'sink.bulk-flush.max-actions' = '1000',
'sink.bulk-flush.max-size' = '10mb'
);
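-- Assuming this elasticsearch-6 connector follows standard Flink upsert
-- semantics: because a PRIMARY KEY is declared, the document id is derived from
-- the key fields (grouping_id, send_type, month_dim, day_dim, task_id), so each
-- updated aggregate overwrites its previous ES document instead of appending.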
-- Dimensions:
-- - send_type: send type
-- - month_dim: month dimension
-- - day_dim: day dimension
-- - task_id: task ID
CREATE VIEW tmp AS
select
send_type,
task_id,
publish_time,
msg_status,
case when UPPER(opt) = 'INSERT' and msg_status='0' then 1 else 0 end AS init,
case when UPPER(opt) = 'UPDATE' and msg_status='4' then 1 else 0 end AS cancel,
case when UPPER(opt) = 'UPDATE' and msg_status='1' then 1 else 0 end AS succ,
case when UPPER(opt) = 'UPDATE' and msg_status='2' then 1 else 0 end AS fail,
update_time,
opt,
ts,
id,
proctime,
SUBSTRING(publish_time,1,7) as month_dim,
SUBSTRING(publish_time,1,10) as day_dim
FROM kafkaTable
where trim(retry_status) = '0'
and publish_time >= '2025-01-01 00:00:00'
and
( (UPPER(opt) = 'INSERT' and msg_status='0' and position( '_R' in task_id) = 0)
or (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4') and position( '_R' in task_id) = 0)
or (UPPER(opt) = 'UPDATE' and msg_status='1' and position( '_R' in task_id) > 0)
);
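-- The three filter branches keep, respectively:
-- (1) creation events of original tasks (INSERT, msg_status '0'), which feed init;
-- (2) terminal-status updates of original tasks (msg_status '1'/'2'/'3'/'4');
-- (3) success updates of retry tasks (task_id containing '_R'), which tmp2 below
--     folds back into the original task.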
--Dedup pattern: rows that are duplicates within the chosen column set are dropped,
--keeping only the first or last row. When the downstream SUM/COUNT aggregations run,
--Flink's retract stream retracts the superseded rows.
CREATE VIEW tmp_dedup AS
select * from
(
select *,
row_number() over(partition by id,msg_status order by proctime desc) as rn
from tmp
) t
where rn=1;
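-- Example with hypothetical values: if two rows for (id = 1001, msg_status = '1')
-- arrive at proctimes t1 < t2, only the t2 row keeps rn = 1; when it supersedes
-- the t1 row, a retraction for the t1 row is emitted first, so downstream SUMs
-- stay consistent.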
CREATE VIEW tmp1 AS
select
send_type
,task_id
,month_dim
,day_dim
,init
,case when cancel = 1 and update_time <= publish_time then 1 else 0 end AS cancel
,succ
,case when cancel = 1 and update_time > publish_time then 1 else fail end AS fail
,update_time
from tmp_dedup
where position( '_R' in task_id) = 0;
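-- Interpretation: a msg_status '4' record counts as a real cancel only when it
-- occurs at or before publish_time; a "cancel" arriving after publish_time is
-- reclassified as a failure.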
CREATE VIEW tmp2 AS
select
send_type
,SPLIT_INDEX(task_id,'_R',0) AS task_id
,month_dim
,day_dim
,init
,cancel
,succ
,-1 AS fail
,update_time
from tmp_dedup
where position( '_R' in task_id) > 0
and succ = 1;
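-- Example (hypothetical): a successful retry with task_id = 'T123_R1' becomes
-- (task_id = 'T123', succ = 1, fail = -1), i.e. the retry adds a success to the
-- original task and retracts the failure previously counted for it.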
CREATE VIEW tmp3 AS
select
send_type
,task_id
,month_dim
,day_dim
,init
,cancel
,succ
,fail
from tmp1
UNION ALL
select
send_type
,task_id
,month_dim
,day_dim
,init
,cancel
,succ
,fail
from tmp2;
CREATE VIEW tmp_groupby AS
select
--/*+ STATE_TTL('tmp3' = '10d') */
COALESCE(send_type,'N') AS send_type
,COALESCE(month_dim,'N') AS month_dim
,COALESCE(day_dim,'N') AS day_dim
,COALESCE(task_id,'N') AS task_id
,case when send_type is null and month_dim is null and day_dim is null and task_id is null then 1
when send_type is not null and month_dim is null and day_dim is null and task_id is null then 2
when send_type is not null and month_dim is not null and day_dim is null and task_id is null then 3
when send_type is not null and month_dim is not null and day_dim is not null and task_id is null then 4
when send_type is not null and month_dim is not null and day_dim is not null and task_id is not null then 5
end grouping_id
,sum(init) as init
,sum(cancel) as cancel
,sum(succ) as succ
,sum(fail) as fail
from tmp3
--GROUP BY GROUPING SETS ((send_type,month_dim,day_dim,task_id),(send_type,month_dim,day_dim),(send_type,month_dim),(send_type),())
GROUP BY ROLLUP (send_type,month_dim,day_dim,task_id); -- equivalent to the GROUPING SETS above
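-- ROLLUP (send_type, month_dim, day_dim, task_id) expands to the grouping sets
-- (send_type,month_dim,day_dim,task_id), (send_type,month_dim,day_dim),
-- (send_type,month_dim), (send_type), (), which the CASE above maps to
-- grouping_id 5, 4, 3, 2 and 1 respectively.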
INSERT INTO es_sink
select
case when trim(send_type) = '1' then 'send type 1'
when trim(send_type) = '2' then 'send type 2'
else send_type end AS send_type
,task_id
,month_dim
,day_dim
,grouping_id
,init
,cancel
,succ
,fail
,ROUND(cancel*100.0/init,2) AS cancel_rate
,ROUND(succ*100.0/(init - cancel),2) AS succ_rate
,ROUND(fail*100.0/(init - cancel),2) AS fail_rate
,CAST(LOCALTIMESTAMP AS STRING) as update_date
from tmp_groupby
where init > 0
and (init - cancel) > 0;
ES Mapping
#POST index01/type01/_mapping
{
"type01": {
"properties": {
"grouping_id": {
"type": "byte"
},
"send_type": {
"type": "keyword",
"ignore_above": 256
},
"month_dim": {
"type": "keyword",
"fields": {
"text": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM",
"ignore_malformed":"true" --忽略错误的各式
}
}
},
"day_dim": {
"type": "keyword",
"fields": {
"text": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd",
"ignore_malformed":"true"
}
}
},
"task_id": {
"type": "keyword"
},
"init": {
"type": "integer"
},
"cancel": {
"type": "integer"
},
"succ": {
"type": "integer"
},
"fail": {
"type": "integer"
},
"cancel_rate": {
"type": "float"
},
"succ_rate": {
"type": "float"
},
"fail_rate": {
"type": "float"
},
"update_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
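For reference, a minimal day-level query against this index (ES 6 syntax; the date range is hypothetical) could look like:
#GET index01/type01/_search
{
  "size": 100,
  "query": {
    "bool": {
      "filter": [
        { "term": { "grouping_id": 4 } },
        { "range": { "day_dim.date": { "gte": "2025-01-01" } } }
      ]
    }
  },
  "sort": [ { "day_dim.date": "desc" } ]
}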