
DBT

DBT Introduction

  • the initial architecture was etl - transform the data first, and then load the transformed data into the warehouse
  • the recent rise of cloud data warehouses led to the promotion of elt - blast the raw data into the warehouse, and then use warehouse compute to perform transformations on top of it
  • the different roles in a data team -
    • data engineers - build and maintain infrastructure, overall pipeline orchestration, integrations for ingesting the data, etc
    • analytics engineers - generate cleaned, transformed data for analysis
    • data analysts - work with business to understand the requirements. use dashboards, sql, etc to query the transformed data
  • dbt sits on top of the cloud warehouses like snowflake / redshift / big query / databricks. it is used for the t of elt by the analytics engineers
  • we can manage, test and document transformations from one place

Getting Started

  • create a connection. we need to specify -
    • the snowflake account url
    • the database where the models should be created
    • the warehouse to use

  • then, we can create a project. we specify
    • name of project
    • development credentials
      • our credentials, used by dbt to connect to snowflake on our behalf
      • target schema - each dbt developer should use their own unique schema to be able to work simultaneously

  • we then hit test connection for dbt to test the connection, and hit next
  • can leverage git to version control the code
  • for this, we can either use managed repositories, or one of the supported git providers like github, gitlab, etc
  • during the setup, dbt shows the ips it connects from - our firewall needs to allow these ips for dbt to be able to connect to snowflake

Models

  • models are like layer(s) of transformations
  • important - they are typically just sql select statements, and the corresponding ddl / dml statements are generated for us automatically behind the scenes
  • in the models directory, create a file called customers.sql with the following content -
    with customers as (
        select * from dbt_raw.jaffle_shop.customers
    ),
     
    customer_orders as (
        select
            user_id,
            min(order_date) as first_order_date,
            max(order_date) as last_order_date,
        count(*) as number_of_orders
        from
            dbt_raw.jaffle_shop.orders
        group by
            user_id
    )
     
    select
        customers.*,
        customer_orders.first_order_date,
        customer_orders.last_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders
    from
        customers
        left join customer_orders on customers.id = customer_orders.user_id
    
  • important note - this file uses ctes (common table expressions), and the query can be run directly in snowflake as well
  • now, when we run the command dbt run, it creates a view with the name customers in the database / schema we specify when creating the project
  • we can see the corresponding ddl for a particular model in the logs
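  • as a rough sketch (the exact statement depends on the warehouse - this assumes snowflake and the database / target schema from our development credentials), the generated ddl for a simple model that just selects from the raw table would look something like -
    create or replace view dbt_analytics.dbt_sagarwal.customers as (
        select * from dbt_raw.jaffle_shop.customers
    );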
  • use the preview tab in the ide to see the actual data in table format
  • we see that a view has been created inside the target schema we had specified in the development credentials
  • we can configure the way our model is materialized using the following snippet at the top of the sql file for the model -
    ((
      config(
        materialized='view'
      )
    ))
    
  • when we delete models, it does not delete them from snowflake. so, we might have to delete them manually

Modularity

  • breaking things down into separate models. it allows us to, for example, reuse the smaller models in multiple combined models
  • models/staging/stg_customers.sql -
    select
        *
    from
        dbt_raw.jaffle_shop.customers
    
  • similarly, we have staging models for orders and payments
  • ref macro - allows us to reference staging models in our actual models
  • dbt can infer the order to build these models in by creating a dag based on these macros
  • models/marts/dim_customers.sql -
    with customers as (
        select * from (( ref('stg_customers') ))
    ),
      
    order_analytics as (
        select
            user_id customer_id,
            min(order_date) first_order_date,
            max(order_date) last_order_date,
        count(*) number_of_orders
        from
            (( ref('stg_orders') ))
        group by
            customer_id
    )
      
    select
        customers.*,
        order_analytics.first_order_date,
        order_analytics.last_order_date,
        coalesce(order_analytics.number_of_orders, 0) as number_of_orders
    from
        customers
        left join order_analytics on customers.id = order_analytics.customer_id
    
  • now, we might want our staging models to be materialized as views and our facts / dimensions to be materialized as tables
  • we can configure this globally in dbt_project.yml instead of doing it in every file like we saw here as follows -
    name: 'jaffle_shop'
      
    models:
      jaffle_shop:
        marts:
          +materialized: table
        staging:
          +materialized: view
    
  • when we go to the compile tab, we see the jinja being replaced with the actual table name
    with stg_customers as (
        select * from dbt_analytics.dbt_sagarwal.stg_customers
    ),
    -- ......
    
  • understand how it automatically adds the schema name based on our development credentials. so, the code automatically works for different developers
  • remember - the compile tab does not contain the actual ddl / dml, it only contains the sql with the jinja resolved. the actual ddl / dml shows up in the logs
  • we can also look at the lineage tab, which shows us the dag (directed acyclic graph). it tells us the order the dbt models should be built in
  • the graph is interactive - double click on the node to open the corresponding model file automatically

Sources

  • sources - help describe the data loaded by data engineers during the extract phase
  • some advantages of using sources over referencing table names directly -
    • helps track lineage better when we use source function instead of table names directly
    • helps test assumptions on source data
    • calculate freshness of source data
  • create a file called sources.yml under the models directory
  • put the following content inside it -
    version: 2
      
    sources:
      - name: jaffle_shop
        database: dbt_raw
        schema: jaffle_shop
        tables:
          - name: customers
          - name: orders
    
  • finally, change stg_customers.sql / stg_orders.sql as follows -
    select
        *
    from
        (( source('jaffle_shop', 'customers') ))
    
  • the graph starts showing the sources in the lineage as well -
  • freshness - we can add freshness to our sources table as follows -

    - name: orders
      loaded_at_field: _etl_loaded_at
      freshness:
        warn_after:
          count: 6
          period: hour
        error_after:
          count: 12
          period: hour
    
  • we run the command dbt source freshness to check the freshness of our data, and we will automatically get a warning or error based on our configuration
  • e.g. a warning is raised if the most recent value of _etl_loaded_at is more than 6 but less than 12 hours old
  • note - the freshness and loaded_at_field configuration can also be added at each source level i.e. alongside database / schema to apply to all tables
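  • a hedged sketch of what that source level configuration might look like -
    sources:
      - name: jaffle_shop
        database: dbt_raw
        schema: jaffle_shop
        loaded_at_field: _etl_loaded_at
        freshness:
          warn_after:
            count: 6
            period: hour
          error_after:
            count: 12
            period: hour
        tables:
          - name: customers
          - name: orders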

Tests

  • tests in analytics engineering are about making assertions on the data
  • we write tests along with our development to validate our models
  • these tests are then run in production
  • we use the command dbt test to run all of our tests
  • generic tests - applied across different models
  • generic tests are of four types - unique, not null, accepted values, relationships
  • unique - every value in a column is unique
  • not null - no value in a column is null
  • accepted values - every value in a column exists in a predefined list
  • relationships - every value in a column exists in the column of another table to maintain referential integrity
  • we can also use packages / write our own custom generic tests using jinja and macros
  • e.g. create a file called models/staging/staging.yml -
    version: 2
     
    models:
      - name: stg_customers
        columns:
          - name: id
            tests:
              - unique
              - not_null
    
      - name: stg_orders
        columns:
          - name: user_id
            tests:
              - relationships:
                  field: id
                  to: ref('stg_customers')
    
  • the logs on running dbt test show us what the actual sql for the test looks like. if any rows satisfy the condition, our test fails. e.g. not_null_stg_customers_id uses the following sql -
    select
        count(*) as failures,
        count(*) != 0 as should_warn,
        count(*) != 0 as should_error
    from
        (
            select id
            from dbt_analytics.dbt_sagarwal.stg_customers
            where id is null
        ) dbt_internal_test
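  • as mentioned above, we can also write our own custom generic tests using jinja - a hedged sketch (the file path and test name here are our own), e.g. tests/generic/positive_value.sql -
    (% test positive_value(model, column_name) %)
        select *
        from (( model ))
        where (( column_name )) < 0
    (% endtest %)
  • it can then be applied in the yml just like unique / not_null, by adding positive_value under a column's tests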
    
  • singular tests - custom sql assertions for cases that cannot be expressed using the generic tests that dbt ships with
  • in the tests directory, create a file tests/assert_stg_payment_amt_is_positive.sql with the following content (the tests directory is configured in dbt_project.yml) -
    select
        order_id,
        sum(amount) total_amount
    from
        (( ref('stg_payment') ))
    group by
        order_id
    having
        total_amount < 0
    
  • we can also apply tests to our sources directly. it is done in the same way as models - using custom sql scripts in the tests directory for singular tests or using the yml file for generic tests
    version: 2
      
    sources:
      - name: jaffle_shop
        database: dbt_raw
        schema: jaffle_shop
        tables:
          - name: orders
            columns:
            - name: status
              tests:
              - accepted_values:
                  values: [returned, completed, return_pending, shipped, placed]
    
  • dbt build - how it works
    • run dbt test on sources
    • run dbt run and then dbt test on 1st layer of our models
    • run dbt run and then dbt test on 2nd layer of our models
    • and so on…
  • this way, our downstream models are never even built if the upstream models fail tests
  • understand how doing this manually using dbt run and dbt test would require configuring multiple commands, and would be very cumbersome
  • the skip tab will show us the models which were skipped, for example because tests on their upstream models failed
  • it supports all the flags that test, run, snapshot and seed support. when we pass a flag, it is passed down to all the commands that support it, e.g. --full-refresh will be applied to both models and seeds
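  • e.g. a couple of hedged examples of passing such flags to dbt build -
    dbt build --full-refresh            # rebuild incremental models / reload seeds from scratch
    dbt build --select +dim_customers+  # build and test only this model and its upstream / downstream nodes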

Documentation

  • we maintain documentation along with code to make maintaining documentation easier
  • by using macros like source and ref, our dags are generated
  • along with this, we can document things like ‘what does this model mean’, ‘what does this field mean’, etc
  • we can add descriptions to our models / columns like so. we continue doing this in the same file we added tests to i.e. models/staging/staging.yml -

    models:
      - name: stg_customers
        description: one unique customer per row
        columns:
          - name: id
            description: primary key
    
  • sometimes, we might want more extensive documentation, which is not possible using just yaml. we can do so using doc blocks
  • create a file models/staging/documentation.md with the following content -
    (% docs order_status %)
      
    status can take one of the following values -
      
    | status         | definition                                       |
    |----------------|--------------------------------------------------|
    | placed         | Order placed, not yet shipped                    |
    | shipped        | Order has been shipped, not yet been delivered   |
    | completed      | Order has been received by customers             |
    | return pending | Customer indicated they want to return this item |
    | returned       | Item has been returned                           |
      
    (% enddocs %)
    
  • then, reference it inside staging.yml as follows -

    - name: status
      description: "(( doc('order_status') ))"
    
  • just like we documented models, we can also document our sources
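  • e.g. a hedged sketch of adding descriptions to sources.yml -
    sources:
      - name: jaffle_shop
        description: raw data replicated from the jaffle shop application database
        tables:
          - name: orders
            description: one row per order
            columns:
              - name: status
                description: "(( doc('order_status') ))"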
  • to generate the documentation, we use dbt docs generate
  • by looking at our dag / lineage graph, we can guess how many models can be built concurrently, and hence how many threads to configure

Deployment

  • by default, a development environment is already created for us
  • go to deploy -> environments and create a new environment
  • environment type can be development or deployment
  • we select deployment here, since a development environment already exists, and only one development environment is allowed
  • then, we select a deployment type - typically production
  • just like we saw in getting started, we need to set the connection, wherein we specify snowflake account url, database, warehouse
  • we enter deployment credentials instead of development credentials like in getting started
    • the right way would be to create and use a service user’s credentials instead of our own credentials here
    • also, the models now are created in a dedicated production schema which only this service user has access to, not a user specific target schema
  • after configuring all this, we can create our environment
  • then, we can create a job of type deploy job for this environment
  • here, we can enter a schedule for this job
  • we can also configure the set of commands that we would want to run on that cadence
  • by default, dbt build is present, and there are checkboxes for running freshness checks on sources and for generating docs
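  • the equivalent sequence of commands for such a job would roughly be -
    dbt source freshness
    dbt build
    dbt docs generate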
  • after configuring all this, we can create our job
  • we can trigger this job manually by hitting run now whenever we want, apart from relying on the schedule to trigger it
  • dbt cloud easily lets us access our documentation, shows us a breakdown of how much time the models took to build, etc

Setup Folder Structure

  • by maturity
    • two folders are created - staging and marts
    • staging - has models that represent the source data itself. perform some basic cleaning. prefixed with stg_
    • marts - all the business logic is here. combines staging models. prefixed with fact_ or dim_
    • optionally, we can have an intermediate folder inside marts for models that sit between staging and marts i.e. models which are not used by / exposed to consumers directly. these can be prefixed with int_
  • by domain
    • imagine we have three teams - finance, sales and marketing
    • we create three folders representing each of the teams in this case
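  • an illustrative layout combining both ideas (folder and file names here are just examples) -
    models/
      staging/
        jaffle_shop/
          stg_customers.sql
          stg_orders.sql
      marts/
        finance/
          fct_orders.sql
        marketing/
          dim_customers.sql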

DBT Explorer Features

  • feels like a superset of features available in documentation
  • models tab - shows us the status of models - success, error, skipped. it will also show us the last executed date of the model. we can click on a model to inspect its performance, the sql code for it, its lineage, etc
  • sources tab - freshness of sources etc
  • tests tab - status of test (pass or fail), what model and column it ran on, etc
  • performance tab - which models took the longest time to run, which models were executed the most, which models fail the most
  • recommendations - gives suggestions around models with missing documentation, models with missing test coverage, documenting primary keys (unique and not null), etc. it also categorizes them as high, medium, etc
  • in the lineage graph, we can type the select syntax -
    • stg_customers - highlights the stg_customers model
    • +stg_customers - highlights stg_customers along with all of its upstream dependencies
    • stg_customers+ - highlights stg_customers along with all of its downstream dependencies
  • after entering our select statement, we hit the update graph button / hit enter key to show only nodes related to our query
  • union - we can extract dags for all models combined by separating using a space -
    +stg_customers+ +stg_orders+
  • intersection - we can extract the shared upstream between all models by separating using a comma - +stg_customers+,+stg_orders+
  • to get only the sources of a model, we can use +dim_customers,resource_type:source
  • unlike the documentation, dbt explorer also supports column level lineage. this way, we can easily view the affected downstream models when we, for example, rename a column
  • we can also see the lineage of our projects. we can see the parent and child project lineage, how many models a project has, the public models exposed by the projects, etc
  • explorer supports advanced searching - e.g. if we just type order_id in the search bar, it will automatically give us all the models, sources, etc with that column
  • the search also supports a lot of filtering - e.g. we can filter out only tests, sources, etc. if we select models, we can also filter by modelling layer (staging, marts, intermediate), materialization type (table, view, incremental)

DBT Mesh

  • a technique for breaking down large dbt projects into multiple projects. some advantages -
    • helps collaborate efficiently - e.g. avoid conflicts due to analytics engineers working on different features
    • reduce costs by for e.g. building small parts at a time instead of the entire stack

Model Contracts

  • if someone makes breaking changes to an upstream model, it will break the downstream models and in turn the reporting
  • we can avoid this using model contracts
  • using model contracts, we can ensure - column names, data type of each column, etc
    • we need to set contract.enforced to true as shown below
    • then, we can list the column names and their data types
    • finally, we can list the constraints like not null
    models:
      - name: trilogy
          
        config:
          contract:
            enforced: true
          
        columns:
          - name: number_in_trilogy
            data_type: float
            constraints:
              - type: not_null
    
  • note - this is specific to the data warehouse we use, e.g. different data warehouses will have different data types

Versioning

  • sometimes, breaking changes are unavoidable
  • we can coordinate changes to model contracts using versioning
  • we can test pre release versions in production - basically the old version is left untouched, and a new version is created alongside it
  • we can offer a “migration window” for downstream users before making the change official (e.g. changing the latest_version property to the latest version)
  • create a new file, say fct_orders_v2.sql with the new updated sql
    • we rename order_total as order_amount
    • we change data type of location_opened_at to date
  • now, we modify our corresponding configuration for model file as follows

    models:
      - name: fct_orders
          
        config:
          contract:
            enforced: true
          
        columns:
          - name: order_total
            data_type: number
          - name: location_opened_at
            data_type: timestamp_ntz
          
        versions:
          - v: 1
          - v: 2
            columns:
              - include: all
                except: [order_total]
              - name: order_amount
                data_type: number
              - name: location_opened_at
                data_type: date
    
  • note - we do not enter anything under v1 because the base definition above already describes v1. so, we only enter our changes under v2
  • by default, these will be materialized as fct_orders_v1 and fct_orders_v2. to ensure that e.g. v1 is materialized as fct_orders itself, we can do the following -

    - v: 1
      config:
        alias: fct_orders
    
  • downstream users can reference a specific version using (( ref('fct_orders', v=1) ))
  • if downstream users do not specify a version i.e. they use (( ref('fct_orders') )), the latest version will be used by default. to change this behavior, we can configure what version should be used by default using latest_version

    models:
      - name: fct_orders
        latest_version: 1
    
  • now, this becomes a pre release of sorts. once we are sure that our changes are stable, we can remove this line / change its value to 2
  • when we run dbt run --select fct_orders, all versions are run
  • to run a specific version, use dbt run --select fct_orders_v2
  • to run the latest version, use dbt run --select fct_orders,version:latest

Model Group and Model Access Modifiers

  • model groups - set of models related to one another. e.g. owned by a specific team / business use case. e.g. employee data
  • each model group will have an owner
  • e.g. groups.yml file -

    groups:
      - name: finance
        owner:
          name: Firstname Lastname
          email: finance@jaffleshop.com
          slack: finance-data
          github: finance-data-team
    
  • each model can only belong to one model group
  • model access modifiers - how models outside a group can access (reference) models in a group
  • public - can be accessed by models in any group or project. e.g. staging models can be used across different projects to help with collaboration
  • protected - can be accessed by models in the same project. this is the default
  • private - can only be accessed by models in the same group
  • we can edit the configuration for our models to specify the group they belong to and their access modifier as follows -

    models:
      - name: finance_model
        access: private
        group: finance
    

Cross Project Reference

  • check 1 - ensure that models in the upstream project are public
  • check 2 - there should be at least one successful run in the production environment before we can start importing across projects
  • create a file called dependencies.yml

    projects:
      - name: core_platform
    
  • then, we can reference the models in these projects using the cross project reference macro, which looks a lot like the usual reference macro
    (( ref('project_name', 'model_name') ))
    
  • explorer can show us how different downstream projects reference a public model
  • random thought - this feature is only available in enterprise dbt cloud. if using dbt core, the workaround would be to treat models of the upstream project as sources in the downstream project. this way, i guess we lose out on capabilities like versioning, lineage across projects, etc

Cross Project Orchestration

  • we would want our downstream projects to build automatically after the upstream projects finish building
  • we do not have to rely on brittle time based dependencies anymore
  • also by using dbt mesh, we are now breaking big long running jobs into multiple short jobs
  • while creating a job, instead of running it on a schedule, we can configure it to run on job completion i.e. run when another job finishes
  • here, we can select the upstream project, the upstream job, and the statuses of the upstream job on which this job should be triggered - on success, on success and on error, all statuses, etc. here, since we depend on the models being built by the upstream project, we check on success

Analyses and Seeds

  • analyses - sql files in the analyses folder in dbt project
  • they are not models or tests, so they do not get run - when we run, for example, dbt run or dbt compile, we see the compiled output inside the target folder, but unlike models, they do not get materialized
  • they support jinja, and we can see the actual sql code it compiles down to
  • use cases - store one off queries which can be run manually, e.g. creating users etc
  • seeds - csv files that live in seeds folder in dbt project
  • purpose - take a small amount of data, and manifest it in the data warehouse as a table
  • dbt seed can materialize this as a table in our data warehouse
  • we can then reference them using the same ref function we use to reference models
  • use case - static data, e.g. mapping country code to actual names of the countries
  • finally, we can add descriptions and tests to seeds just like our regular models
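  • e.g. a hedged sketch - a seed file seeds/country_codes.csv (hypothetical) -
    country_code,country_name
    in,india
    us,united states
  • after running dbt seed, any model can reference it using the ref function -
    select * from (( ref('country_codes') ))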

Jinja

  • python based templating language
  • e.g. when we use the ref function, it swaps it with the table name prefixed with the target schema based on development credentials
  • everything inside the curly braces is jinja, and the rest of it is regular sql. note - curly braces have been replaced with round braces in the snippets here, because they were not rendering otherwise
    (% for i in range(5) %)
        select (( i )) as number (% if not loop.last %) union all (% endif %)
    (% endfor %)
    
  • this results in a table containing one column number, with values 0-4
  • we see two basic forms of jinja syntax above -
    • (% %) - operation is happening inside the jinja context. it is invisible to the end user after the code is compiled
    • (( )) - pulling it out of the jinja context and actually outputting it
  • we use the if condition so that union all is not added after the last select. this is what the compiled tab shows -
    select 0 as number union all 
    select 1 as number union all 
    select 2 as number union all 
    select 3 as number union all 
    select 4 as number
    
  • we can use set to set a variable - the compiled output just has my name
    (% set my_name = 'shameek' %)
    (% set my_surname = 'agarwal' %)
    (( my_name )) (( my_surname )) -- shameek agarwal
    
  • we might see a lot of blank lines in our compiled output, because the jinja percentage blocks are not part of the compiled output
  • to prevent this, we can use (%- -%) instead of (% %)
  • comments are written using (# #) i believe. note - sql style -- comments do not work for jinja. i was stuck on this for a while - since it is an sql file, the ide highlights the line as a comment, but the jinja inside it still gets executed
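  • e.g. a small sketch (keeping the round brace convention used here) -
    (# this jinja comment is dropped at compile time, the (( ref('stg_orders') )) inside it is not evaluated #)
    -- this sql comment still compiles the jinja: (( ref('stg_orders') ))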

Macros

  • macros - help us write generic logic that is then reusable across the project
  • they are actually a feature of jinja - so we can find relevant documentation in both jinja and dbt
  • in the macros directory, create a file called cents_to_dollars.sql
    (% macro cents_to_dollars() %)
        amount / 100
    (% endmacro %)
    
  • now, we can use it in our model as follows -
    select
        id as payment_id,
        orderid as order_id,
        paymentmethod as payment_method,
        status,
        -- amount is stored in cents, convert it to dollars
        (( cents_to_dollars() )) as amount,
        created as created_at
    from
        (( source('stripe', 'payment') ))
    
  • if we hit compile, we can see the sql being compiled down to amount / 100 as amount, correctly
  • however, this relies on the fact that our column was named amount. we can use arguments to make the column name configurable
  • apart from that, lets say we want to allow rounding as well, with the precision argument defaulting to 2
  • we change the macro to the following -
    (% macro cents_to_dollars(column_name, precision=2) %)
        round((( column_name )) / 100, (( precision )))
    (% endmacro %)
    
  • now, we can use the macro as follows -
    (( cents_to_dollars('amount') )) as amount,
    (( cents_to_dollars('amount', 3) )) as precise_amount
    

    the compiled sql looks as follows -

    round(amount / 100, 2) as amount,
    round(amount / 100, 3) as precise_amount
    
  • now, worth thinking about - macros in this case might reduce readability, as understanding the model requires jumping around. it is a trade off between dry and readability

Packages

  • packages - import models and macros into our dbt projects
  • model package example - someone has already modelled facebook ads, stripe data, etc, and we can directly start using it
  • macro package example - dbt utils. e.g. it has the date spine macro, which can create a long table of dates with whatever difference we configure between each of the dates
  • dbt hub has all the packages. we can import from here or from github directly
  • we add dependencies to packages.yml file at the root. clicking on the links on dbt hub will give us the snippet with package name and version to copy to this file -

    packages:
      - package: dbt-labs/dbt_utils
        version: 1.3.0
    
  • we add a github package as follows - version changes to revision

    packages:
      - package: git@github.com:dbt-labs/dbt-utils.git
        revision: master
    
  • finally, we can also use packages in local i.e. relative to our dbt_project.yml file like so -

    packages:
      - local: sub_project
    
  • we run dbt deps to clone and install all the dependencies required for running our project
  • e.g. of using date_spine macro

    ((
        dbt_utils.date_spine(
            datepart="day",
            start_date="to_date('01/01/2023', 'mm/dd/yyyy')",
            end_date="dateadd(week, 1, current_date)"
        )
    ))
    
  • we can look at the complex compiled sql result, view the actual logic in the package under macros, etc
  • another macro example - generate_surrogate_key (previously surrogate_key) can accept different columns and spit out a unique hash of these columns. when we have a composite primary key, we can use this to get a unique value per row. we can place the generic test of unique on this column
    select
      (( dbt_utils.generate_surrogate_key(['customer_id', 'order_date']) )) as id,
      customer_id,
      order_date,
      count(*)
    from
      (( ref('stg_orders') ))
    group by
      1,2,3
    
  • packages like dbt utils work across warehouses - we could be using this in bigquery or in snowflake, and it would spit out the right sql for us automatically
  • till now, we saw how we can use macros from packages, but we can also use models from packages
  • e.g. the below package gives us models which can help us understand spending in snowflake

    packages:
      - package: gitlabhq/snowflake_spend
        version: 1.3.0
    
  • setup instructions can be found in the package documentation, e.g. we need to seed it with effective rates (per credit price, which depends on our choice of cloud provider, region, etc)

Jinja, Macros and Packages Examples

Example 1

  • the below example helps us grant permissions to the tables
  • note - the set block helps us assign the contents of the block to a variable. jinja calls it block assignment i believe
  • we can execute jinja code using do
  • we can run some sql using run_query. we can find the available macros in dbt jinja functions
  • the target variable contains information about things like warehouse, database, etc in the form of a dictionary
  • these are defined / derived from the settings we specify under the (development / deployment) environment, credentials, connection, etc
  • to access its values, i entered (( target )) in a file and compiled it. the compilation output gave me this
    {
        "account": "tp279l2.ap-southeast-1",
        "user": "shameek",
        "database": "analytics",
        "warehouse": "transforming",
        "role": "accountadmin",
        "schema": "dbt_sagarwal",
        # .... more stuff
    }
    
  • the schema here for e.g. is the user specific schema we specify as the target schema in development credentials
  • we use this as the default arguments for our macro
  • the other function we use is log. by setting info as true, we see the log in both summary and details, otherwise we only see it under details
  • create a file macros/grant_select.sql with the following content -
    (% macro grant_select(schema=target.schema, role=target.role) %)
    
        (% set query %)
            grant usage on schema (( schema )) to role (( role ));
            grant select on all tables in schema (( schema )) to role (( role ));
            grant select on all views in schema (( schema )) to role (( role ));
        (% endset %)
    
        (( log("granting view access on " ~ schema ~ " to role " ~ role, info=True) ))
        (% do run_query(query) %)
    
    (% endmacro %)
    
  • we run models using dbt run
  • however in this case, we want to run a specific macro and not a model. we can do this using dbt run-operation grant_select
  • we can pass in variables to the macro using --args, which takes a yaml dictionary - dbt run-operation grant_select --args '{role: xyz}'

Example 2

  • using the results of a query, we would like to build out our sql dynamically
  • compilation is done in two phases. execute is false the first time around, and true the second time. basically, no sql is executed the first time, but it is executed the second time. during the first pass, only building of dags etc happen
  • so, this might give us errors on the second line below, because results is None during the first pass -
    (% set results = run_query(query) %)
    (% set values = results.columns[0].values() %)
    
  • so, we do something like this instead -
    (% set results = run_query(query) %)
    (% if execute %)
      (% set values = results.columns[0].values() %)
    (% else %)
      (% set values = [] %)
    (% endif %)
    
  • we get back the table object from run_query, and we access the values of the right column from it to set our statuses variable
  • now, assume we have a table like this - stg_orders(order_id, customer_id, order_date, status)
  • we want a row for every customer, and a column for every status. the column will tell us how many orders with that particular status a customer has
  • we can construct the model as follows -
    (% set query %)
        select distinct status from (( ref('stg_orders') ))
    (% endset %)
      
    (% if execute %)
        (% set statuses = run_query(query).columns[0].values() %)
    (% else %)
        (% set statuses = [] %)
    (% endif %)
      
    (( log("calculated statuses = " ~ statuses, info=True) ))
      
    select
        customer_id,
        (%- for status in statuses %)
        sum(case when status = '(( status ))' then 1 else 0 end) (( status ))
        (%- if not loop.last %),(% endif -%)
        (% endfor %)
    from
        (( ref('stg_orders') ))
    group by
        customer_id
    
  • the compiled code looks as follows -
    select
        customer_id,
        sum(case when status = 'returned' then 1 else 0 end) returned,
        sum(case when status = 'completed' then 1 else 0 end) completed,
        sum(case when status = 'return_pending' then 1 else 0 end) return_pending,
        sum(case when status = 'shipped' then 1 else 0 end) shipped,
        sum(case when status = 'placed' then 1 else 0 end) placed
    from
        analytics.dbt_sagarwal.stg_orders
    group by
        customer_id
    

Unit Testing

  • test driven development - forces us to write tests before code. this forces us to think about edge cases upfront
  • regression testing - helps catch mistakes in breaking changes
  • we use unit testing during test driven development, wherein we specify the set of inputs and expected outputs for a given piece of code
  • these tests are then run as a part of continuous integration to catch future breaking changes
  • unit tests are run before the node is built, constraint tests are run after the node is built but before it is materialized, data tests are run after the model is materialized
  • unit tests test our complex sql logic, constraint tests check shape of data, while data tests check content of data
  • we also save on resources and money, since these tests are run before the models are materialized with millions of rows
  • we can configure unit tests using yaml. we can either specify inputs and outputs in yaml directly, or we can leverage csvs for this to keep the yaml smaller
  • in the inputs and outputs, we only need to mention the columns of interest
  • assume we remap our status column in the following way -
    select
        id as order_id,
        user_id as customer_id,
        order_date,
        (
            case
                when status like '%return%' then 'returned'
                when status like '%pending%' then 'placed'
                else status
            end
        ) status
    from
        (( source('jaffle_shop', 'orders') ))
    
  • in the given section, we specify the data for all the inputs used by our model, e.g. only one source is being used in this case

    unit_tests:
      - name: stg_orders_status_cleanup
        model: stg_orders
        given:
          - input: source('jaffle_shop', 'orders')
            rows:
              - { id: 1, status: returned }
              - { id: 2, status: return_pending }
              - { id: 3, status: completed }
              - { id: 4, status: pending }
              - { id: 5, status: shipped }
              - { id: 6, status: placed }
        expect:
          rows:
              - { order_id: 1, status: returned }
              - { order_id: 2, status: returned }
              - { order_id: 3, status: completed }
              - { order_id: 4, status: placed }
              - { order_id: 5, status: shipped }
              - { order_id: 6, status: placed }
    
  • we can run just the unit tests using dbt test --select test_type:unit
  • otherwise, we can rely on our regular old dbt build
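  • to keep the yaml smaller, the rows can also be supplied as csv - a hedged sketch, assuming a fixture file at tests/fixtures/jaffle_shop_orders_fixture.csv -
    given:
      - input: source('jaffle_shop', 'orders')
        format: csv
        fixture: jaffle_shop_orders_fixture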
  • let us say our logic involves current timestamp
    datediff('days', order_date, (( dbt.current_timestamp() ))) days_since
    
  • we cannot unit test on this easily, because current timestamp will change everyday, and hence our assertion will have to change
  • so, we override the value returned by the macro by hardcoding it instead

    - name: stg_orders_days_since_calc
      model: stg_orders
      overrides:
        macros: 
          dbt.current_timestamp: date('2019-01-01')
      given:
        - input: source('jaffle_shop', 'orders')
          rows:
            - { order_date: 2018-01-01 }
      expect:
        rows:
          - { order_date: 2018-01-01, days_since: 365 }
    
  • just like macros, we can override variables and environment variables as well
  • a good practice might be to avoid running unit tests in production environment. unit tests only need to be run as a part of development environments / continuous integration environments. we can do this by setting the variable DBT_EXCLUDE_RESOURCE_TYPE to unit_test

Webhooks

  • webhooks vs polling - a webhook is push based i.e. data is pushed to the receiving app. polling is pull based i.e. the receiving app repeatedly asks the source for data
  • webhooks are much more immediate and less resource intensive compared to polling
  • when using webhooks, there are two kinds - outgoing webhooks to send the data, and incoming webhooks to receive the data
  • dbt currently only supports outgoing webhooks for when jobs are started / completed (with job status)
  • e.g. use case - create pager duty alarms when dbt jobs fail
  • go to account settings -> webhooks -> create webhook. here, enter the -
    • webhook name and description
    • events to trigger this webhook for - run started, run completed (any status), run completed (errored)
    • jobs - trigger this webhook for all jobs or some specific job(s)
    • finally, we enter the endpoint that dbt should call

Materialization

  • materialization - how dbt builds the models. remember - we just write select statements, and dbt generates the right ddl / dml on top of that for us
  • five types of materialization
  • can be configured inside dbt_project.yml at the folder / model level, or inside the individual model itself using a config block at the top
  • table - actual table is built
  • views - we would be rerunning the query every time
  • ephemeral - take our select statements and use it as a cte in the downstream models. so, nothing exists in our database. hard to debug as we cannot query or inspect them directly
  • small note - if we change the materialization of a model for e.g. from table to a view, the table will be dropped and a new view would be created in its place. this is because dbt generates commands like this - create or replace. however, if we go for e.g. from a table to ephemeral, the table still stays there
  • now, we discuss incremental and snapshot

Incremental

  • we end up reprocessing years worth of data (which should ideally not change) when using the table materialization
  • so, the idea is that we just add the new records and keep the old table as is, because historical data typically cannot / should not change
  • we save on processing time, resources and money this way
  • start with view. if querying it takes too long, switch to table. if building it takes too long, switch to incremental
    (( config(materialized = 'incremental') ))
    
  • we have the source table, and an already built table which we want to modify
  • we use the way below to identify the new data in our source. note - this should only apply to subsequent runs. for the first run, this filtering should not happen
    with events as (
      select *
      from (( source('snowplow', 'events') ))
      (% if is_incremental() %)
      where t_stamp >= (select max(t_stamp) from (( this )))
      (% endif %)
    )
    
  • notice the use of this to reference the already existing and built model. we cannot use ref(model) in the model itself, since it will become a cyclic dependency
  • the first time around, it will run the usual create or replace table. second time around though, it runs create or replace temporary table and inserts records from this temporary table into the model using insert into
  • when we add the --full-refresh flag, it will build from scratch using the create or replace table command like the first time around
  • issue - what if data shows up in our warehouse late? we always use the latest timestamp as the cutoff, but events can arrive out of order - e.g. we have already processed events up to 11.30, but in the next incremental run, we receive an event which happened at 10.30
  • so, we can change the sql to following -
    (% if is_incremental() %)
    where t_stamp >= (select dateadd('day', -3, max(t_stamp)) from (( this )))
    (% endif %)
    
  • we move the cutoff to three days back
  • now, the current issue - we can end up with duplicate records. solution - unique_key. it helps update the existing records instead of inserting duplicates
    ((
      config(
        materialized = 'incremental',
        unique_key = 'page_view_id'
      )
    ))
    
  • with this, the command that was doing insert into from the temporary table to the model now changes to merge into
  • note - advantage of dbt - dbt automatically used merge into, since snowflake supports it. different warehouses might have different ways of achieving this, which dbt can handle for us automatically
  • how to decide three days - set the cutoff based on the two parameters below -
    • ask about the tolerance for correctness
    • perform analysis on how late, say, the 99th percentile of data can be
  • prioritizing correctness can negate performance gains
  • secondly, remember we can always run a --full-refresh on a weekly basis
  • another issue - assume a unique key (e.g. a page view id) has records in both - the already built table and the 3 day window we are currently processing
  • we end up overwriting the old data with values computed only from the new batch
  • my understanding of one potential fix - we change our filter. instead of filtering based on time alone, retrieve all records having a page view id present in the batch being currently processed -
    (% if is_incremental() %)
    where page_view_id in (
      select distinct page_view_id
      from (( source('snowplow', 'events') ))
      where t_stamp >= (select dateadd('day', -3, max(t_stamp)) from (( this )))
    )
    (% endif %)
    
  • issue - this subquery for finding all the distinct page view ids can become very slow
  • we should avoid such fixes if we have truly huge amounts of data, as these optimizations can become very slow
  • good candidates for incremental models - immutable events, like page views on our browser
  • if they are mutable - we need fields like updated at to be able to filter easily
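  • a hedged sketch for such mutable data (the column names here are illustrative) -
    ((
      config(
        materialized = 'incremental',
        unique_key = 'order_id'
      )
    ))

    select *
    from (( source('jaffle_shop', 'orders') ))
    (% if is_incremental() %)
    where updated_at > (select max(updated_at) from (( this )))
    (% endif %)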

Snapshot

  • think slowly changing dimensions (type 2)
  • we run them using dbt snapshot, as they run separately from our models
  • we now get four new columns -
    • dbt_scd_id - a totally unique identifier
    • dbt_updated_at - a mirror of the updated_at field
    • dbt_valid_from and dbt_valid_to fields
  • we use a target database and schema different from the ones in our profiles / environments. this is because we want snapshots to be independent of our models - they cannot be reproduced, since they capture source data that we would otherwise not have access to
  • just like incremental models, we need to specify the unique key
  • we can specify a strategy and the field for this strategy, which is timestamp and updated at in our case
    (% snapshot snap_products %)
    ((
      config(
        target_database = 'xyz',
        target_schema = 'pqr',
        unique_key = 'id',
    
        strategy = 'timestamp',
        updated_at = 'updated_at'
      )
    ))
    select * from (( source('abc', 'def') ))
    (% endsnapshot %)
    
  • we pretend that this snapshot is actually available in the source itself, and use it like that
  • sometimes however, we might place these snapshots at the end i.e. on top of our reported data, to view the changes easily
  • nothing changes as such - we just select from ref instead of source, thats all