Dbt基本概念与快速入门
在过去的几年里,数据科学界已经慢慢地接受了以数据为中心的范式。我们不仅关注日益复杂的机器学习模型,还要更多地关注数据质量。这使得数据工程、分析工程领域技术和工具成为热点。dbt(数据构建工具)是一个显著改善数据工程师生活的工具。它的目的是向数据工程引入经过实战检验的软件工程最佳实践,并尽可能快速、轻松地产生数据价值。本文将为入门级数据工程师介绍dbt的基础知识,包括数据仓库与dbt相关的概念,为未来进一步深入了解做基础准备。
数据仓库
首先我们需要了解的前置概念是数据仓库。数据仓库是组织存储所有数据的地方。建立仓库是为数据分析、数据挖掘和数据产品等应用场景提供数据。数仓将历史数据有组织地存储到数据表中,且它们的结构是专门为了快速查询和分析。与Dbt相关的数据仓库概念中最重要是数据分析建模,尤其是事实维度模型。
ETL VS ELT
提取-转换-加载(ETL),在抽取时进行转换,转换后载入数仓。这种方式对于大数量时,如果因为网络或转换异常导致过程中断,处理中断的清理工作相对比较麻烦。与之对应的是(ELT)和提取-加载-转换,仅需要一次载入,载入之后在本地进行转换逻辑,使得整个过程更快更稳定。
从最简单的定义来看,dbt(数据构建工具)是一种开源数据转换工具,通常用于数据仓库中的数据建模。它使数据专业人员能够轻松编写SQL,从而更有效地转换仓库中的数据。在更细粒度的级别上,它是一个具有开发框架的工具,该框架将模块化SQL与软件工程最佳实践相结合,以创建更好、更快、更可靠的数据转换体验。模块化SQL是指将SQL代码分解为更小的、可重用的块,以便更容易维护、测试和重用的实践。dbt很适合的其他最佳实践包括版本控制、文档、DevOps等等。
dbt定位为转换工具,并不处理它所连接的数据存储之外的数据移动,这一点非常重要。因此整个数据交换过程,需要与其他工具/服务相结合来创建完整的ETL/ELT管道。
dbt-core
dbt(Data Build Tool) 包括两个工具:dbt Core和dbt Cloud。
dbt Core是一个开源库,实现了dbt的大部分功能。它有一个命令行接口(您将会喜欢的dbt命令),您可以使用它来管理项目中的数据转换。dbt Cloud是一个面向企业的解决方案。在CLI之上,dbt Cloud还提供了一个更加用户友好的基于web的IDE。
我们主要介绍dbt Core,因为它最适合本地项目、测试和学习。您可以使用pip在任何操作系统上安装它(当然最后是在python虚拟环境中)。
$ conda create -n learn_dbt -y
// $ pip install dbt-<adapter_name>
$ pip install dbt-duckdb
这里应该将adapter_name替换为您想要使用的数据库。dbt Labs (dbt所属公司)已经为不同的数据平台集成了许多适配器。在本系列中,我们将使用dbt-duckdb适配器连接到DuckDB数据库。但是您可以使用dbt文档中列出的任何其他适配器。安装可能需要等几分钟,完了我们准备DBT之旅。
dbt 项目概述
dbt项目是包括在操作系统中特定目录里,其中包含对数据执行转换所需的所有内容。它包含许多.sql文件(称为模型)和YAML文件(用于配置)。创建dbt工程,可以在命令行中使用dbt init <project_name>命令。
$ dbt init dbt_learn
$ cd dbt_learn
终端要求您输入与可用数据平台适配器对应的代码。因为只有DuckDB,所以可以按1。在dbt_learn内部,有如下结构:
├── analyses
├── dbt_packages
├── logs
├── macros
├── models
├── seeds
├── snapshots
├── target
├── tests
├── README.md
├── dbt_project.yml
├── package-lock.yml
├── packages.yml
└── profiles.yml
这就是dbt项目为你生成的,让你快速从数据工程师成为软件工程师:
- 模块化: 保持数据转换有组织,并将其划分为可管理的单元,使代码更容易理解和维护。
- 版本控制: 跟踪更改并恢复到模型的先前版本,确保数据一致性和可再现性。
- 协作: 采用分领域、分模型组织工程,可以轻松实现多人协作高效开发模型。
- 可测试的: 为了确保模型按预期工作,可以编写模型测试用例,在部署到生产环境之前识别潜在问题。
- 可重复: 使用相同的项目在不同环境(如:开发环境、生产环境)中进行一致的数据转换。
简而言之,dbt项目提供了一种强大的方式来管理和编排数据转换。它为数据世界带来了软件工程的优势价值。
dbt项目Profiles
前节我们已经初始化dbt项目,现在我们需要连接到数据库(或者从头创建数据库)。为此,我们需要一种安全的方式向dbt提供数据库凭据以建立连接,我们将使用项目配置文件进行配置。
项目Profiles文件是YAML文件,包含所选数据平台的连接详细信息等。文件本身是在dbt项目文件夹中创建的,命名为profiles.yml。这是它现在的样子:
dbt_learn:
target: dev
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 4
prod:
type: duckdb
path: prod.duckdb
threads: 4
该文件为我们的项目列出了一个名为dbt_learn的配置文件。它指定了两个输出:dev和prod。输出是指定到数据仓库或数据库的不同连接的单个配置。通过输出可以管理到不同环境的连接:
- 发展
- 测试
- 生产等
在我们的配置文件中,默认输出是dev,它列在target字段中,也可以根据需要将其更改为其他类型输出。
注意: 你可以更改profiles名称和输出名称,只要它们在其余dbt文件中被正确引用。如在项目配置文件(dbt_project.yml)中有profiles文件的引用:profile: ‘dbt_learn’。同时需要提醒的是,该文件不要用中文注释。
path字段指定名为dev.duckdb
的现有数据库的位置。如果不存在,将由dbt DuckDB
适配器在工作目录中创建(在dbt_learn项目中path
字段指定项目目录的相对路径)。由于当前没有名为dev.duckdb
的数据库,因此我们将通过运行dbt debug让dbt创建它。
$ dbt debug
debug子命令用于测试项目的许多方面,例如:
profiles.yml
配置文件的错误profiles.yml
中的数据库连接信息- 数据库适配器
dbt_project.yml
文件中的错误等
如果看到一条绿色的“All checks passed!”消息,并出现一个新的dev.duckdb数据库,那么说明已经连接成功,你就可以开始新的任务了。
dbt数据模型
模型是dbt的核心,因为它们代表了dbt所擅长的实际转换。数据模型是一个概念,它表示一组数据中的结构和关系。dbt中的数据模型更简单、更具体。它们具有以下属性:
- 表示数据转换(如执行清理操作)
- 通常在.sql文件中使用SQL编写(在较新的dbt版本中允许使用Python)
- 通常由一个SELECT查询组成
我们的dbt_learn项目在models/example中预先填充了两个虚拟模型:
$ ll ./models/example/
total 20
drwxrwxr-x 2 tommy tommy 4096 Aug 12 11:01 ./
drwxrwxr-x 3 tommy tommy 4096 Aug 12 11:01 ../
-rw-rw-r-- 1 tommy tommy 475 Aug 12 11:01 my_first_dbt_model.sql
-rw-rw-r-- 1 tommy tommy 115 Aug 12 11:01 my_second_dbt_model.sql
-rw-rw-r-- 1 tommy tommy 447 Aug 12 11:01 schema.yml
下面我们删除示例模型文件,然后创建我们自己的模型:
$ rm -rf models/example
$ mkdir models/stats
$ touch models/stats/average_diamond_price_per_group.sql
在上面代码片段的最后一行中,我们在stats目录中创建了名为diamond_avg_price.sql的模型文件。模型名称具有一定业务意义描述非常有必要。在模型文件中加入下面测试代码:
SELECT 1 AS Id
然后执行dbt run
命令运行模型:
$ dbt run
你应该能够看到绿色"Completed successfully"消息。
至此,我们开始加载一些示例数据到dev.duckdb数据库中。我们将使用parquet文件来实现这一点,因为DuckDB本地支持它们。示例文件diamonds.parquet可以从这里下载。
SELECT AVG(price), cut
FROM "diamonds.parquet"
GROUP BY cut
注意:diamonds.parquet文件需要和数据库文件在相同目录中,查询应该返回相同的成功消息。
上面示例中我们看到了如何使用SELECT语句创建第一个dbt模型,该语句返回关于数据集的一些汇总统计信息,在实践中模型代码将取决于具体业务需求。因此,这里不关注dbt模型的实际逻辑,而是关注如何正确地实现它们。
dbt项目DAG
在实际项目中,模型之间很可能相互依赖,形成某种层次结构。在数据世界中,这种层次结构称为有向无环图(DAG)。一图胜千言,看看dbt DAGs页面中的这个例子:
图1
这张图中有四个模型,它们都与下游的模型线性相连。stg_users和stg_user_groups是int_users的父模型,int_users和上游的stg_orgs模型是dim_users的父模型。
注:经常使用上游和下游这两个词来表示每个模型在DAG中的相对位置。
dag的一个关键特征是没有闭环,这意味着作为下游模型,其产生的结果不能与上游模型结合。这就是“无环”这个词的由来。或者说的更易理解点,不能产生循环依赖。
除了dag的视觉丰富性之外,它们的目的是让dbt根据依赖关系构建/更新模型。如果我们没有为上面的四个模型定义DAG, dbt将按字母顺序构建它们。这将导致各种红色消息和错误,要在dbt中定义DAG,我们将使用Jinja模板。
dbt Jinja模型
在上面的DAG中,int_users模型是由stg_users和stg_user_groups两个模型产生的。我们需要在项目中指定这种关系,否则dbt run按字母顺序执行所有模型,这意味着int_users优先执行。这将导致一个错误,因为它的依赖关系尚未具体化。
假设int_users模型代码如下:
SELECT some_column
FROM stg_users as su
JOIN stg_user_groups as sug
ON su.a = sug.a
现在,我们将通过使用Jinja函数将三个模型转换为DAG的节点来链接它们:
SELECT some_column
FROM {{ ref("stg_users") }} as su
JOIN {{ ref("stg_user_groups" )}} as sug
ON su.a = sug.a
我们没有直接写模型名,而是把它们放在一个名为ref的Jinja函数中。语法是{{ ref(“node_name”) }}(注意空格和引号)。当我们的查询被编译时,Jinja函数被替换为实际的模型名称。
注意,stg_users和stg_user_groups应该作为dbt项目中的.sql文件存在。
现在,当我们执行dbt run时,它将查找每个模型的依赖项,连接它们并相应地执行它们。
Jinja中的ref函数并不是唯一可以在dbt中使用的函数。实际上,通过使用Jinja的其他函数和特性,可以显著扩展SQL语句的功能。下面是一些例子:
- 使用Jinja在模型文件中创建变量:
{% set status = 'active' %} -- Define a variable
SELECT *
FROM customers
WHERE status = {{ status }};
- 使用config对象在模型配置文件中定义变量
如果我们有models/model_properties。Yml文件,它有以下字段:
# models/model_properties.yml
version: 2
models:
- name: my_model
config:
target_schema: analytics
我们可以使用Jinja访问任何.sql文件中的字段:
{% set target_schema = config.target_schema %}
CREATE TABLE {{ target_schema }}.{{ target_table }} AS
...
- 使用条件语句和循环(这真是个惊喜!, 增强SQL能力,实现图灵完备):
{% if some_condition %}
SELECT * FROM test_data
{% else %}
SELECT * FROM production_data
{% endif %}
实现循环示例代码:
SELECT
order_id,
{% for payment_method in ["bank_transfer", "credit_card", "gift_card"] %}
SUM(CASE WHEN payment_method = '{{ payment_method }}' THEN amount END) AS {{ payment_method }}_amount,
{% endfor %}
SUM(amount) AS total_amount
FROM {{ ref('raw_payments') }}
GROUP BY 1;
-
通过Jinja创建SQL(宏)中的函数(不,我不是在开玩笑)
这是宏函数示例:
{% macro create_table(table_name, columns) %}
CREATE TABLE {{ table_name }} (
{% for column in columns %}
{{ column.name }} {{ column.type }},
{% endfor %}
);
{% endmacro %}
下面代码展示如何调用宏函数:
{% call create_table('my_customer_table', [
{'name': 'id', 'type': 'integer'},
{'name': 'name', 'type': 'varchar(255)'},
{'name': 'email', 'type': 'varchar(255)'},
]) %}
INSERT INTO {{ my_customer_table }} (id, name, email)
SELECT customer_id, customer_name, customer_email
FROM raw_customers;
想了解更多关于在dbt和SQL中使用Jinja的信息,请查看dbt官方文档。后续我们也会持续更新。
dbt测试
一个优秀的软件工程人员知道需要不断地测试代码中的错误缺陷,dbt将数据工程师转为软件工程师,因此dbt提供了内置和自定义测试方法。目前,dbt提供以下四种内置测试:
- unique - 验证所有值都是唯一的
- not_null - 检查缺失值
- accepted_values - 验证所有值都在指定的列表中,具有values参数
- relationships - 验证与特定表或列的连接,具有to 和field参数
为了指定在哪些列上使用具体测试方法,需要在models目录下定义一个名为model_properties.yaml文件。
注意:model_properties.yaml不是模型运行的必要条件,它可以被命名为任何名称。但是,如果您想要创建测试来验证通过测试输入到模型中的数据,那么这个文件是必须的。
让我们看看如何使用not_null测试来检查diamonds表的cut列中缺失的值。首先,创建一个model_properties.yml文件:
$ touch models/model_properties.yml
在里面,粘贴以下内容:
version: 2
models:
- name: average_diamond_price_per_group
columns:
- name: cut
tests:
- not_null
在models下面的- name字段中,我们指定要为哪个模型定义属性。然后,我们指定列及其名称。最后,编写tests字段,在该字段下列出not_null测试。
现在,您可以在运行dbt run之前使用该测试执行数据验证。命令是dbt test:
$ dbt test
如果收到错误消息,则意味着测试失败,需要检查表中的问题并在必要时修复它。
典型DBT项目开发流程
要在项目中成功地使用dbt,可以参考使用以下推荐流程:
- 项目初始化
- 安装dbt并使用
dbt init
创建一个新项目
- 项目配置
- 为项目选择一个数据库平台
- 在项目主目录中的
profiles.yml
文件中配置数据库凭据 - 调整项目设置:修改
dbt_project.yml
,定义项目级一些配置信息(例如,版本,依赖项)。
- 模型开发
- 编写模型SQL代码: 在模型目录中为模型定义创建
.sql
文件 - 编写模型测试: 在tests目录中创建.yml文件,定义模型测试
- 增量测试: 在开发期间使用dbt测试频繁地运行测试
- 调试问题: 使用
dbt debug
进行故障排除
- 本地验证(未来我们会详细说明)
- 构建项目: 使用
dbt build
来编译和测试模型。 - 运行彻底测试: 使用
dbt test
执行所有测试。
其他最佳实践:
- 版本控制: 使用Git进行协作和版本控制(这是必须的)
- 文档: 在模型和测试中编写用户可理解注释。然后使用
dbt docs generate
在web服务器中呈现模型文档 - 分析: 利用dbt的分析功能来捕捉和修复性能瓶颈
- 持续集成(CI): 将dbt与CI/CD管道集成,以实现自动化测试和部署
总结
在本教程中我们已经介绍了许多基础知识,但是正如我在开始时提到的,dbt是一个具有许多特性的大型工具。要达到在生产环境中轻松使用,可能需要一段时间学习和练习。
参考资料:dbt Tutorial: 7 Must-Know Concepts For Data Engineers | DataCamp
官方文档:About References | dbt Developer Hub (getdbt.com)