二百七十四、Kettle——ClickHouse中对错误数据表中进行数据修复(实时)
一、目的
在完成数据清洗、错误数据之后,需要根据修复规则对错误数据进行修复
二、Hive中原有代码
insert into table hurys_db.dwd_queue partition(day) select a3.id, a3.device_no, a3.source_device_type, a3.sn, a3.model, a3.create_time, a3.lane_no, a3.lane_type, case when a3.queue_count between 0 and 100 then a3.queue_count else a2.avg_queue_count end as queue_count, case when a3.queue_len between 0 and 500 then a3.queue_len else a2.avg_queue_len end as queue_len, case when a3.queue_head between 0 and 500 then a3.queue_head else a2.avg_queue_head end as queue_head, case when a3.queue_tail between 0 and 500 then a3.queue_tail else a2.avg_queue_tail end as queue_tail, a3.day from hurys_db.dwd_queue_error as a3 right join (select a1.device_no, a1.create_time, a1.lane_no, round(avg(queue_count),0) avg_queue_count, round(avg(queue_len),2) avg_queue_len, round(avg(queue_head),2) avg_queue_head, round(avg(queue_tail),2) avg_queue_tail from(select t1.device_no, t1.create_time start_time, t2.create_time, t1.lane_no, t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail from hurys_db.dwd_queue as t1 right join hurys_db.dwd_queue_error as t2 on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and concat(date_sub(t2.create_time,7),substr(t2.create_time,11,10)) = t1.create_time where t1.device_no is not null union all select t1.device_no, t1.create_time start_time, t2.create_time, t1.lane_no, t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail from hurys_db.dwd_queue as t1 right join hurys_db.dwd_queue_error as t2 on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and concat(date_sub(t2.create_time,14),substr(t2.create_time,11,10)) = t1.create_time where t1.device_no is not null union all select t1.device_no, t1.create_time start_time, t2.create_time, t1.lane_no, t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail from hurys_db.dwd_queue as t1 right join hurys_db.dwd_queue_error as t2 on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and concat(date_sub(t2.create_time,21),substr(t2.create_time,11,10)) = t1.create_time where t1.device_no is not null ) as a1 group by a1.device_no, a1.create_time, a1.lane_no ) as a2 on a3.device_no=a2.device_no and a3.create_time=a2.create_time and a3.lane_no=a2.lane_no where a3.day='2024-09-04' ;
三、ClickHouse中现有代码
--43、静态排队字段数据修复 --修复策略:使用前三周同期数据取平均进行修复 select a3.id, a3.device_no, a3.source_device_type, a3.sn, a3.model, a3.create_time, a3.lane_no, a3.lane_type, case when a3.queue_count between 0 and 100 then a3.queue_count else a2.avg_queue_count end as queue_count, case when a3.queue_len between 0 and 500 then a3.queue_len else cast(a2.avg_queue_len as Decimal(10,2)) end as queue_len, case when a3.queue_head between 0 and 500 then a3.queue_head else cast(a2.avg_queue_head as Decimal(10,2)) end as queue_head, case when a3.queue_tail between 0 and 500 then a3.queue_tail else cast(a2.avg_queue_tail as Decimal(10,2)) end as queue_tail, cast(a3.day as String) day from hurys_jw.dwd_queue_error as a3 right join ( select device_no, start_time, lane_no, round(avg(queue_count),0) avg_queue_count, round(avg(queue_len) ,2) avg_queue_len, round(avg(queue_head),2) avg_queue_head, round(avg(queue_tail),2) avg_queue_tail from( select t1.device_no, t2.create_time start_time,t2.create_time_7 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail from hurys_jw.dwd_queue as t1 inner join(select device_no,lane_no,create_time, (create_time - interval 7 day) create_time_7, (create_time - interval 14 day)create_time_14, (create_time - interval 21 day)create_time_21 from hurys_jw.dwd_queue_error) as t2 on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and t2.create_time_7=t1.create_time where t1.device_no is not null union all select t1.device_no, t2.create_time start_time,t2.create_time_14 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail from hurys_jw.dwd_queue as t1 inner join(select device_no,lane_no,create_time, (create_time - interval 7 day) create_time_7, (create_time - interval 14 day)create_time_14, (create_time - interval 21 day)create_time_21 from hurys_jw.dwd_queue_error) as t2 on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and t2.create_time_14=t1.create_time where t1.device_no is not null union all select t1.device_no, t2.create_time start_time,t2.create_time_21 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail from hurys_jw.dwd_queue as t1 inner join(select device_no,lane_no,create_time, (create_time - interval 7 day) create_time_7, (create_time - interval 14 day)create_time_14, (create_time - interval 21 day)create_time_21 from hurys_jw.dwd_queue_error) as t2 on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and t2.create_time_21=t1.create_time where t1.device_no is not null) where lane_no is not null group by device_no, start_time, lane_no ) as a2 on a3.device_no=a2.device_no and a3.create_time=a2.start_time and a3.lane_no=a2.lane_no where a3.day >= ? ;
注意:Hive中原有SQL语句和ClickHouse现有SQL语句很大不同
四、Kettle任务
方框标记是修复数据,高频率执行
下面其他几个是修复记录任务
4.1 newtime
4.2 替换NULL值
4.3 clickhouse输入
select
a3.id,
a3.device_no,
a3.source_device_type,
a3.sn,
a3.model,
a3.create_time,
a3.lane_no,
a3.lane_type,
case when a3.queue_count between 0 and 100 then a3.queue_count else a2.avg_queue_count end as queue_count,
case when a3.queue_len between 0 and 500 then a3.queue_len else cast(a2.avg_queue_len as Decimal(10,2)) end as queue_len,
case when a3.queue_head between 0 and 500 then a3.queue_head else cast(a2.avg_queue_head as Decimal(10,2)) end as queue_head,
case when a3.queue_tail between 0 and 500 then a3.queue_tail else cast(a2.avg_queue_tail as Decimal(10,2)) end as queue_tail,
cast(a3.day as String) day
from hurys_jw.dwd_queue_error as a3
right join (
select
device_no,
start_time,
lane_no,
round(avg(queue_count),0) avg_queue_count,
round(avg(queue_len) ,2) avg_queue_len,
round(avg(queue_head),2) avg_queue_head,
round(avg(queue_tail),2) avg_queue_tail
from(
select
t1.device_no, t2.create_time start_time,t2.create_time_7 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(select
device_no,lane_no,create_time,
(create_time - interval 7 day) create_time_7,
(create_time - interval 14 day)create_time_14,
(create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and t2.create_time_7=t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t2.create_time start_time,t2.create_time_14 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(select
device_no,lane_no,create_time,
(create_time - interval 7 day) create_time_7,
(create_time - interval 14 day)create_time_14,
(create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and t2.create_time_14=t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t2.create_time start_time,t2.create_time_21 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(select
device_no,lane_no,create_time,
(create_time - interval 7 day) create_time_7,
(create_time - interval 14 day)create_time_14,
(create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no and t2.lane_no=t1.lane_no and t2.create_time_21=t1.create_time
where t1.device_no is not null)
where lane_no is not null
group by device_no, start_time, lane_no
) as a2
on a3.device_no=a2.device_no and a3.create_time=a2.start_time and a3.lane_no=a2.lane_no
where a3.day >= ?
;
4.4 字段选择
4.5 clickhouse输出
4.6 执行SQL脚本
由于是对每一天的错误进行修复,因此每次执行后需要先删除这个分区的错误数据。因为每次执行清洗后都会先执行错误数据任务!
4.7 执行任务
4.8 海豚调度
注意在DWD层静态排队数据清洗、DWD层静态排队错误数据之后