Big Data 269: Real-Time Data Warehouse - DIM, DW, and ADS Layer Processing; Writing Data Out to HBase with Scala
Chapter Contents
- The ODS layer
- Writing dimension tables from Kafka into the DIM layer
DIM Layer Processing
Implementation Approach
The original area table comes from MySQL and has already been synced into HBase (the wzk_area table).
We transform the area table into a widened record (area ID, area name, city ID, city name, province ID, province name) and write the result back to HBase (the dim_wzk_area table).
When analyzing the transaction process, dimensions such as seller, buyer, product, and time describe the context in which a transaction took place. Dimensions therefore serve mainly as query constraints, grouping keys for aggregation, and sort keys.
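Both HBase tables need to exist before the jobs below run. Here is a minimal sketch of creating them with the HBase 2.x Admin API, assuming only the table names (wzk_area, dim_wzk_area) and the column family f1 that the following code uses; the CreateDimTables object name is just for illustration:

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateDimTables {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    // Create each table with the single column family "f1" if it does not exist yet
    for (name <- Seq("wzk_area", "dim_wzk_area")) {
      val tableName = TableName.valueOf(name)
      if (!admin.tableExists(tableName)) {
        val desc = TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
          .build()
        admin.createTable(desc)
      }
    }
    admin.close()
    conn.close()
  }
}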
HBaseReader
import java.util

import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}

import scala.collection.JavaConverters._

class HBaseReader extends RichSourceFunction[(String, String)] {
  private var conn: Connection = null
  private var table: Table = null
  private var scan: Scan = null

  override def open(parameters: FlinkConfiguration): Unit = {
    // Build the HBase client configuration and open the source table
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    val tableName: TableName = TableName.valueOf("wzk_area")
    val cf1: String = "f1"
    conn = ConnectionFactory.createConnection(conf)
    table = conn.getTable(tableName)
    scan = new Scan()
    scan.addFamily(Bytes.toBytes(cf1))
  }

  override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
    // Full scan of wzk_area: emit (rowKey, "v1-v2-...-vn"), joining the
    // cell values of each row with "-" in column-qualifier order
    val rs: ResultScanner = table.getScanner(scan)
    val iterator: util.Iterator[Result] = rs.iterator()
    while (iterator.hasNext) {
      val result: Result = iterator.next()
      val rowKey: String = Bytes.toString(result.getRow)
      val buffer: StringBuffer = new StringBuffer()
      for (cell: Cell <- result.listCells().asScala) {
        val value: String = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        buffer.append(value).append("-")
      }
      // Drop the trailing "-" separator
      val valueString: String = buffer.replace(buffer.length() - 1, buffer.length(), "").toString
      ctx.collect((rowKey, valueString))
    }
    rs.close()
  }

  override def cancel(): Unit = {
    // The scan is bounded, so there is no long-running loop to interrupt
  }

  override def close(): Unit = {
    try {
      if (table != null) {
        table.close()
      }
      if (conn != null) {
        conn.close()
      }
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
}
HBaseWriterSink
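The sink below obtains its connection through a ConnHBase helper introduced in an earlier chapter and not repeated here. A minimal sketch of what it plausibly looks like, assuming the same ZooKeeper quorum as HBaseReader:

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

class ConnHBase {
  // Build a client configuration and hand back a shared Connection
  def connToHbase: Connection = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    ConnectionFactory.createConnection(conf)
  }
}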
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Put, Table}

class HBaseWriterSink extends RichSinkFunction[String] {
  var connection: Connection = _
  var hbTable: Table = _

  override def open(parameters: Configuration): Unit = {
    connection = new ConnHBase().connToHbase
    hbTable = connection.getTable(TableName.valueOf("dim_wzk_area"))
  }

  override def close(): Unit = {
    if (hbTable != null) {
      hbTable.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    insertDimArea(hbTable, value)
  }

  // Parse the comma-separated record and upsert it into dim_wzk_area,
  // using the area ID as the row key
  def insertDimArea(hbTable: Table, value: String): Unit = {
    val infos: Array[String] = value.split(",")
    val areaId: String = infos(0).trim
    val aname: String = infos(1).trim
    val cid: String = infos(2).trim
    val city: String = infos(3).trim
    val proid: String = infos(4).trim
    val province: String = infos(5).trim
    val put = new Put(areaId.getBytes())
    put.addColumn("f1".getBytes(), "aname".getBytes(), aname.getBytes())
    put.addColumn("f1".getBytes(), "cid".getBytes(), cid.getBytes())
    put.addColumn("f1".getBytes(), "city".getBytes(), city.getBytes())
    put.addColumn("f1".getBytes(), "proId".getBytes(), proid.getBytes())
    put.addColumn("f1".getBytes(), "province".getBytes(), province.getBytes())
    hbTable.put(put)
  }
}
AreaDetail
case class AreaDetail(id: Int, name: String, pid: Int)
AreaDetailInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object AreaDetailInfo {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Read (rowKey, values) pairs from the wzk_area table in HBase
    val data: DataStream[(String, String)] = env.addSource(new HBaseReader)

    // HBaseReader joined the cell values with "-"; the field indexes below
    // depend on the column layout of the source wzk_area table
    val dataStream: DataStream[AreaDetail] = data.map(x => {
      val id: Int = x._1.toInt
      val datas: Array[String] = x._2.split("-")
      val name: String = datas(5).trim
      val pid: Int = datas(6).trim.toInt
      AreaDetail(id, name, pid)
    })

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    tableEnv.createTemporaryView("wzk_area", dataStream)

    // Self-join the area table twice to widen area -> city -> province
    val sql: String =
      """
        |select a.id as areaid, a.name as aname, a.pid as cid, b.name as city, c.id as proid, c.name as province
        |from wzk_area as a
        |inner join wzk_area as b on a.pid = b.id
        |inner join wzk_area as c on b.pid = c.id
        |""".stripMargin
    val areaTable: Table = tableEnv.sqlQuery(sql)
    println("--- sqlQuery ---")

    // toRetractStream emits (isInsert, row); keep only insert messages
    // (this append-only join should not produce retractions, but filtering
    // is safer), then flatten each row into a comma-separated string
    val resultStream: DataStream[String] = tableEnv.toRetractStream[Row](areaTable)
      .filter(_._1)
      .map(x => {
        val row: Row = x._2
        val areaId: String = row.getField(0).toString
        val aname: String = row.getField(1).toString
        val cid: String = row.getField(2).toString
        val city: String = row.getField(3).toString
        val proid: String = row.getField(4).toString
        val province: String = row.getField(5).toString
        areaId + "," + aname + "," + cid + "," + city + "," + proid + "," + province
      })
    resultStream.addSink(new HBaseWriterSink)
    env.execute()
  }
}
DimArea
This case class models the widened dimension record stored in dim_wzk_area; it is defined for downstream use and is not referenced by the job above:
case class DimArea(areaId: Int, aname: String, cid: Int, city: String, proId: Int, province: String)
Run and Test
Run AreaDetailInfo.scala; the job scans wzk_area, widens each record, and writes the result into dim_wzk_area.
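To check the output, one can read a row back from dim_wzk_area. A minimal sketch, where the row key 110101 is only a placeholder; substitute an area ID that exists in your source data:

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object VerifyDimArea {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("dim_wzk_area"))
    // Fetch one row by area ID and print each dimension column
    val result = table.get(new Get(Bytes.toBytes("110101")))
    for (q <- Seq("aname", "cid", "city", "proId", "province")) {
      val v = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes(q))
      println(s"$q = ${if (v == null) "<missing>" else Bytes.toString(v)}")
    }
    table.close()
    conn.close()
  }
}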
DW Layer Processing
The DW (Data Warehouse) layer is built by processing data from the DWD, DWS, and DIM layers. Its main job is data architecture and integration: establishing consistent dimensions, building reusable detail-level fact tables oriented toward analysis and statistics, and aggregating metrics at common granularities.
- DWD (Data Warehouse Detail): the isolation layer between the business systems and the warehouse. Modeling is driven by business processes; for each concrete business process, a fine-grained detail fact table is built. Depending on how the data is actually used, important dimension attributes can be denormalized into the detail fact tables, i.e., widened into wide tables.
- DWS (Data Warehouse Service): based on the DWD detail data, integrates and aggregates service data for a given subject area. Modeling is driven by the analysis subject; summary fact tables at a common granularity are built according to the metric requirements of upstream applications and products (see the sketch after this list).
- DIM (common dimension layer): consistent dimensions built following dimensional-modeling principles.
- TMP: a temporary layer holding intermediate data produced during computation.
Note that the warehouse layering is not fixed; it can be trimmed or extended as needed. If the business is relatively simple and self-contained, DWD and DWS can be merged.
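As an illustration of the DWD-to-DWS step, here is a minimal sketch that rolls order detail records up to a per-city revenue summary. The OrderDetail shape and the sample data are assumptions for illustration; the CityOrder shape is inferred from the fields the Redis sink below reads (province, city, totalMoney, totalCount), not a table defined in this chapter:

import org.apache.flink.streaming.api.scala._

// Hypothetical DWD detail record: one entry per order line
case class OrderDetail(province: String, city: String, money: Double)
// DWS summary record, matching what the Redis sink consumes
case class CityOrder(province: String, city: String, totalMoney: Double, totalCount: Long)

object DwsCityOrder {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stand-in for the real DWD order-detail stream
    val detail: DataStream[OrderDetail] = env.fromElements(
      OrderDetail("河北省", "石家庄", 99.0),
      OrderDetail("河北省", "石家庄", 25.0)
    )
    // Roll detail records up to a running per-city summary
    val cityOrders: DataStream[CityOrder] = detail
      .map(d => CityOrder(d.province, d.city, d.money, 1L))
      .keyBy(c => (c.province, c.city))
      .reduce((a, b) => a.copy(
        totalMoney = a.totalMoney + b.totalMoney,
        totalCount = a.totalCount + b.totalCount))
    cityOrders.print()
    env.execute("dws city order")
  }
}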
ADS Layer Processing
The ADS (Application Data Store) layer is built on DW data, integrating and aggregating it into subject-area service data that serves downstream business queries.
Detail-Layer Results
Analysis results computed from the detail layer can be pushed to stores such as ClickHouse, Redis, or Druid; here we write them into Redis.
import java.util.HashMap

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.Jedis

class MySinkToRedis extends RichSinkFunction[(CityOrder, Long)] {
  private var jedis: Jedis = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("h121.wzk.icu", 6379, 6000)
    jedis.select(0) // use Redis database 0
  }

  override def invoke(value: (CityOrder, Long), context: SinkFunction.Context[_]): Unit = {
    if (!jedis.isConnected) {
      jedis.connect()
    }
    // Store each city's summary as a Redis hash keyed by "<province><city>"
    val map = new HashMap[String, String]()
    map.put("totalMoney", value._1.totalMoney.toString)
    map.put("totalCount", value._1.totalCount.toString)
    map.put("time", value._2.toString)
    // Debug output
    println(s"${value._1.province}${value._1.city} entries: ${map.size()}, amount: ${map.get("totalMoney")}, count: ${map.get("totalCount")}, time: ${map.get("time")}")
    try {
      jedis.hset(s"${value._1.province}${value._1.city}", map)
      map.clear()
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  override def close(): Unit = {
    if (jedis != null) {
      jedis.close()
    }
  }
}
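Wiring the sink into a job can look like the following sketch; the CityOrder case class and the sample stream reuse the shapes assumed in the DwsCityOrder sketch above, standing in for the real DWS summary stream:

import org.apache.flink.streaming.api.scala._

object AdsToRedis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stand-in for the real DWS summary stream
    val cityOrders: DataStream[CityOrder] = env.fromElements(
      CityOrder("河北省", "石家庄", 124.0, 2L)
    )
    // Pair each summary with a processing timestamp and hand it to the sink
    cityOrders
      .map(c => (c, System.currentTimeMillis()))
      .addSink(new MySinkToRedis)
    env.execute("ads to redis")
  }
}

Since hset overwrites the fields of the hash in place, repeated updates for the same province/city key simply refresh the stored totals.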