Learning Big Data DAY62: Metric Calculation
Customer Requirements
Metrics needed for the first summary report - a decision report / summary table; each run computes only one day's record -
for the big boss:
- Total member count; new member count
- Active member count; active member ratio
- Churned member count: members with no purchase record when counting back one year (inclusive)
- Net increase in active members
- Member counts by spending tier (A >= 2000; B >= 1000 and < 2000; C >= 500 and < 1000; D >= 100 and < 500; E < 100) - see the sketch after this list
- Total member spending
- Total member purchase count
- Average spend per member
- Newly churned member count
- 60-day member repurchase rate
- 180-day member repurchase rate
- 365-day member repurchase rate
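The summary script further below takes the tier counts from a pre-assigned group_name column. If only each member's total spend were available, the same A-E classification could be derived directly; a minimal sketch, assuming the DWS member table and its order_amount (total spend) column used later in these notes:
Python:
#!/bin/python3
# Hypothetical sketch: derive the A-E tier counts from per-member spending
# totals instead of the pre-assigned group_name column. Table and column
# names are taken from the scripts below and are assumptions here.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tier_sketch").enableHiveSupport().getOrCreate()
spark.sql('''
    select
        count(case when order_amount >= 2000 then 1 end) as A_members,
        count(case when order_amount >= 1000 and order_amount < 2000 then 1 end) as B_members,
        count(case when order_amount >= 500 and order_amount < 1000 then 1 end) as C_members,
        count(case when order_amount >= 100 and order_amount < 500 then 1 end) as D_members,
        count(case when order_amount < 100 then 1 end) as E_members
    from dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t
''').show()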
Second report, for marketing - a detail report (regular report) - for marketing colleagues
- Select members with more than 30 orders, or with total spending over 1500, as target users for telemarketing (sketched below)
- Fields: name, phone number, total spending, purchase count, city, store, payment preference (mobile, card, cash, ...), diseases of interest
- The member's monthly order count and spending for the last 3 months: m1_total, m1_sale, m2_total, m2_sale
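For reference, the target-user filter can also be expressed with DataFrame operations instead of SQL; a minimal sketch, assuming the DWS member table and the order_total / order_amount columns used in the detail-report script below:
Python:
#!/bin/python3
# Sketch only: the same target-user filter as in the detail-report SQL below,
# written with DataFrame operations.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("target_filter").enableHiveSupport().getOrCreate()
targets = spark.table("dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t") \
    .where((F.col("order_total") > 30) | (F.col("order_amount") > 1500))
targets.select("name", "phone", "order_total", "order_amount").show()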
Third report, for marketing - the top-20 spenders of each month from 2022.01 through 2023.12, 24 x 20 = 480 rows - for the marketing manager
- T+1 (monthly); month in yyyy-MM format
- Member name, contact info, ..., spending tier, and order count and total spending over the last 30 days
- That member's consumption over the following 30, 60, and 90 days relative to the month (the hard part - see the sketch after this list)
- Sort order: by total spending descending (default), or by purchase count descending
- The report shows January 2021 by default; any month within the two-year window can be selected
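The third script below computes the 30/60/90-day figures with lead() plus month-adjacency checks. An alternative worth noting is a numeric RANGE frame over a month index (year * 12 + month), which also credits months a member skips past, something fixed lead offsets can miss. A minimal sketch, reusing the source table and column names from that script:
Python:
#!/bin/python3
# Sketch: per-member consumption over the next 1/2/3 calendar months via a
# RANGE frame on a month index. Same source table as the third script below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("forward_months_sketch").enableHiveSupport().getOrCreate()
spark.sql('''
    with m as (
        select member,
            date_format(stamp, 'yyyy-MM') as year_month,
            year(stamp) * 12 + month(stamp) as month_index,
            sum(precash) as total_consumption
        from dwd_tanghongjian.erp_u_sale_m
        where stamp between '2022-01-01' and '2023-12-31'
        group by member, date_format(stamp, 'yyyy-MM'), year(stamp) * 12 + month(stamp)
    )
    select member, year_month, total_consumption,
        coalesce(sum(total_consumption) over (partition by member order by month_index
            range between 1 following and 1 following), 0) as `30_days`,
        coalesce(sum(total_consumption) over (partition by member order by month_index
            range between 1 following and 2 following), 0) as `60_days`,
        coalesce(sum(total_consumption) over (partition by member order by month_index
            range between 1 following and 3 following), 0) as `90_days`
    from m
''').show()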
The metric tables must be stored in the DM layer:
dm_lijinquan.xxx
There should be three metric tables holding the result data computed via SQL.
Understand the business meaning of each metric => compute with SQL => get the result => save it to the result table
4.指标说明.xlsx (metric descriptions spreadsheet)
Homework
Use a modeling tool to design these 3 tables; field names and table names are up to you; the storage format is textfile (a DDL sketch follows).
Compute the results with Spark SQL and deploy to the scheduling platform.
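A minimal sketch of the kind of DDL the modeling step might produce; the table name, column list, and _ddl suffix are placeholders (the scripts below actually create their tables via DataFrame.write):
Python:
#!/bin/python3
# Hypothetical textfile DDL for one of the three metric tables.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ddl_sketch").enableHiveSupport().getOrCreate()
spark.sql('create database if not exists dm_shihaihong location "/zhiyun/shihaihong/dm"')
spark.sql('''
    create table if not exists dm_shihaihong.total_report_table_ddl (
        time_          string,
        all_members    bigint,
        inc_members    bigint,
        vaild_members  bigint,
        per_members    string
        -- ... remaining metric columns
    )
    stored as textfile
    location '/zhiyun/shihaihong/dm/total_report_table_ddl'
''')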
Summary_table:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Summary report
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("app") \
    .config('spark.driver.memory', '6g') \
    .config('spark.executor.memory', '6g') \
    .config('spark.sql.parquet.writeLegacyFormat', 'true') \
    .config("hive.metastore.uris", "thrift://cdh01:9083") \
    .enableHiveSupport() \
    .getOrCreate()

def temp_table():
    table_name = 'total_report_table'

    # ratio of two columns, formatted to 2 decimal places
    @F.udf()
    def a_division_b(val1, val2):
        return f"{(val1 / val2):.2f}"

    # difference of two columns (net increase in active members)
    @F.udf()
    def handle_inc_vaild_members(val1, val2):
        return val1 - val2
    sql = '''
    -- today is 2023-07-24
    with
    t as (
        select member,
            -- purchases in the current month
            count(case when stamp >= trunc('2023-07-24', 'MM')
                    and stamp <= last_day('2023-07-24') then 1 end) as this_months_members,
            count(case when stamp >= date_sub('2023-07-24', 60)
                    and stamp <= date_sub('2023-07-24', 1) then 1 end) as `60days`,
            count(case when stamp >= date_sub('2023-07-24', 180)
                    and stamp <= date_sub('2023-07-24', 1) then 1 end) as `180days`,
            count(case when stamp >= date_sub('2023-07-24', 365)
                    and stamp <= date_sub('2023-07-24', 1) then 1 end) as `365days`
        from dwd_tanghongjian.erp_u_sale_m eusm
        group by member),
    tt as (
        select
            count(case when this_months_members > 0 then 1 end) as this_months_members,
            count(case when this_months_members > 0 and `60days` > 0 then 1 end) as `60day`,
            count(case when this_months_members > 0 and `180days` > 0 then 1 end) as `180day`,
            count(case when this_months_members > 0 and `365days` > 0 then 1 end) as `365day`
        from t),
    a as (select * from dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t),
    aa as (
        select
            -- total member count
            count(a.mbr_cardno) as all_members,
            -- new members (first purchase falls in the current month)
            count(case when a.first_order_date >= trunc('2023-07-24', 'MM')
                    and a.first_order_date <= last_day('2023-07-24') then 1 end) as inc_members,
            -- active members (purchased within the 180 days before month end)
            count(case when a.last_order_date >= date_sub(last_day('2023-07-24'), 180) then 1 end) as vaild_members,
            -- active member ratio (active / total), filled in later via UDF
            "" as per_members,
            -- churned members (no purchase for one year or more, counted back from month end)
            count(case when a.last_order_date <= date_sub(last_day('2023-07-24'), 365) then 1 end) as loss_members,
            -- net increase in active members (active - churned), filled in later via UDF
            "" as inc_vaild_members,
            count(case when a.group_name = 'A 星用户组' then 1 end) as A_members,
            count(case when a.group_name = 'B 星用户组' then 1 end) as B_members,
            count(case when a.group_name = 'C 星用户组' then 1 end) as C_members,
            count(case when a.group_name = 'D 星用户组' then 1 end) as D_members,
            count(case when a.group_name = 'E 星用户组' then 1 end) as E_members,
            -- total member spending
            round(sum(a.order_amount), 2) as total_consume,
            -- total member purchase count
            sum(a.order_total) as total_consume_times,
            -- average spend per member (taxed sales revenue / member count), filled in later via UDF
            "" as consume_unit_price,
            -- newly churned members (last purchase between 13 months and 1 year before month end)
            count(case when a.last_order_date <= date_sub(last_day('2023-07-24'), 365)
                    and a.last_order_date > add_months(last_day('2023-07-24'), -13) then 1 end) as new_loss_members,
            -- repurchase rate: members who purchased this month and also purchased within the
            -- previous N days / members who purchased this month. E.g. if customer A buys on
            -- Oct 7 and also bought between Aug 7 and Oct 6, A counts as a 60-day repurchase member.
            -- 60-day repurchase rate, filled in later via UDF
            "" as `60_days`,
            -- 180-day repurchase rate, filled in later via UDF
            "" as `180_days`,
            -- 365-day repurchase rate, filled in later via UDF
            "" as `365_days`
        from a)
    select date_format(current_date(), 'yyyy-MM-dd') as time_, aa.*, tt.*
    from aa cross join tt
    '''
    df = spark.sql(sql)
    # active member ratio (active members / total members)
    df = df.withColumn("per_members", a_division_b("vaild_members", "all_members"))
    # net increase in active members (active members - churned members)
    df = df.withColumn("inc_vaild_members", handle_inc_vaild_members("vaild_members", "loss_members"))
    # average spend per member
    df = df.withColumn("consume_unit_price", a_division_b("total_consume", "all_members"))
    # 60-day member repurchase rate
    df = df.withColumn("60_days", a_division_b("60day", "this_months_members"))
    # 180-day member repurchase rate
    df = df.withColumn("180_days", a_division_b("180day", "this_months_members"))
    # 365-day member repurchase rate
    df = df.withColumn("365_days", a_division_b("365day", "this_months_members"))
    # drop the intermediate counting columns
    del_columns = ['this_months_members', '60day', '180day', '365day']
    dfs = df.drop(*del_columns)
    dfs.show()
    saveAsTable(dfs, table_name)
def saveAsTable(df, table_name):
    # create the HDFS path
    os.system(f"hadoop fs -mkdir -p /zhiyun/shihaihong/dm/{table_name}")
    # create the database
    os.system('''
        beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
        create database if not exists dm_shihaihong location "/zhiyun/shihaihong/dm";
        '
    ''')
    # save into the Hive table as textfile
    # (not ORC here: Spark writes ORC tables with a newer ORC version; Parquet behaves similarly)
    df.write.format("hive") \
        .mode("overwrite") \
        .option("fileFormat", "TextFile") \
        .option("location", f"/zhiyun/shihaihong/dm/{table_name}") \
        .saveAsTable(f"dm_shihaihong.{table_name}")
    print("Saved successfully")

temp_table()
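For reference, the two UDFs above could be replaced by built-in column expressions, which avoids Python-UDF serialization overhead; drop-in alternatives using the df and F of the script above:
Python:
# Alternatives to the UDF-based columns in the script above, using built-in
# functions (round() yields a numeric instead of the UDF's formatted string):
df = df.withColumn("per_members", F.round(F.col("vaild_members") / F.col("all_members"), 2))
df = df.withColumn("inc_vaild_members", F.col("vaild_members") - F.col("loss_members"))
df = df.withColumn("consume_unit_price", F.round(F.col("total_consume") / F.col("all_members"), 2))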
Task scheduling:
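Each report script is deployed on the scheduling platform as a daily shell-type task, e.g. spark-submit summary_table.py (script file name assumed); the summary script recomputes and overwrites its single-day record on every run.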
Sales_Analysis_Report:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Target users: members with more than 30 orders or total spending over 1500
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("app") \
    .config('spark.driver.memory', '6g') \
    .config('spark.executor.memory', '6g') \
    .config('spark.sql.parquet.writeLegacyFormat', 'true') \
    .config("hive.metastore.uris", "thrift://cdh01:9083") \
    .enableHiveSupport() \
    .getOrCreate()

def temp_table():
    table_name = 'detail_table'
    sql = '''
    with t as (
        select member,
            -- month 1: the month up to the reference date (exclusive lower bound,
            -- so boundary days are not double-counted across adjacent months)
            count(case when stamp > add_months('2023-07-24', -1)
                    and stamp <= '2023-07-24' then 1 end) as m1_total,
            sum(case when stamp > add_months('2023-07-24', -1)
                    and stamp <= '2023-07-24' then precash end) as m1_sale,
            count(case when stamp > add_months('2023-07-24', -2)
                    and stamp <= add_months('2023-07-24', -1) then 1 end) as m2_total,
            sum(case when stamp > add_months('2023-07-24', -2)
                    and stamp <= add_months('2023-07-24', -1) then precash end) as m2_sale,
            count(case when stamp > add_months('2023-07-24', -3)
                    and stamp <= add_months('2023-07-24', -2) then 1 end) as m3_total,
            sum(case when stamp > add_months('2023-07-24', -3)
                    and stamp <= add_months('2023-07-24', -2) then precash end) as m3_sale
        from dwd_tanghongjian.erp_u_sale_m eusm
        group by member
    ),
    a as (
        -- target users: more than 30 orders, or total spending over 1500
        select * from dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t dxmmdt
        where order_total > 30 or order_amount > 1500)
    select
        a.name,
        a.phone,
        a.order_total,
        a.order_amount,
        a.address,
        a.store_code,
        a.pay_fav_type,
        a.sickness_motion,
        t.m1_total,
        coalesce(t.m1_sale, 0) as m1_sale,
        t.m2_total,
        -- m2_sale / m3_sale are written as running totals of the monthly amounts
        coalesce(t.m2_sale, 0) + coalesce(t.m1_sale, 0) as m2_sale,
        t.m3_total,
        coalesce(t.m3_sale, 0) + coalesce(t.m1_sale, 0) + coalesce(t.m2_sale, 0) as m3_sale
    from a left join t on a.mbr_cardno = t.member
    '''
    df = spark.sql(sql)
    df.show()
    saveAsTable(df, table_name)

def saveAsTable(df, table_name):
    # create the HDFS path
    os.system(f"hadoop fs -mkdir -p /zhiyun/shihaihong/dm/{table_name}")
    # create the database
    os.system('''
        beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
        create database if not exists dm_shihaihong location "/zhiyun/shihaihong/dm";
        '
    ''')
    # save into the Hive table as textfile
    # (not ORC here: Spark writes ORC tables with a newer ORC version; Parquet behaves similarly)
    df.write.format("hive") \
        .mode("overwrite") \
        .option("fileFormat", "TextFile") \
        .option("location", f"/zhiyun/shihaihong/dm/{table_name}") \
        .saveAsTable(f"dm_shihaihong.{table_name}")
    print("Saved successfully")

temp_table()
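After the job runs, the saved table can be spot-checked from any Hive-enabled session; a quick sketch:
Python:
# Optional spot-check of the detail table in the DM layer.
spark.sql("select * from dm_shihaihong.detail_table limit 10").show()
spark.sql("select count(*) as row_cnt from dm_shihaihong.detail_table").show()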
Task scheduling:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Monthly consumption report (top-20 spenders per month)
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("app") \
    .config('spark.driver.memory', '6g') \
    .config('spark.executor.memory', '6g') \
    .config('spark.sql.parquet.writeLegacyFormat', 'true') \
    .config("hive.metastore.uris", "thrift://cdh01:9083") \
    .enableHiveSupport() \
    .getOrCreate()

def temp_table():
    table_name = 'front_20'
    sql = '''
    with a as (
        -- per-member monthly consumption within the two-year window
        select member,
            date_format(stamp, 'yyyy-MM') as year_month,
            sum(precash) as total_consumption
        from dwd_tanghongjian.erp_u_sale_m
        where stamp between '2022-01-01' and '2023-12-31'
        group by member, date_format(stamp, 'yyyy-MM')),
    aa as (
        -- rank members by consumption within each month
        select member, year_month, total_consumption,
            row_number() over (partition by year_month order by total_consumption desc) as r
        from a),
    re as (
        -- top 20 per month
        select member, year_month, total_consumption
        from aa where r <= 20),
    t as (
        -- the next three months with consumption, per member
        -- (null defaults for the string columns; the original int default 0 did not fit them)
        select member, year_month, total_consumption,
            lead(year_month, 1) over (partition by member order by year_month) as one_month,
            lead(year_month, 2) over (partition by member order by year_month) as two_month,
            lead(year_month, 3) over (partition by member order by year_month) as three_month,
            lead(total_consumption, 1, 0) over (partition by member order by year_month) as one_month_total,
            lead(total_consumption, 2, 0) over (partition by member order by year_month) as two_month_total,
            lead(total_consumption, 3, 0) over (partition by member order by year_month) as three_month_total
        from a),
    tt as (
        -- keep a led value only when it lies exactly N calendar months ahead;
        -- comparing month indexes (year * 12 + month) handles year rollover,
        -- which a month-digit comparison misses for e.g. 2022-11 -> 2023-01
        select member, year_month, total_consumption,
            case when one_month is not null
                    and cast(substr(one_month, 1, 4) as int) * 12 + cast(substr(one_month, 6) as int)
                      = cast(substr(year_month, 1, 4) as int) * 12 + cast(substr(year_month, 6) as int) + 1
                then one_month_total else 0 end as one_month_total,
            case when two_month is not null
                    and cast(substr(two_month, 1, 4) as int) * 12 + cast(substr(two_month, 6) as int)
                      = cast(substr(year_month, 1, 4) as int) * 12 + cast(substr(year_month, 6) as int) + 2
                then two_month_total else 0 end as two_month_total,
            case when three_month is not null
                    and cast(substr(three_month, 1, 4) as int) * 12 + cast(substr(three_month, 6) as int)
                      = cast(substr(year_month, 1, 4) as int) * 12 + cast(substr(year_month, 6) as int) + 3
                then three_month_total else 0 end as three_month_total
        from t),
    re2 as (
        select member, year_month, total_consumption,
            one_month_total as `30_days`,
            one_month_total + two_month_total as `60_days`,
            one_month_total + two_month_total + three_month_total as `90_days`
        from tt)
    select
        re.member,
        re.year_month,
        ddd.name,
        ddd.phone,
        ddd.order_total,
        ddd.order_amount,
        ddd.address,
        ddd.store_code,
        ddd.pay_fav_type,
        ddd.sickness_motion,
        ddd.order_30,
        ddd.amount_30,
        re2.`30_days`,
        re2.`60_days`,
        re2.`90_days`
    from re
    left join re2 on re.member = re2.member and re.year_month = re2.year_month
    left join dws_tanghongjian.dws_xbd_mxm_memberinfo_dim_t ddd on re.member = ddd.mbr_cardno
    -- default report order: by month, then total spending descending
    order by re.year_month, re.total_consumption desc
    '''
    df = spark.sql(sql)
    df.show()
    saveAsTable(df, table_name)

def saveAsTable(df, table_name):
    # create the HDFS path
    os.system(f"hadoop fs -mkdir -p /zhiyun/shihaihong/dm/{table_name}")
    # create the database
    os.system('''
        beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
        create database if not exists dm_shihaihong location "/zhiyun/shihaihong/dm";
        '
    ''')
    # save into the Hive table as textfile
    # (not ORC here: Spark writes ORC tables with a newer ORC version; Parquet behaves similarly)
    df.write.format("hive") \
        .mode("overwrite") \
        .option("fileFormat", "TextFile") \
        .option("location", f"/zhiyun/shihaihong/dm/{table_name}") \
        .saveAsTable(f"dm_shihaihong.{table_name}")
    print("Saved successfully")

temp_table()
Task scheduling: