Airbyte
- why use airbyte - we do not have to reinvent the wheel to ingest data between multiple kinds of “sources” and “destinations”
- it gives us a unified view of our integrations, which helps with debugging issues, monitoring, etc
- airbyte is open source, so there are a lot of resources, connectors, etc
- two scenarios for using airbyte -
- “extract” and “load” part of elt. we extract data from e.g. google sheets and load it into e.g. snowflake
- “publish” - we can publish transformed data from snowflake into downstream saas applications used for reporting
- “connector” - it pulls data from the “source” / pushes it into the “sink”
- “stream” - group of related records, like records in a table
- “field” - specific attribute in a “stream”, like a column in a table
- “connection” - the entire pipeline from the source to the destination. we can define settings like “replication frequency”, whether we want to perform “full overwrites” or “incremental updates”, etc
- airbyte is not for “streaming” workloads. it ingests data in “batches”, so there might be some latency
- airbyte supports writing our own connectors as well
- the difference between the airbyte tiers (see the airbyte docs for a full comparison) -
- “self managed community” - open source
- “self managed enterprise” - run airbyte yourself but with support and enterprise features
- “airbyte cloud” - fully managed
- self managed community supports only one “workspace”, unlike the other two. a workspace is a grouping of users, sources, destinations, etc. i believe workspaces also help with sharing resources etc across them
- also, self managed community only supports 1 user
- i believe we will be responsible for handling upgrades - of the airbyte instance itself, of the versions of the different source and destination connectors, etc
- running airbyte using docker -
```sh
git clone --depth=1 https://github.com/airbytehq/airbyte.git
cd airbyte
bash run-ab-platform.sh
```
- go to localhost:8000, and enter username as “airbyte” and password as “password”
- under “settings”, we can see (and upgrade) the version of “source connectors” and “destination connectors”
- “webhook notifications” are supported for failures in syncs etc
- when creating a “source”, we can for e.g. choose between “airbyte connectors” and “marketplace” (community maintained?).
- ps - on office laptops, i was getting errors like these -

```
Caused by: io.temporal.failure.ApplicationFailure: message='Could not find image: airbyte/destination-snowflake:3.11.4', type='io.airbyte.workers.exception.WorkerException', nonRetryable=false
```
- the solution to such errors is to just run `docker image pull` manually, e.g. `docker image pull airbyte/destination-snowflake:3.11.4` for the error above
- first, we configure the “source” and “destination” separately. then, we can configure the “connection”
- in the connection, we can configure the following -
- “schedule type” - manual / schedule / cron
- “replication frequency” / “cron expression” appear if we select schedule / cron above
- “destination namespace” - this is where the data would be synced
- “source defined” - use the schema defined by the source
- “destination specified” - e.g. the default schema selected when creating the destination
- “custom format” - sync all streams to a unique new schema. i believe this is useful when we are syncing from multiple sources into the same destination
- optionally, we can select the “streams” we would like to sync
- for each stream, we can select the “sync mode”
- finally, for each stream, we can also select the columns to replicate
- inside the connection, there are tabs like “status” (status of each stream), “job history” (historic runs of jobs), etc
- after a successful sync, we see some additional airbyte metadata columns (prefixed with `_airbyte_`) inside every table that gets synced, and additional tables inside the schema / “namespace” `airbyte_internal`
- the `airbyte_internal` namespace has a table for each stream that we try to sync. this table has a row per record that airbyte tried to sync. however, all the data for the record is inside a column called `_airbyte_data`
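- as a quick sanity check, a sketch like the one below can be used to peek at those raw records. assumptions here - the `snowflake-connector-python` package, placeholder credentials, and the raw table name `public_raw__stream_users` (the actual name and casing depend on your destination schema and stream, so check inside `airbyte_internal` first)

```python
# hedged sketch - peek at the raw records airbyte lands in airbyte_internal.
# credentials and the raw table name are placeholders / assumptions.
import json

import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account",        # placeholder
    user="my_user",              # placeholder
    password="my_password",      # placeholder
    warehouse="my_warehouse",    # placeholder
    database="my_database",      # placeholder
    schema="airbyte_internal",   # where airbyte keeps its raw tables
)

cur = conn.cursor()
# assumed table name - verify the exact raw table name (and casing) in your schema
cur.execute("select _airbyte_data from public_raw__stream_users limit 5")
for (raw,) in cur.fetchall():
    record = json.loads(raw)     # the whole source record lives in this one json column
    print(record)

cur.close()
conn.close()
```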
- we have different “jobs” in airbyte like “verify” for verifying the connection, “schema” for checking / comparing schemas, “sync” to perform the actual synchronization, etc. think of jobs like workflows, which can have multiple tasks
- when running the sync job manually, i see three containers spin up - a “discovery” container, which i guess detects schema changes, plus one for reading from the source and one for writing to the destination
- if i use postgres as a source, i see the following options for “sync mode” -
- “full refresh overwrite” - “full refresh” - read everything, “overwrite” - delete everything and then rewrite everything in the destination
- “full refresh append” - “full refresh” - read everything, “append” - append everything to the data in the destination table, no overwrites. assume a sync workflow is run, a record is deleted, and a sync is run again. if we have x records after the first sync, we will have 2x-1 records after the second sync
- “incremental append” - “incremental” - the “cursor” value is used to track whether a record should be replicated, so we need to define a “cursor field” as well when selecting incremental. airbyte only checks for records whose cursor field is greater than the largest value it saw for it during the last sync, and loads only them. understand - this way, we are basically telling airbyte which subset of rows it needs to look at, and it can ignore the rest of the rows. e.g. a timestamp field like `updated_at` can be a good fit for this
- “incremental append + deduped” - issue with the above - recall how append will simply add all the changed records again. by adding “deduped”, we need to specify the primary key field, and instead of simply adding the rows whose cursor field has changed (aka rows which have been updated), airbyte will modify the rows in the destination where needed, else add new rows. this way, we do not end up with duplicate data (a small illustration follows after this list)
- notice how each sync mode name is divided into two parts - the first part describes how data is read from the source, the second part describes how data gets written to the destination
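- to make the “append” vs “append + deduped” difference concrete, here is a tiny illustrative sketch - plain python, not airbyte code, with made-up records, an `updated_at` cursor field and an `id` primary key

```python
# illustrative only - how "incremental append" vs "incremental append + deduped"
# would treat the same batch of changed records. data and field names are made up.
last_cursor = "2024-01-01T00:00:00"

# rows already present in the destination from the previous sync
destination = [
    {"id": 1, "name": "alice", "updated_at": "2023-12-31T09:00:00"},
    {"id": 2, "name": "bob",   "updated_at": "2023-12-31T09:00:00"},
]

# rows in the source whose cursor field moved past the last value airbyte saw
changed = [
    {"id": 1, "name": "alice-renamed", "updated_at": "2024-01-02T10:00:00"},  # update
    {"id": 3, "name": "carol",         "updated_at": "2024-01-02T11:00:00"},  # insert
]

# incremental append - just add the changed rows, so id 1 now appears twice
appended = destination + [r for r in changed if r["updated_at"] > last_cursor]

# incremental append + deduped - upsert on the primary key instead
by_id = {r["id"]: r for r in destination}
for r in changed:
    if r["updated_at"] > last_cursor:
        by_id[r["id"]] = r  # update the existing row or insert a new one
deduped = list(by_id.values())

print(len(appended))  # 4 rows - duplicate entries for id 1
print(len(deduped))   # 3 rows - id 1 updated in place
```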
- “schema changes” - airbyte checks for any schema changes once every 24 hours or so
- so remember - we might have to hit the “refresh source schema” button manually to see the changes in the ui immediately
- schema changes can be “non breaking” or “breaking”
- “non breaking changes” - new stream, delete stream, new column, delete column, column data type changes
- airbyte allows us to choose from the following behaviors (self explanatory) -
- propagate field changes only
- propagate all field and stream changes
- approve all changes - a manual approval would be needed first. we can see it in the connection tab
- stop future syncs
- “breaking changes” - change in primary key / cursor etc. the connection will always be paused in these cases, and we would have to manually review the changes first
- “cdc” or “change data capture” - capture inserts, updates, deletes etc from one data source into another
- databases like postgres, mysql, etc have “transaction logs” which contain the inserts, updates and deletes, which airbyte can read. also called “write ahead log” etc
- for this, we need to grant the replication permission to the user -

```sql
ALTER USER <user_name> REPLICATION;
```

- we might have to make other changes as well for this to work, depending on the type of database we use (a sketch for checking the postgres-side prerequisites is included after this cdc section)
- when using cdc with incremental mode, the “cursor field” is the one managed by airbyte / cdc, and we do not need to specify a column like `updated_at` that we manage ourselves
- note that the primary key is however needed from our end if we use incremental mode with cdc
- another difference (apart from the automatic cursor field above) between incremental + cdc vs incremental by itself - plain incremental cannot process deletes, while cdc can pick them up from the transaction log
- other limitations like unsupported schema changes might be there when using incremental with cdc. we might have to resync the entire data from scratch in such cases
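- as a quick way to check the postgres-side cdc prerequisites mentioned above, something like this can be run against the source database - an illustrative sketch assuming `psycopg2` and placeholder connection details

```python
# hedged sketch - check whether a postgres source looks ready for cdc.
# connection parameters are placeholders.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="mydb", user="airbyte", password="password"
)
cur = conn.cursor()

# logical decoding (what cdc readers consume from the wal) needs wal_level = 'logical'
cur.execute("SHOW wal_level;")
print("wal_level:", cur.fetchone()[0])

# the replication slot a cdc reader consumes from (has to exist beforehand)
cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots;")
print("replication slots:", cur.fetchall())

# the publication that decides which tables' changes get published
cur.execute("SELECT pubname FROM pg_publication;")
print("publications:", cur.fetchall())

cur.close()
conn.close()
```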
- “pyairbyte” - run airbyte using a python library. it does not need the different components / docker or kubernetes that airbyte needs to run
- so, for e.g., we can follow the steps below to use airbyte from airflow directly -
- install airbyte, airbyte-source-s3, airbyte-destination-snowflake inside airflow
- write simple python dag code to read from the “source” i.e. s3 and write to the “cache” i.e. snowflake (pyairbyte calls the “destination” a “cache” apparently)
- we do this by interacting with the pyairbyte api - a sketch follows below
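- a minimal sketch of what such a dag might look like - assuming the `airbyte` (pyairbyte) package is available in the airflow environment; the s3 / snowflake credentials, bucket, stream and config keys below are placeholders, so check them against the actual connector spec

```python
# hedged sketch - a tiny airflow dag that uses pyairbyte to read from s3 and
# load into a snowflake "cache". all names and credentials are placeholders.
import pendulum
from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def s3_to_snowflake_pyairbyte():
    @task
    def sync():
        import airbyte as ab
        from airbyte.caches import SnowflakeCache

        source = ab.get_source(
            "source-s3",
            config={  # placeholder config - the real keys come from the connector's spec
                "bucket": "my-bucket",
                "streams": [{"name": "orders", "format": {"filetype": "csv"}}],
            },
            install_if_missing=True,
        )
        source.check()               # roughly the same idea as airbyte's "verify" job
        source.select_all_streams()  # or source.select_streams([...]) for a subset

        cache = SnowflakeCache(      # pyairbyte's name for the destination
            account="my_account",
            username="my_user",
            password="my_password",
            warehouse="my_warehouse",
            database="my_database",
            role="my_role",
            schema_name="airbyte_raw",
        )
        source.read(cache=cache)     # records land in snowflake under schema_name

    sync()


s3_to_snowflake_pyairbyte()
```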