DBT Introduction
- the initial architecture was etl - extract, transform, and then load the already transformed data into warehouses
- the rise of cloud data warehouses led to the promotion of elt - blast the raw data into the warehouse, and then use warehouse compute to perform transformations on top of it
- the different roles in a data team -
- data engineers - build and maintain infrastructure, overall pipeline orchestration, integrations for ingesting the data, etc
- analytics engineers - generate cleaned, transformed data for analysis
- data analysts - work with business to understand the requirements. use dashboards, sql, etc to query the transformed data
- dbt sits on top of the cloud warehouses like snowflake / redshift / big query / databricks. it is used for the t of elt by the analytics engineers
- we can manage, test and document transformations from one place
Getting Started
- create a connection. we need to specify -
- the snowflake account url
- the database where the models should be created
- the warehouse to use
- then, we can create a project. we specify
- name of project
- development credentials
- our credentials, used by dbt to connect to snowflake on our behalf
- target schema - each dbt developer should use their own unique schema to be able to work simultaneously
- we then hit test connection for dbt to test the connection, and hit next
- can leverage git to version control the code
- for this, we can either use managed repositories, or one of the supported git providers like github, gitlab, etc
- during the setup, dbt tells us the ips it tries connecting from - our firewall needs to allow these ips for dbt to be able to connect to snowflake
Models
- models are like layer(s) of transformations
- important - they are typically just sql select statements, and the corresponding ddl / dml statements are generated for us automatically behind the scenes
- in the models directory, create a file called customers.sql with the following content -
```sql
with customers as (
    select * from dbt_raw.jaffle_shop.customers
),

customer_orders as (
    select
        user_id,
        min(order_date) as first_order_date,
        max(order_date) as last_order_date,
        count(*) as number_of_orders
    from dbt_raw.jaffle_shop.orders
    group by user_id
)

select
    customers.*,
    customer_orders.first_order_date,
    customer_orders.last_order_date,
    coalesce(customer_orders.number_of_orders, 0) as number_of_orders
from customers
left join customer_orders on customers.id = customer_orders.user_id
```
- important note - this file is using cte (common table expressions), and can be run using snowflake as well
- now, when we run `dbt run`, it creates a view with the name customers in the database / schema we specified when creating the project
- we can see the corresponding ddl for a particular model in the logs
- use the preview tab in the ide to see the actual data in table format
- we see that a view has been created inside the target schema we had specified in the development credentials
- we can configure the way our model is materialized using the following snippet at the top of the sql file for the model -
```sql
(( config(
    materialized='view'
) ))
```
- deleting a model file does not drop the corresponding object from snowflake - so, we might have to drop it manually
Modularity
- breaking things down into separate models allows us to, for e.g., reuse the smaller models across multiple combined models
- models/staging/stg_customers.sql -
```sql
select *
from dbt_raw.jaffle_shop.customers
```
- similarly, we have staging models for orders and payments
- ref macro - allows us to reference staging models in our actual models
- dbt can infer the order to build these models in by creating a dag based on these macros
- models/marts/dim_customers.sql -
```sql
with customers as (
    select * from (( ref('stg_customers') ))
),

order_analytics as (
    select
        user_id as customer_id,
        min(order_date) as first_order_date,
        max(order_date) as last_order_date,
        count(*) as number_of_orders
    from (( ref('stg_orders') ))
    group by customer_id
)

select
    customers.*,
    order_analytics.first_order_date,
    order_analytics.last_order_date,
    coalesce(order_analytics.number_of_orders, 0) as number_of_orders
from customers
left join order_analytics on customers.id = order_analytics.customer_id
```
- now, we might want our staging models to be materialized as views and our facts / dimensions to be materialized as tables
- we can configure this globally in `dbt_project.yml` instead of doing it in every file like we saw here, as follows -

```yaml
name: 'jaffle_shop'

models:
  jaffle_shop:
    marts:
      +materialized: table
    staging:
      +materialized: view
```
- output for our materialization -
- when we go to the compile tab, we see the jinja being replaced with the actual table name
```sql
with stg_customers as (
    select * from dbt_analytics.dbt_sagarwal.stg_customers
),
-- ......
```
- understand how it automatically adds the schema name based on our development credentials. so, the code automatically works for different developers
- remember - the compile tab does not contain the actual ddl / dml, it only contains the sql with the jinja rendered. the actual ddl / dml comes up in the logs
- we can also look at the lineage tab, which shows us the dag (directed acyclic graph). it tells us the order the dbt models should be built in
- the graph is interactive - double click on the node to open the corresponding model file automatically
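to make the dag idea concrete, here is a tiny python sketch (hypothetical model names and a simplified regex, not dbt internals) of how a build order can be inferred from the `ref()` calls in each model's sql -

```python
import re
from graphlib import TopologicalSorter

# miniature of dbt's dependency inference: scan each model's sql for
# ref('...') calls, build a dependency graph, and topologically sort it
models = {
    "stg_customers": "select * from raw.customers",
    "stg_orders": "select * from raw.orders",
    "dim_customers": """
        select * from (( ref('stg_customers') ))
        left join (( ref('stg_orders') )) using (customer_id)
    """,
}

def parse_refs(sql: str) -> set:
    """extract the model names referenced via the ref macro"""
    return set(re.findall(r"ref\('([^']+)'\)", sql))

# TopologicalSorter takes a mapping of node -> set of predecessors
graph = {name: parse_refs(sql) for name, sql in models.items()}
build_order = list(TopologicalSorter(graph).static_order())

print(build_order)  # both staging models come before dim_customers
```

this is only a sketch - real dbt parses the jinja properly instead of regexing it, but the resulting graph is used the same way.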
Sources
- sources - help describe the data loaded during the extract phase by data engineers
- some advantages of using sources over referencing table names directly -
- helps track lineage better when we use source function instead of table names directly
- helps test assumptions on source data
- calculate freshness of source data
- create a file called sources.yml under the models directory
- put the following content inside it -
```yaml
version: 2

sources:
  - name: jaffle_shop
    database: dbt_raw
    schema: jaffle_shop
    tables:
      - name: customers
      - name: orders
```
- finally, change stg_customers.sql / stg_orders.sql as follows -
```sql
select *
from (( source('jaffle_shop', 'customers') ))
```
- the graph starts showing the sources in the lineage as well -
- freshness - we can add freshness checks to our source tables as follows -
```yaml
- name: orders
  loaded_at_field: _etl_loaded_at
  freshness:
    warn_after:
      count: 6
      period: hour
    error_after:
      count: 12
      period: hour
```
- we run the command `dbt source freshness` to check the freshness of our data, and we will automatically get a warning or error based on our configuration - e.g. a warning is raised if the greatest value in `_etl_loaded_at` is more than 6 but less than 12 hours old
- note - the `freshness` and `loaded_at_field` configuration can also be added at the source level i.e. alongside database / schema, to apply to all tables
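the warn / error decision above boils down to comparing the age of the newest row against the two thresholds. a minimal python sketch of that logic (a hypothetical helper, not dbt's implementation) -

```python
from datetime import datetime, timedelta, timezone

# re-implementation of the freshness decision: compare the greatest
# _etl_loaded_at value against the warn_after / error_after windows
def freshness_status(max_loaded_at, now,
                     warn_after=timedelta(hours=6),
                     error_after=timedelta(hours=12)):
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(freshness_status(now - timedelta(hours=2), now))   # pass
print(freshness_status(now - timedelta(hours=7), now))   # warn
print(freshness_status(now - timedelta(hours=13), now))  # error
```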
Tests
- tests in analytics engineering are about making assertions on data
- we write tests along with our development to validate our models
- these tests are then run in production
- we use the command `dbt test` to run all of our tests
- generic tests - applied across different models
- generic tests are of four types - unique, not null, accepted values, relationships
- unique - every value in a column is unique
- not null - no value in a column is null
- accepted values - every value in a column exists in a predefined list
- relationships - every value in a column exists in the column of another table to maintain referential integrity
- we can also use packages / write our own custom generic tests using jinja and macros
- e.g. create a file called models/staging/staging.yml -
```yaml
version: 2

models:
  - name: stg_customers
    columns:
      - name: id
        tests:
          - unique
          - not_null

  - name: stg_orders
    columns:
      - name: user_id
        tests:
          - relationships:
              field: id
              to: ref('stg_customers')
```
- the logs on running dbt test show us what the actual sql for the test looks like. if any rows satisfy the condition, our test fails. e.g. not_null_stg_customers_id uses the following sql -
```sql
select
    count(*) as failures,
    count(*) != 0 as should_warn,
    count(*) != 0 as should_error
from (
    select id
    from dbt_analytics.dbt_sagarwal.stg_customers
    where id is null
) dbt_internal_test
```
- singular tests - custom sql scripts, for assertions that are not otherwise possible using the generic tests that dbt ships with
- in the tests directory, create a file tests/assert_stg_payment_amt_is_positive.sql with the following content (the tests directory is configured in dbt_project.yml) -
```sql
select
    order_id,
    sum(amount) as total_amount
from (( ref('stg_payment') ))
group by order_id
having total_amount < 0
```
- we can also apply tests to our sources directly. it is done in the same way as models - using custom sql scripts in the tests directory for singular tests or using the yml file for generic tests
```yaml
version: 2

sources:
  - name: jaffle_shop
    database: dbt_raw
    schema: jaffle_shop
    tables:
      - name: orders
        columns:
          - name: status
            tests:
              - accepted_values:
                  values: [returned, completed, return_pending, shipped, placed]
```
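the four generic tests are simple assertions at heart. a toy python version of each, run against in-memory rows (made-up sample data - dbt actually compiles these to sql, this just mirrors the semantics) -

```python
# toy versions of the four generic tests
def unique(rows, column):
    values = [r[column] for r in rows]
    return len(values) == len(set(values))

def not_null(rows, column):
    return all(r[column] is not None for r in rows)

def accepted_values(rows, column, values):
    return all(r[column] in values for r in rows)

def relationships(rows, column, to_rows, field):
    # referential integrity: every value must exist in the parent table
    parents = {r[field] for r in to_rows}
    return all(r[column] in parents for r in rows)

customers = [{"id": 1}, {"id": 2}]
orders = [{"user_id": 1, "status": "placed"}, {"user_id": 2, "status": "shipped"}]

assert unique(customers, "id")
assert not_null(customers, "id")
assert accepted_values(orders, "status", ["placed", "shipped", "completed"])
assert relationships(orders, "user_id", customers, "id")
```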
dbt build
- how it works -
  - run `dbt test` on sources
  - run `dbt run` and then `dbt test` on the 1st layer of our models
  - run `dbt run` and then `dbt test` on the 2nd layer of our models
  - and so on…
- this way, our downstream models are never even built if the upstream models fail tests
- understand how doing this manually using `dbt run` and `dbt test` would require configuring multiple commands, and would be very cumbersome
- the skip tab will show us the models which were for e.g. skipped due to failing tests
- it supports all the flags that test, run, snapshot and seed support. when we pass a flag, it is passed down to all the commands that support it. e.g. full refresh will be applied to both models and seeds
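the skip-downstream-on-failure behavior can be sketched in a few lines of python (hypothetical model names and statuses, not dbt internals) -

```python
from graphlib import TopologicalSorter

# model -> set of direct parents
deps = {
    "stg_orders": set(),
    "stg_customers": set(),
    "dim_customers": {"stg_customers", "stg_orders"},
    "orders_report": {"dim_customers"},
}
# pretend a test on stg_customers failed
test_results = {"stg_orders": "pass", "stg_customers": "fail"}

status = {}
for model in TopologicalSorter(deps).static_order():
    # if any parent failed or was skipped, skip this model entirely
    if any(status.get(parent) in ("fail", "skipped") for parent in deps[model]):
        status[model] = "skipped"
    else:
        status[model] = test_results.get(model, "pass")

print(status)  # dim_customers and orders_report end up skipped
```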
Documentation
- we maintain documentation along with code to make maintaining documentation easier
- by using macros like `source` and `ref`, our dags are generated
- along with this, we can document things like 'what does this model mean', 'what does this field mean', etc
- we can add descriptions to our models / columns like so. we continue doing this in the same file we added tests to i.e. models/staging/staging.yml -
```yaml
models:
  - name: stg_customers
    description: one unique customer per row
    columns:
      - name: id
        description: primary key
```
- sometimes, we might want more extensive documentation, which is not possible using just yaml. we can do so using doc blocks
- create a file models/staging/documentation.md with the following content -
```md
(% docs order_status %)

status can take one of the following values -

| status         | definition                                       |
|----------------|--------------------------------------------------|
| placed         | Order placed, not yet shipped                    |
| shipped        | Order has been shipped, not yet been delivered   |
| completed      | Order has been received by customers             |
| return pending | Customer indicated they want to return this item |
| returned       | Item has been returned                           |

(% enddocs %)
```
- then, reference it inside staging.yml as follows -
```yaml
- name: status
  description: "(( doc('order_status') ))"
```
- just like we documented models, we can also document our sources
- to generate the documentation, we use `dbt docs generate`
- by looking at our dag / lineage graph, we can guess how many concurrent threads we can utilize at a time
Deployment
- by default, a development environment is already created for us
- go to deploy -> environments and create a new environment
- environment type can be development or deployment
- we select deployment here, since a development environment is already there, and only one development environment is allowed
- then, we select a deployment type - typically production
- just like we saw in getting started, we need to set the connection, wherein we specify snowflake account url, database, warehouse
- we enter deployment credentials instead of development credentials like in getting started
- the right way would be to create and use a service user’s credentials instead of our own credentials here
- also, the models now are created in a dedicated production schema which only this service user has access to, not a user specific target schema
- after configuring all this, we can create our environment
- then, we can create a job of type deploy job for this environment
- here, we can enter a schedule for this job
- we can also configure the set of commands that we would want to run on that cadence
- by default, `dbt build` is present, and we can check options for running freshness checks on sources and generating docs
- after configuring all this, we can create our job
- we can trigger this job manually by hitting run now whenever we want, apart from relying on the schedule to trigger it
- dbt cloud easily lets us access our documentation, shows us a breakdown of how much time the models took to build, etc
Setup Folder Structure
- by maturity
- two folders are created - staging and marts
- staging - has models that represent the source data itself. perform some basic cleaning. prefixed with `stg_`
- marts - all the business logic is here. combines staging models. prefixed with `fact_` or `dim_`
- optionally, we can have an intermediate folder inside marts for maintaining models between staging and marts i.e. models which are not used by / exposed to consumers directly. can be prefixed with `int_`
- by domain
- imagine we have three teams - finance, sales and marketing
- we create three folders representing each of the teams in this case
DBT Explorer Features
- feels like a superset of features available in documentation
- models tab - shows us the status of models - success, error, skipped. it will also show us the last executed date of the model. we can click on a model to inspect its performance, the sql code for it, its lineage, etc
- sources tab - freshness of sources etc
- tests tab - status of test (pass or fail), what model and column it ran on, etc
- performance tab - which models took the longest time to run, which models were executed the most, which models fail the most
- recommendations - gives suggestions around models with missing documentation, models with missing test coverage, documenting primary keys (unique and not null), etc. it also categorizes them as high, medium, etc
- in the lineage graph, we can type the select syntax -
  - `stg_customers` - highlights the `stg_customers` model
  - `+stg_customers` - highlights `stg_customers` along with all of its upstream dependencies
  - `stg_customers+` - highlights `stg_customers` along with all of its downstream dependencies
- after entering our select statement, we hit the update graph button / hit enter key to show only nodes related to our query
- union - we can extract dags for all models combined by separating using a space - `+stg_customers+ +stg_orders+`
- intersection - we can extract the shared upstream between all models by separating using a comma - `+stg_customers+,+stg_orders+`
- to get only the sources of a model, we can use `+dim_customers,resource_type:source`
- unlike the documentation, dbt explorer also supports column level lineage. this way, we can easily view the affected downstream models when we for e.g. change a column name
- we can also see the lineage of our projects. we can see the parent and child project lineage, how many models a project has, the public models exposed by the projects, etc
- explorer supports advanced searching - e.g. if we just type `order_id` in the search bar, it will automatically give us all the models, sources, etc with that column
- the search also supports a lot of filtering - e.g. we can filter out only tests, sources, etc. if we select models, we can also filter by modelling layer (staging, marts, intermediate), materialization type (table, view, incremental)
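the `+model` / `model+` selector semantics are easy to make concrete. a toy python implementation over a small hypothetical dag (dbt's real selector grammar is richer - this only handles a leading or trailing `+`) -

```python
# model -> set of direct parents
deps = {
    "raw_orders": set(),
    "stg_orders": {"raw_orders"},
    "dim_customers": {"stg_orders"},
    "orders_report": {"dim_customers"},
}

def upstream(node):
    """the node plus everything it depends on, transitively"""
    result = {node}
    for parent in deps[node]:
        result |= upstream(parent)
    return result

def downstream(node):
    """the node plus everything built from it, transitively"""
    result = {node}
    for child, parents in deps.items():
        if node in parents:
            result |= downstream(child)
    return result

def select(selector):
    if selector.startswith("+"):
        return upstream(selector[1:])
    if selector.endswith("+"):
        return downstream(selector[:-1])
    return {selector}

print(select("+dim_customers"))  # dim_customers plus all of its ancestors
print(select("stg_orders+"))     # stg_orders plus everything downstream
```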
DBT Mesh
- a technique for breaking down large dbt projects into multiple projects. some advantages -
- helps collaborate efficiently - e.g. avoid conflicts due to analytics engineers working on different features
- reduce costs by for e.g. building small parts at a time instead of the entire stack
Model Contracts
- if someone makes breaking changes to an upstream model, it will break the downstream models and in turn the reporting
- we can avoid this using model contracts
- using model contracts, we can ensure - column names, data type of each column, etc
- we need to set `contract.enforced` to true as shown below
- then, we can list the column names and their data types
- finally, we can list the constraints like not null

```yaml
models:
  - name: trilogy
    config:
      contract:
        enforced: true
    columns:
      - name: number_in_trilogy
        data_type: float
        constraints:
          - type: not_null
```
- note - this is specific to the data warehouse we use, e.g. different data warehouses will have different data types
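conceptually, contract enforcement is a pre-materialization check of the model's actual columns against the declared ones. a small python sketch of that idea (hypothetical data structures, not dbt's implementation) -

```python
# contract: declared column name -> declared data type
contract = {"number_in_trilogy": "float"}

def check_contract(columns: dict, contract: dict) -> list:
    """return a list of human-readable violations (empty list = contract ok)"""
    violations = []
    for name, expected_type in contract.items():
        if name not in columns:
            violations.append(f"missing column: {name}")
        elif columns[name] != expected_type:
            violations.append(f"{name}: expected {expected_type}, got {columns[name]}")
    return violations

print(check_contract({"number_in_trilogy": "float"}, contract))    # [] - ok
print(check_contract({"number_in_trilogy": "varchar"}, contract))  # type mismatch reported
```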
Versioning
- sometimes, breaking changes are unavoidable
- we can collaborate changes in model contracts easily using versioning
- we can now test pre release versions in production - basically, the old version is left untouched, and a new version is created
- we can offer a "migration window" for downstream users before making the change official (e.g. changing the `latest_version` property to the latest version)
- create a new file, say fct_orders_v2.sql with the new updated sql
- we rename `order_total` to `order_amount`
- we change the data type of `location_opened_at` to date
- now, we modify the corresponding configuration for our model as follows -
```yaml
models:
  - name: fct_orders
    config:
      contract:
        enforced: true
    columns:
      - name: order_total
        data_type: number
      - name: location_opened_at
        data_type: timestamp_ntz
    versions:
      - v: 1
      - v: 2
        columns:
          - include: all
            except: [order_total]
          - name: order_amount
            data_type: number
          - name: location_opened_at
            data_type: date
```
- note - we do not enter anything for v1 because our actual definition is for v1. so, we enter our changes in v2
- by default, these will be materialized under `fct_orders_v1` and `fct_orders_v2`. to ensure that for e.g. v1 is materialized under `fct_orders` itself, we can do the following -

```yaml
- v: 1
  config:
    alias: fct_orders
```
- downstream users can reference a specific version using `(( ref('fct_orders', v=1) ))`
- if downstream users do not specify a version i.e. they use `(( ref('fct_orders') ))`, the latest version will be used by default. to change this behavior, we can configure what version should be used by default using `latest_version` -

```yaml
models:
  - name: fct_orders
    latest_version: 1
```
- now, this becomes a pre release of sorts. once we are sure that our changes are stable, we can remove this line / change its value to 2
- when we run `dbt run --select fct_orders`, all versions are run
- to run a specific version, use `dbt run --select fct_orders_v2`
- to run the latest version, use `dbt run --select fct_orders,version:latest`
Model Group and Model Access Modifiers
- model groups - set of models related to one another. e.g. owned by a specific team / business use case. e.g. employee data
- each model group will have an owner
- e.g. groups.yml file -

```yaml
groups:
  - name: finance
    owner:
      name: Firstname Lastname
      email: finance@jaffleshop.com
      slack: finance-data
      github: finance-data-team
```
- each model can only belong to one model group
- model access modifiers - how models outside a group can access (reference) models in a group
- public - can be accessed by models in any group or project. e.g. staging models can be used across different projects to help with collaboration
- protected - can be accessed by models in same group or project. this is the default
- private - can only be accessed by models in same group
- we can edit the configuration for our models to specify the group they belong to and their access modifier as follows -

```yaml
models:
  - name: finance_model
    access: private
    group: finance
```
Cross Project Reference
- check 1 - ensure that models in the upstream project are public
- check 2 - there should be at least one successful run in the production environment before we can start importing across projects
- create a file called dependencies.yml -

```yaml
projects:
  - name: core_platform
```
- then, we can reference the models in these projects using the cross project reference macro, which looks a lot like the usual reference macro
```sql
(( ref('project_name', 'model_name') ))
```
- explorer can show us how different downstream projects reference a public model
- random thought - this feature is only available in enterprise dbt cloud. if using dbt core, the workaround would be to treat models in the upstream project as sources in the downstream project. this way, i guess we lose out on capabilities like versioning, lineage across projects, etc
Cross Project Orchestration
- we would want our downstream projects to build automatically after the upstream projects finish building
- we do not have to rely on brittle time based dependency now
- also by using dbt mesh, we are now breaking big long running jobs into multiple short jobs
- while creating a job, instead of running it on a schedule, we can configure it to run on job completion i.e. run when another job finishes
- here, we can select the upstream project, the upstream job, and the statuses of the upstream job on which this job should be triggered - on success, on success and on error, all statuses, etc. here, since we depend on the models being built by the upstream project, we check on success
Analyses and Seeds
- analyses - sql files in the analyses folder in dbt project
- they are not models or tests, so they do not get run - when we for e.g. run `dbt run` or `dbt compile`, we see the compiled output inside the target folder, but unlike models they do not get materialized
- they support jinja, and we can see the actual sql code they compile down to
- use cases - store one off queries which can be run manually, e.g. creating users etc
- seeds - csv files that live in seeds folder in dbt project
- purpose - take a short amount of data, and manifest that in the data warehouse as a table
- `dbt seed` can materialize these as tables in our data warehouse
- we can then reference them using the same `ref` function we use to reference models
- use case - static data, e.g. mapping country codes to actual names of the countries
- finally, we can add descriptions and tests to seeds just like our regular models
Jinja
- python based templating language
- e.g. when we use the `ref` function, it swaps it with the table name prefixed with the target schema based on development credentials
- everything inside curly braces is jinja, and the rest of it is regular sql. note - curly braces are replaced with round braces throughout these notes, because they were not showing up otherwise
```sql
(% for i in range(5) %)
select (( i )) as number
(% if not loop.last %) union all (% endif %)
(% endfor %)
```
- this results in a table containing one column number, with values 0-4
- we see two basic forms of jinja syntax above -
  - `(% %)` - an operation is happening inside the jinja context. it is invisible to the end user after the code is compiled
  - `(( ))` - pulling a value out of the jinja context and actually outputting it
- we use the if condition so that union all is not added after the last select. this is what the compile tab shows -
```sql
select 0 as number union all
select 1 as number union all
select 2 as number union all
select 3 as number union all
select 4 as number
```
- we can use set to set a variable - the compiled output just has my name
```sql
(% set my_name = 'shameek' %)
(% set my_surname = 'agarwal' %)
(( my_name )) (( my_surname )) -- shameek agarwal
```
- we might see a lot of blank lines in our compiled output, because the jinja percentage blocks are not part of the compiled output
- to prevent this, we can use `(%- -%)` instead of `(% %)`
- commenting is done using `(# #)` i believe. note - beware of using `--` comments in sql. i was stuck on this for a while. basically, the sql comment does not work for jinja i.e. if we comment using `--`, while our ide will show it as a comment because it is an sql file, commenting jinja using it will have no effect and the jinja code would still be executed
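the loop / if logic from the template above can be replicated in plain python to show exactly what the compiled sql ends up as (a stand-in for the jinja engine, not jinja itself) -

```python
# mirror of the template: one select per number, with "union all" after
# every line except the last (the (% if not loop.last %) check)
lines = []
for i in range(5):
    line = f"select {i} as number"
    if i != 4:  # equivalent of "not loop.last"
        line += " union all"
    lines.append(line)

compiled = "\n".join(lines)
print(compiled)
```

without the loop.last check, a dangling `union all` would be emitted after the final select, producing invalid sql.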
Macros
- macros - help us write generic logic that is then reusable across the project
- they are actually a feature of jinja - so we can find relevant documentation in both jinja and dbt
- in the macros directory, create a file called cents_to_dollars.sql
```sql
(% macro cents_to_dollars() %)
    amount / 100
(% endmacro %)
```
- now, we can use it in our model as follows -
```sql
select
    id as payment_id,
    orderid as order_id,
    paymentmethod as payment_method,
    status,
    -- amount is stored in cents, convert it to dollars
    (( cents_to_dollars() )) as amount,
    created as created_at
from (( source('stripe', 'payment') ))
```
- if we hit compile, we can see the sql being compiled down to `amount / 100 as amount,` correctly
- however, this relies on the fact that our column was named `amount`. we can use arguments to make the column name configurable
- apart from that, lets say we want to allow rounding as well, with the precision argument defaulting to 2
- we change the macro to the following -
```sql
(% macro cents_to_dollars(column_name, precision=2) %)
    round((( column_name )) / 100, (( precision )))
(% endmacro %)
```
- now, we can use the macro as follows -
```sql
(( cents_to_dollars('amount') )) as amount,
(( cents_to_dollars('amount', 3) )) as precise_amount
```
- the compiled sql looks as follows -

```sql
round(amount / 100, 2) as amount,
round(amount / 100, 3) as precise_amount
```
- now, worth thinking about - macros in this case might be reducing readability as it requires jumping around. it is a trade off between dry vs readability
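since a macro is ultimately just text substitution, the expansion above can be modeled as a python function that returns a sql expression string (a stand-in for the jinja macro, same default-argument behavior) -

```python
# python stand-in for the cents_to_dollars macro: the macro does not
# compute anything, it just produces a sql expression as text
def cents_to_dollars(column_name: str, precision: int = 2) -> str:
    return f"round({column_name} / 100, {precision})"

print(cents_to_dollars("amount"))      # round(amount / 100, 2)
print(cents_to_dollars("amount", 3))   # round(amount / 100, 3)
```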
Packages
- packages - import models and macros into our dbt projects
- model package example - someone has already modelled facebook ads, stripe data, etc, and we can directly start using it
- macro package example - dbt utils. e.g. it has the date spine macro, which can create a long table of dates with whatever difference we configure between each of the dates
- dbt hub has all the packages. we can import from here or from github directly
- we add dependencies to the packages.yml file at the root. clicking on the links on dbt hub will give us the snippet with package name and version to copy to this file -

```yaml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.3.0
```
- we add a github package as follows - `package` / `version` change to `git` / `revision` -

```yaml
packages:
  - git: "git@github.com:dbt-labs/dbt-utils.git"
    revision: master
```
- finally, we can also use local packages i.e. paths relative to our dbt_project.yml file, like so -

```yaml
packages:
  - local: sub_project
```
- we run `dbt deps` to clone and install all the dependencies required for running our project
- e.g. of using the `date_spine` macro -

```sql
(( dbt_utils.date_spine(
    datepart="day",
    start_date="to_date('01/01/2023', 'mm/dd/yyyy')",
    end_date="dateadd(week, 1, current_date)"
) ))
```
- we can look at the complex compiled sql result, view the actual logic in the package under macros, etc
- another macro example - `generate_surrogate_key` (called `surrogate_key` before dbt_utils 1.0) can accept different columns and spit out a unique hash of these columns. when we have a composite primary key, we can use this to get a unique value per row. we can place the generic test of `unique` on this column -

```sql
select
    (( dbt_utils.generate_surrogate_key(['customer_id', 'order_date']) )) as id,
    customer_id,
    order_date,
    count(*)
from (( ref('stg_orders') ))
group by 1, 2, 3
```
- packages like dbt utils work across warehouses - we could be using this in bigquery or in snowflake, and it would spit out the right sql for us automatically
- till now, we saw how we can use macros from packages, but we can also use models from packages
- e.g. the below package gives us models which can help us understand spending in snowflake -

```yaml
packages:
  - package: gitlabhq/snowflake_spend
    version: 1.3.0
```
- setup instructions can be found in the package's documentation, e.g. we need to seed it with effective rates (per credit price, which depends on our choice of cloud provider, region, etc)
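what `date_spine` produces - one row per interval between two dates - is easy to sketch in python (a conceptual stand-in, not the macro's sql) -

```python
from datetime import date, timedelta

# one row per day from start (inclusive) to end (exclusive),
# mirroring what date_spine generates as a sql table
def date_spine(start: date, end: date) -> list:
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days)]

spine = date_spine(date(2023, 1, 1), date(2023, 1, 8))
print(len(spine))             # 7 rows, one per day
print(spine[0], spine[-1])    # 2023-01-01 2023-01-07
```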
Jinja, Macros and Packages Examples
Example 1
- the below example helps us grant permissions to the tables
- note - the `set` block helps us set the contents of the block to a variable. jinja calls it block assignment i believe
- we can execute jinja code using `do`
- we can run some sql using `run_query`. we can find the available macros in the dbt jinja functions documentation
- the `target` variable contains information about things like warehouse, database, etc in the form of a dictionary
- these are defined / derived from the settings we specify under the (development / deployment) environment, credentials, connection, etc
- to access its values, i entered `(( target ))` in a file and compiled it. the compilation output gave me this -

```json
{
  "account": "tp279l2.ap-southeast-1",
  "user": "shameek",
  "database": "analytics",
  "warehouse": "transforming",
  "role": "accountadmin",
  "schema": "dbt_sagarwal",
  # .... more stuff
```
- the schema here for e.g. is the user specific schema we specify as the target schema in development credentials
- we use this as the default arguments for our macro
- the other function we use is `log`. by setting `info` to true, we see the log in both the summary and the details, otherwise we only see it under details
- create a file macros/grant_select.sql with the following content -
```sql
(% macro grant_select(schema=target.schema, role=target.role) %)

(% set query %)
    grant usage on schema (( schema )) to role (( role ));
    grant select on all tables in schema (( schema )) to role (( role ));
    grant select on all views in schema (( schema )) to role (( role ));
(% endset %)

(( log("granting view access on " ~ schema ~ " to role " ~ role, info=True) ))
(% do run_query(query) %)

(% endmacro %)
```
- we run models using `dbt run`. however in this case, we want to run a specific macro and not a model. we can do this using `dbt run-operation grant_select`
- we can pass in variables to the macro using args, which is like a dictionary - `dbt run-operation grant_select --args '{ "role": "xyz" }'`
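the macro's behavior - defaults pulled from `target`, overridable per call - can be mirrored in plain python to show what sql it renders (the target values are the made-up ones from the compilation output above) -

```python
# stand-in for the target variable dbt injects (values are made up)
target = {"schema": "dbt_sagarwal", "role": "accountadmin"}

def grant_select(schema=None, role=None):
    """render the grant statements, defaulting to target-derived values"""
    schema = schema or target["schema"]
    role = role or target["role"]
    return [
        f"grant usage on schema {schema} to role {role};",
        f"grant select on all tables in schema {schema} to role {role};",
        f"grant select on all views in schema {schema} to role {role};",
    ]

for stmt in grant_select(role="xyz"):  # like passing --args '{ "role": "xyz" }'
    print(stmt)
```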
Example 2
- using the results of a query, we would like to build out our sql dynamically
- compilation is done in two phases. execute is false the first time around, and true the second time. basically, no sql is executed the first time, but it is executed the second time. during the first pass, only building of dags etc happen
- so, this might give us issues like null pointer exception for the second line below, because results is None -
```sql
(% set results = run_query(query) %)
(% set values = results.columns[0].values() %)
```
- so, we do something like this instead -
```sql
(% set results = run_query(query) %)

(% if execute %)
    (% set values = results.columns[0].values() %)
(% else %)
    (% set values = [] %)
(% endif %)
```
- we get back a table object from `run_query`, and we access the values of the right column from it to set our `statuses` variable
variable - now, assume we have a table like this - stg_orders(order_id, customer_id, order_date, status)
- we want a row for every customer, and a column for every status. the column will tell us how many orders with that particular status a customer has
- we can construct the model as follows -
```sql
(% set query %)
    select distinct status from (( ref('stg_orders') ))
(% endset %)

(% if execute %)
    (% set statuses = run_query(query).columns[0].values() %)
(% else %)
    (% set statuses = [] %)
(% endif %)

(( log("calculated statuses = " ~ statuses, info=True) ))

select
    customer_id,
    (%- for status in statuses %)
    sum(case when status = '(( status ))' then 1 else 0 end) (( status ))
    (%- if not loop.last %),(% endif -%)
    (% endfor %)
from (( ref('stg_orders') ))
group by customer_id
```
- the compiled code looks as follows -
```sql
select
    customer_id,
    sum(case when status = 'returned' then 1 else 0 end) returned,
    sum(case when status = 'completed' then 1 else 0 end) completed,
    sum(case when status = 'return_pending' then 1 else 0 end) return_pending,
    sum(case when status = 'shipped' then 1 else 0 end) shipped,
    sum(case when status = 'placed' then 1 else 0 end) placed
from analytics.dbt_sagarwal.stg_orders
group by customer_id
```
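the dynamic sql-building step - turning the list of statuses returned by `run_query` into pivot columns - can be replicated in python (hypothetical statuses list; this mirrors the jinja loop, including the no-trailing-comma handling) -

```python
# pretend these are the distinct values run_query returned
statuses = ["returned", "completed", "shipped", "placed"]

# one pivot column per status, comma-joined so there is no trailing comma
columns = ",\n    ".join(
    f"sum(case when status = '{s}' then 1 else 0 end) {s}" for s in statuses
)
sql = (
    "select\n    customer_id,\n    " + columns +
    "\nfrom stg_orders\ngroup by customer_id"
)
print(sql)
```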
Unit Testing
- test driven development - we write tests before the code, which forces us to think about edge cases upfront
- regression testing - helps catch breaking changes introduced by later modifications
- we use unit testing during test driven development, wherein we specify the set of inputs and expected outputs for a given piece of code
- these tests should run as a part of continuous integration to catch breaking changes early
- unit tests are run before the node is built, constraint tests are run after the node is built but before it is materialized, data tests are run after the model is materialized
- unit tests test our complex sql logic, constraint tests check shape of data, while data tests check content of data
- we also save on resources and money, since these tests are run before the models are materialized with millions of rows
- we can configure unit tests using yaml. we can either specify inputs and outputs in yaml directly, or we can leverage csvs for this to keep the yaml smaller
- in the inputs and outputs, we only need to mention the columns of interest
- assume we remap our status column in the following way -
```sql
select
  id as order_id,
  user_id as customer_id,
  order_date,
  (
    case
      -- check '%return%' first so return_pending maps to returned, not placed
      when status like '%return%' then 'returned'
      when status like '%pending%' then 'placed'
      else status
    end
  ) status
from {{ source('jaffle_shop', 'orders') }}
```
- in the `given` section, we specify the data for every input used by our model, e.g. only one source is used in this case -
```yaml
unit_tests:
  - name: stg_orders_status_cleanup
    model: stg_orders
    given:
      - input: source('jaffle_shop', 'orders')
        rows:
          - { id: 1, status: returned }
          - { id: 2, status: return_pending }
          - { id: 3, status: completed }
          - { id: 4, status: pending }
          - { id: 5, status: shipped }
          - { id: 6, status: placed }
    expect:
      rows:
        - { order_id: 1, status: returned }
        - { order_id: 2, status: returned }
        - { order_id: 3, status: completed }
        - { order_id: 4, status: placed }
        - { order_id: 5, status: shipped }
        - { order_id: 6, status: placed }
```
- we can run just the unit tests using `dbt test --select test_type:unit`
- otherwise, we can rely on our regular old `dbt build`
- let us say our logic involves the current timestamp -
```sql
datediff('days', order_date, {{ dbt.current_timestamp() }}) days_since
```
- we cannot unit test on this easily, because current timestamp will change everyday, and hence our assertion will have to change
- so, we override the value returned by the macro by hardcoding it instead -
```yaml
unit_tests:
  - name: stg_orders_days_since_calc
    model: stg_orders
    overrides:
      macros:
        dbt.current_timestamp: date('2019-01-01')
    given:
      - input: source('jaffle_shop', 'orders')
        rows:
          - { order_date: 2018-01-01 }
    expect:
      rows:
        - { order_date: 2018-01-01, days_since: 365 }
```
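the expected `days_since` of 365 is just fixed-date arithmetic; pinning the macro is what makes it deterministic:

```python
from datetime import date

# With dbt.current_timestamp overridden to 2019-01-01, the datediff for an
# order placed on 2018-01-01 is fixed (2018 is not a leap year).
days_since = (date(2019, 1, 1) - date(2018, 1, 1)).days
print(days_since)  # 365
```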
- just like macros, we can override variables and environment variables as well
- a good practice might be to avoid running unit tests in the production environment - they only need to run in development / continuous integration environments. we can do this by setting the variable `DBT_EXCLUDE_RESOURCE_TYPE` to `unit_test`
Webhooks
- webhooks vs polling - webhook is push based i.e. data is pushed to the receiving app. polling is pull based i.e. the receiving app asks for the data from the source
- webhooks are more immediate and less resource intensive than polling
- if using webhooks, we have two kinds of webhook - outgoing webhook to send the data, incoming webhook to receive the data
- dbt currently only supports outgoing webhooks for when jobs are started / completed (with job status)
- e.g. use case - create pager duty alarms when dbt jobs fail
- go to account settings -> webhooks -> create webhook. here, enter the -
- webhook name and description
- events to trigger this webhook for - run started, run completed (any status), run completed (errored)
- jobs - trigger this webhook for all jobs or some specific job(s)
- finally, we enter the endpoint that dbt should call
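dbt signs outgoing webhook payloads with a shared secret so the receiver can verify authenticity - per dbt's docs this is an hmac-sha256 of the raw request body (check the docs for the exact header name). a minimal receiver-side check might look like this; the secret and payload below are made up:

```python
import hashlib
import hmac

def is_valid_signature(raw_body: bytes, secret: str, received_sig: str) -> bool:
    # Recompute the HMAC-SHA256 hex digest of the raw request body and
    # compare it (timing-safely) with the signature sent in the header.
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, received_sig)

# Hypothetical payload and secret:
body = b'{"eventType": "job.run.completed"}'
secret = "my-webhook-secret"
sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
print(is_valid_signature(body, secret, sig))  # True
```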
Materialization
- materialization - how dbt builds the models. remember - we just write select statements, and dbt generates the right ddl / dml on top of that for us
- five types of materialization
- can be configured inside dbt_project.yml to configure at a model level / inside the individual model itself using a config block at the top
- table - actual table is built
- views - we would be rerunning the query every time
- ephemeral - take our select statements and use it as a cte in the downstream models. so, nothing exists in our database. hard to debug as we cannot query or inspect them directly
- small note - if we change the materialization of a model, e.g. from table to view, the table is dropped and a view is created in its place, because dbt generates statements like `create or replace`. however, if we go e.g. from table to ephemeral, the old table stays behind
- now, we discuss incremental and snapshot
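the two configuration styles mentioned above, sketched with placeholder project / directory names:

```yaml
# dbt_project.yml - directory-level default for everything under models/staging
models:
  my_project:
    staging:
      +materialized: view
```

alternatively, an individual model can override this with a config block at the top of its sql file, e.g. `{{ config(materialized='table') }}`.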
Incremental
- we end up reprocessing years worth of data (which should not ideally change) when using materialization of type table
- so, the idea is that we just add the new records and keep the old table as is, because historical data typically cannot / should not change
- we save on processing time, resources and money this way
- start with view. if querying it takes too long, switch to table. if building it takes too long, switch to incremental
```jinja
{{ config(materialized = 'incremental') }}
```
- we have the source table, and an already built table which we want to modify
- we use the way below to identify the new data in our source. note - this should only apply to subsequent runs. for the first run, this filtering should not happen
```jinja
with events as (
    select * from {{ source('snowplow', 'events') }}
    {% if is_incremental() %}
    where t_stamp >= (select max(t_stamp) from {{ this }})
    {% endif %}
)
```
- notice the use of `{{ this }}` to reference the already existing, already built model. we cannot use `ref` on the model inside itself, since that would create a cyclic dependency
- the first time around, dbt runs the usual `create or replace table`. on subsequent runs though, it runs `create or replace temporary table` and inserts records from this temporary table into the model using `insert into`
- when we add the `--full-refresh` flag, dbt builds from scratch using the `create or replace table` command, like the first time around
- issues - what if data showed up in our warehouse late? e.g. we always use the latest timestamp as the cutoff, but events can appear out of order - we have already processed events up to 11.30, yet in the next incremental run we receive an event which happened at 10.30
- so, we can change the sql to following -
```jinja
{% if is_incremental() %}
where t_stamp >= (select dateadd('day', -3, max(t_stamp)) from {{ this }})
{% endif %}
```
- we move the cutoff to three days back
- now, the current issue - we can end up with duplicate records. solution - `unique_key`. it helps update the existing records -
```jinja
{{ config(
    materialized = 'incremental',
    unique_key = 'page_view_id'
) }}
```
- with this, the command that was doing `insert into` from the temporary table to the model now changes to `merge into`
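what `merge into` with a `unique_key` does can be pictured in plain python (a sketch of the semantics, not dbt internals):

```python
# A merge keyed on unique_key: rows whose key already exists are updated
# in place, unseen keys are inserted, everything else is left untouched.
def merge_into(table: dict, batch: list, unique_key: str) -> None:
    for row in batch:
        table[row[unique_key]] = row

table = {1: {"page_view_id": 1, "status": "placed"}}
batch = [
    {"page_view_id": 1, "status": "shipped"},  # duplicate key -> updated
    {"page_view_id": 2, "status": "placed"},   # new key -> inserted
]
merge_into(table, batch, "page_view_id")
print(table)
```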
- note - an advantage of dbt - it automatically used `merge into`, since snowflake supports it. different warehouses have different ways of achieving this, which dbt handles for us automatically
- how to decide on three days - set the cutoff based on the two parameters below -
- ask the business for its tolerance for correctness
- analyze how late, say, the 99th percentile of data can arrive
- prioritizing correctness can negate performance gains
- secondly, remember we can always run a `--full-refresh` on, say, a weekly basis
- another issue - assume our unique key has records in both - the current table and the 3-day window currently being processed
- we end up overwriting the old data using our new batch
- my understanding of one potential fix - we change our filter. instead of filtering based on time, filter in this way i.e. retrieve all records having page view id in the batch being currently processed -
```jinja
{% if is_incremental() %}
where page_view_id in (
    select distinct page_view_id
    from {{ source('snowplow', 'events') }}
    where t_stamp >= (select dateadd('day', -3, max(t_stamp)) from {{ this }})
)
{% endif %}
```
- issue - this subquery for finding all the distinct page view ids can become very slow
- we should avoid such fixes if we have truly huge amounts of data, as these optimizations can become very slow
- good candidates for incremental models - immutable events, like page views on our browser
- if they are mutable - we need fields like updated at to be able to filter easily
Snapshot
- think slowly changing dimensions (type 2)
- we run them using `dbt snapshot`, as they run separately from our models
- we now get four new columns -
- `dbt_scd_id` - a totally unique identifier
- `dbt_updated_at` - a mirror of the `updated_at` field
- `dbt_valid_from` and `dbt_valid_to` fields
- we use a target database and schema different from the ones in our profiles / environments. this is because snapshots should live independently of our models - unlike models, they cannot be reproduced if lost, since they capture historical source data we otherwise have no access to
- just like incremental models, we need to specify the unique key
- we can specify a strategy and the field for this strategy, which is timestamp and updated at in our case
```jinja
{% snapshot snap_products %}

{{ config(
    target_database = 'xyz',
    target_schema = 'pqr',
    unique_key = 'id',
    strategy = 'timestamp',
    updated_at = 'updated_at'
) }}

select * from {{ source('abc', 'def') }}

{% endsnapshot %}
```
- we pretend that this snapshot is actually available in the source itself, and use it like that
- sometimes however, we might have these snapshots at the end of our reported data, to view the changes easily
- nothing changes as such - we select from `ref` instead of `source`, that's all
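since dbt leaves `dbt_valid_to` as null for the current version of each row, the latest state of a snapshot like `snap_products` above can be read with:

```sql
-- current version of every row in the snapshot
select *
from {{ ref('snap_products') }}
where dbt_valid_to is null
```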