Introduction
- we typically use apache airflow for orchestration - set the order of tasks, make sure the next task starts only after the previous one completes successfully, and control the scheduling of the entire dag
- visibility - using the user interface, we can easily monitor progress of workflows, troubleshoot issues and then take action. we can also see the relationships between workflows and tasks
- flexibility and scalability - we can perform a lot of different kinds of tasks and at scale
- extensibility - we can interact with services like aws, databricks, dbt, etc by using their corresponding providers
- dynamic nature of airflow -
- dynamic tasks - generate tasks based on “dynamic” input, e.g. the number of files we receive can change, and we can generate one task per file
- dynamic workflows - generate workflows based on “static” input, e.g. we have a bunch of configuration files, and we can generate a workflow for every configuration file
- branching - execute a different set of tasks based on a condition
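- a minimal sketch of the "one task per file" idea above, using dynamic task mapping (the dag id, task names and the file list are illustrative, not from the course) -
```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def dynamic_tasks_example():

    @task
    def list_files():
        # in reality, this could come from e.g. an object storage listing
        return ["a.csv", "b.csv", "c.csv"]

    @task
    def process_file(file):
        print(f"processing {file}")

    # one mapped task instance is created per file at runtime
    process_file.expand(file=list_files())

dynamic_tasks_example()
```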
- components of airflow -
- web server - the ui dashboard
- scheduler - helps run tasks in the right order and at the right time. it puts the tasks into the queue
- meta database - keeps track of tasks and their statuses
- triggerer - responsible for triggering deferrable tasks i.e. tasks that wait for external events
- executor - decides which task should execute on which worker, etc
- queue - list of tasks waiting to be executed. also takes ordering into account
- worker - process that actually performs the tasks
- workflow - it is the entire process / pipeline, which is defined using a dag
- dag - directed acyclic graph - collection of tasks organized in a way that reflects the dependencies
- remember - acyclic means the graph cannot have any cycles
- operator - a single, idempotent task in the dag. idempotent - we can run this operator however many times we want. we always get the same output for the same input
- using operators, we can break down the workflow into separate, manageable pieces of work
- task - a specific instance of an operator. it is the actual execution
- airflow is not meant for the following things -
- it is not for sub minute scheduling / realtime processing, it is used for batch workloads
- it is for orchestrating data pipelines, not to perform data processing
- airflow can be run using a “single node architecture”, where all the components run on the same node, vs a “multi node architecture”, where -
- we can use solutions like postgres / mysql for the meta database and redis / rabbitmq for the queue
- we can have multiple worker nodes, and multiple web servers running behind a load balancer
- for components like the scheduler and executor, an active passive like architecture is needed if running multiple copies of them
Installing Airflow
- download the compose file -
```bash
curl -O https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml
```
- create the required directories -
```bash
mkdir -p ./dags ./logs ./plugins ./config
```
- create a .env file -
```bash
echo -e "AIRFLOW_UID=$(id -u)" > .env
```
- optionally, there is an environment variable `AIRFLOW__CORE__LOAD_EXAMPLES` in the compose file, which we can set to false to keep the airflow ui clean
- run the db migrations -
```bash
docker compose up airflow-init
```
- finally, run -
```bash
docker compose up -d
```
- both the username and the password to access the web server are airflow
Creating a Basic Dag
- we create a file in the dags directory for creating a dag
- the parameters include the dag id, the start date, the schedule, and catchup
- catchup is true by default, and we would generally want to set it to false, because if true, it will trigger all the missing dag runs between the start date we specified and the current date
```python
with DAG("user_processing", start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False) as dag:
```
- catchup also applies if a dag is paused i believe i.e. when we resume a dag, all the missing dag runs would be triggered as well
- best practice - do not put multiple steps into the same task, as if there is a failure in the later steps, the entire task would be retried. putting each step in a separate operator / task means we will only have to retry parts that fail
- there are three types of operators -
- action operator - to perform an action, e.g. bash operator to execute bash scripts or commands
- transfer operator - to transfer data from source to destination
- sensors - wait for a certain condition to be met, e.g. file sensor to wait for files to be added
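- e.g. a sketch of a file sensor (the connection id, path and values are illustrative) -
```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    fs_conn_id="fs_default",   # assumes a file path connection exists in airflow
    filepath="/tmp/data.csv",
    poke_interval=30,          # check every 30 seconds
    timeout=60 * 10            # fail if the file does not show up within 10 minutes
)
```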
- we need to create a connection by going to admin -> connections. this is what we provide in the postgres operator below for the connection id parameter
- understand that the clause `if not exists` is what makes this idempotent -
```python
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres',
    sql="""
        create table if not exists user (
            first_name text not null,
            last_name text not null,
            country text not null,
            username text not null,
            password text not null,
            email text not null
        );
    """
)
```
- testing out tasks in airflow -
- enter the scheduler terminal -
```bash
docker container exec -it airflow-instance-airflow-scheduler-1 /bin/bash
```
- now, we should be able to access airflow -
```bash
airflow --help
```
- finally, we can run a specific task as follows - we need to provide the dag id, task id and the execution date -
```bash
airflow tasks test user_processing create_table 2022-01-01
```
- sensors have two important parameters -
- poke interval - how frequently to perform the check. defaults to 60s
- timeout - when the sensor should time out and fail. defaults to 7 days i believe
```python
is_api_available = HttpSensor(
    task_id="is_api_available",
    http_conn_id="users_api",
    endpoint="api/"
)
```
- assume a task called `process_user` writes a csv from a dictionary, which comes from another task called `extract_user`, which makes an api call
- `process_user` needs access to the output of `extract_user`. it does so using `ti.xcom_pull`. we achieve this using “xcom”
- xcom stands for “cross communication” and is used to exchange small amounts of data between tasks. xcom is stored inside the meta database as well. use other techniques like a file system, object storage, etc for large amounts of data
- the http operator takes care of pushing the data for us below. if we were using the python operator, we could have either returned the value from the callable, or used `ti.xcom_push(key='foo', value='bar')`
- we can already see in the code snippet below how to extract it from xcom. we could pull using a specific key as well - `ti.xcom_pull(task_ids="foo", key="bar")`. i think the key is `return_value` by default
- finally, we can also view the data transferred using xcom in admin -> xcoms
```python
import csv
import json

def _process_user(ti):
    response = ti.xcom_pull(task_ids="extract_user")
    dict_user = response["results"][0]
    user = {
        "name": f"{dict_user['name']['title']} {dict_user['name']['first']} {dict_user['name']['last']}",
        "username": dict_user["login"]["username"],
        "password": dict_user["login"]["password"],
        "email": dict_user["email"]
    }
    with open("/tmp/processed_user.csv", "w") as file:
        csv_writer = csv.DictWriter(file, fieldnames=user.keys())
        csv_writer.writerow(user)

extract_user = HttpOperator(
    task_id="extract_user",
    http_conn_id="users_api",
    endpoint="api/",
    method="GET",
    response_filter=lambda response: json.loads(response.text),
    log_response=True
)

process_user = PythonOperator(
    task_id="process_user",
    python_callable=_process_user
)
```
- we need to tell airflow about this dependency i.e. `process_user` needs to be triggered after `extract_user` completes successfully -
```python
extract_user >> process_user
```
- note - instead of using the bit shift operators above, we could also have used the `set_upstream` and `set_downstream` functions
- to be able to interact with external services easily, we use operators. operators use hooks underneath. this helps abstract away the complexities. however, sometimes the operators are not enough for us, since we want access to some internals. in such cases, we can interact with hooks directly. notice how below, instead of using a postgres operator, we use a python operator and then instantiate a postgres hook inside the callable
```python
def _store_user(ti):
    postgres_hook = PostgresHook(postgres_conn_id="postgres")
    postgres_hook.copy_expert(
        sql="copy users from stdin with delimiter as ','",
        filename="/tmp/processed_user.csv"
    )

store_user = PythonOperator(
    task_id="store_user",
    python_callable=_store_user
)
```
- we saw how catchup applies to date ranges that the dag runs were missed for - between pause date and current date / between start date and current date. to run the dag for date ranges before the start date, we can use `airflow dags backfill`
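- a sketch of the backfill command (reusing the user_processing dag id from earlier; the dates are illustrative) -
```bash
airflow dags backfill --start-date 2022-06-01 --end-date 2022-06-07 user_processing
```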
- for all dag runs that were triggered due to the schedule (and not manually), there are two fields - “data interval start” and “data interval end”. e.g. if the schedule is daily, the values for the two will look like this
- dag run 1 - 01-01-2022 and 02-01-2022
- dag run 2 - 02-01-2022 and 03-01-2022
- and so on…
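- a task can read these values via templating (a sketch - the task id is illustrative; templating is covered in more detail in a later section) -
```python
from airflow.operators.bash import BashOperator

print_interval = BashOperator(
    task_id="print_interval",
    # data_interval_start and data_interval_end are template variables injected per dag run
    bash_command="echo running for {{ data_interval_start }} to {{ data_interval_end }}"
)
```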
Datasets
- “trigger dag run operator” - from a dag, trigger another dag
- “external task sensor” - wait for a task in another dag to finish
- “dataset” - logical grouping of data, like a file, sql table, etc
- for creating a dataset, we need to define the “uri”. it can be paths to s3, files, etc. note - only ascii characters are supported
- optionally, we can also attach metadata when creating a dataset, e.g. the owner of the dataset etc
```python
file = Dataset("file.csv")
file = Dataset("file.csv", extra={"owner": "james"})
```
- so, we create the producer dag as follows. my understanding - notice the `outlets` parameter. this is what helps airflow know that a particular task writes to a dataset, and maybe this in turn helps airflow know which downstream dags to trigger?
```python
file_dataset = Dataset("/tmp/file.txt")

with DAG("producer", schedule="@daily", start_date=datetime(2023, 1, 1), catchup=False):

    @task(outlets=[file_dataset])
    def produce_data():
        with open(file_dataset.uri, "a+") as file:
            file.write(f"{datetime.now()}\n")  # write expects a string, hence the f-string

    produce_data()
```
- in the previous section, we saw `@daily` for the schedule parameter. it is called a cron preset i believe. we can also use actual cron expressions, a `datetime.timedelta` object, a timetable and now, a dataset. again, my understanding is that specifying the dataset in the schedule parameter is what helps airflow trigger this dag when the dataset gets updated
```python
file_dataset = Dataset("/tmp/file.txt")

with DAG("consumer", schedule=[file_dataset], start_date=datetime(2023, 1, 1), catchup=False):

    @task
    def consume_data():
        with open(file_dataset.uri, "r") as file:
            print(file.read())

    consume_data()
```
- after writing the producer and the consumer dags, the “dependency graph” in the “datasets” tab will show us the following graph -
- we can specify multiple datasets in the schedule parameter of the consumer dag. this means that the dag would only consume the data once both datasets receive updates
- final note to self - this feature does not care about the data itself - it will not track what new records, files, etc were added, nor call the consumer with this new data
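- a sketch of that multiple-dataset case (the uris and dag id are illustrative) - the consumer only runs once both datasets have been updated -
```python
from datetime import datetime
from airflow import DAG, Dataset

dataset_a = Dataset("/tmp/a.txt")
dataset_b = Dataset("/tmp/b.txt")

with DAG("multi_dataset_consumer", schedule=[dataset_a, dataset_b], start_date=datetime(2023, 1, 1), catchup=False):
    ...  # tasks go here, same as the consumer above
```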
Database and Executors
- recall that the “executor” decides which system to run tasks on etc. there are different types of executors -
- “sequential executor” - run one task at a time on a single machine, so no concurrency / inefficient. useful for testing. the meta database used is sqlite
- “local executor” - run multiple tasks on a single machine, so better. this time, either of mysql, postgres, etc need to be used for the meta database
- remote executors like “celery executor” and “k8s executor” to run tasks across multiple machines. this helps scale our pipelines, as we simply need to add a worker to the cluster if we need more resources
- steps to see the executor being used -
- enter the scheduler -
```bash
docker container exec -it airflow-instance-airflow-scheduler-1 /bin/bash
```
- see the executor in the config file - `cat airflow.cfg | grep -i "executor ="`. it shows `SequentialExecutor`
- however, check the environment variables - `env | grep -i executor` - i see `AIRFLOW__CORE__EXECUTOR=CeleryExecutor`. all airflow related variables start with `AIRFLOW__`. basically, the environment variable value overrides the value in the config file. this environment variable is set via the compose file
- we can also access flower, the celery cluster ui. to do this, run the compose file using `docker compose --profile=flower up -d`. it is accessible at port 5555. we can see the different workers in our celery cluster, active tasks and their statuses, etc
- for adding another worker, just copy the block for the worker in the compose file under a different key
- i believe airflow also allows us to have different queues and different types of workers, so for instance, tasks that require high cpu can be routed to a specific queue and its corresponding specific worker(s) and so on
- to do this on our local, if we look at the command used by the different workers, we see `celery worker`. we need to change it to e.g. `celery worker -q high_cpu`. this should show up in the flower ui - the worker with no arguments is listening on the default queue, while the other one is listening on the high_cpu queue
- the “operators” also accept a queue parameter, and this is where we can specify the queue name and thus route the task to the right worker instance(s)
```python
process_user = PythonOperator(
    task_id="process_user",
    queue="high_cpu",
    python_callable=_process_user
)
```
- for concurrency, look at variables like -
- `AIRFLOW__CORE__PARALLELISM` - number of tasks that can run per scheduler. so, this multiplied by the number of schedulers gives us the total active tasks possible
- `AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG` - self explanatory
- `AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG` - self explanatory
- also, in the python code itself, we can define parameters like -
- `max_active_runs` - (dag level) max active dag runs
- `max_active_tasks` - (dag level) max active task instances
- `max_active_tis_per_dag` - (task level) max task instances per task across all dag runs for a dag. difference from above - it applies to a particular task
- `max_active_tis_per_dagrun` - (task level) max task instances per task per dag run
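- a minimal sketch of where these parameters go (the dag id, task and values are illustrative) -
```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG("concurrency_example", start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False,
         max_active_runs=1, max_active_tasks=4) as dag:

    t1 = PythonOperator(
        task_id="t1",
        python_callable=lambda: print("hello"),
        max_active_tis_per_dag=2  # at most 2 running instances of this task across all active runs of this dag
    )
```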
Task Groups
- “task groups” - make our dags easier to read and maintain by splitting them into various smaller groups of tasks. assume we have the following i.e. three download tasks done in parallel, then a combined processing step and finally three parallel transformation steps
```python
[download_a, download_b, download_c] >> processing >> [transform_a, transform_b, transform_c]
```
- we would like to make the downloading and transforming steps into their own task groups. e.g. to do this for downloading -
```python
def get_downloads_task_group():
    with TaskGroup("downloads_task_group") as group:
        download_a = BashOperator(
            task_id="download_a",
            bash_command="echo downloading file a; sleep 3;"  # echo added so the command actually runs
        )

        download_b = BashOperator(
            task_id="download_b",
            bash_command="echo downloading file b; sleep 3;"
        )

        download_c = BashOperator(
            task_id="download_c",
            bash_command="echo downloading file c; sleep 3;"
        )

    return group
```
- the main dag just has to import and use it. note - i placed the above function inside the file taskgroups_example/downloads_task_group.py, similarly for the transform variant. so, i imported it in the main dag as follows
```python
from taskgroups_example.downloads_task_group import get_downloads_task_group
from taskgroups_example.transforms_task_group import get_transforms_task_group

# ...

downloads_task_group = get_downloads_task_group()

processing = BashOperator(
    task_id="processing",
    bash_command="echo processing file; sleep 3;"
)

transforms_task_group = get_transforms_task_group()

downloads_task_group >> processing >> transforms_task_group
```
- and for this import to work, i had to place an empty `__init__.py` file inside taskgroups_example as well
- now, the entire group can be e.g. collapsed like an accordion in the dag's graph view
Branching
- we can use the “branch python operator” for branching
- notice how we define the dependencies now - all tasks that the branch task can return have an incoming edge from it
```python
with DAG("branching", schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    t1 = BashOperator(task_id="t1", bash_command="sleep 3")

    branch_task = BranchPythonOperator(
        task_id="branch_task",
        python_callable=lambda: f"t{random.randint(2, 3)}"
    )

    t2 = BashOperator(task_id="t2", bash_command="sleep 3")
    t3 = BashOperator(task_id="t3", bash_command="sleep 3")

    t1 >> branch_task >> [t2, t3]
```
- look at the dag runs, skipped tasks are in pink -
- note - since it receives a python callable, it can effectively pull values from “xcom” using “ti” to make its decisions, not shown here
- remember that all downstream tasks of skipped tasks are skipped as well
- assume that a task is dependent on 3 tasks. even if one of them is skipped, this task would be skipped as well
- this is the default behavior, i.e. all the upstream tasks should succeed. it is called `all_success`
- however, we can modify this behavior using “trigger rules”
- e.g. assume we have a set of download tasks. we can trigger an alerting pipeline even if one of them fails using `one_failed`
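- a sketch of that alerting idea (the task names are illustrative, reusing the download tasks from the task groups example) -
```python
from airflow.operators.bash import BashOperator

alert = BashOperator(
    task_id="alert",
    bash_command="echo sending alert",
    trigger_rule="one_failed"  # runs as soon as any one of the upstream tasks fails
)

[download_a, download_b, download_c] >> alert
```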
Customization
- we can add our own “operators” / customize existing operators
- modify or add “views” to the airflow ui
- create your own “hooks” - recall that hooks help in abstracting away the complexity of interacting with an external system
- we do these customizations using the airflow’s “plugin” system
- we create classes that extend the `AirflowPlugin` class
- we then call it like regular python modules
- e.g. assume airflow has no support for elasticsearch. first, inside the plugins folder, create a file called hooks/elastic/elastic_hook.py as follows -
```python
from airflow.hooks.base import BaseHook
from elasticsearch import Elasticsearch

class ElasticHook(BaseHook):
    def __init__(self, connection_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        connection = self.get_connection(connection_id)
        self.es = Elasticsearch([f"{connection.host}:{connection.port}"])

    def info(self):
        return self.es.info()

    def add_doc(self, index, doc_type, doc):
        return self.es.index(index=index, doc_type=doc_type, doc=doc)
```
- the plugin receives a connection id because this way, the clients can create their own simple http connection in airflow, and pass the connection id. the host and port are extracted from it and used to initialize the elasticsearch client. note - i used the container id directly for the host since i created the elasticsearch container on the same network as the airflow stack
- this hook then can have multiple methods used to interact with elasticsearch, which the clients can use
- then, we need to register the above. we do it as follows in the same file at the end -
```python
from airflow.plugins_manager import AirflowPlugin

class AirflowElasticPlugin(AirflowPlugin):
    name = 'AirflowElasticPlugin'
    hooks = [ElasticHook]
```
- if we enter the scheduler container and run `airflow plugins` now, we see the following -
- finally, our dag can now use this hook easily as follows -
```python
from hooks.elastic.elastic_hook import ElasticHook

def _es_info():
    elastic_hook = ElasticHook(connection_id='elasticsearch')
    print(elastic_hook.info())

with DAG("custom_hook", schedule="@daily", catchup=False, start_date=datetime(2023, 1, 1)) as dag:
    es_info = PythonOperator(
        task_id="es_info",
        python_callable=_es_info
    )
```
Docker Operator
- normally, we would need to have all the right dependencies and compatible versions with each other on all the worker nodes, so that our tasks can use them. this can result in “dependency hell”
- also, we would have to learn the apis of the different operators, which may not be intuitive
- so, we can use the “docker operator” instead
- additionally, we can easily test our task by spinning up the container locally as well
- now, our worker needs to interact with / make calls to the docker daemon
- first, i mounted the docker.sock file in the volumes section of `x-airflow-common`. this section gets used in all airflow related containers, if i install airflow using the instructions above -
```yml
- /var/run/docker.sock:/opt/airflow/docker.sock
```
- for some reason, the docker operator was unable to pull the image on my machine. i just pulled it manually using `docker image pull python:3.9.17-slim-buster` for now
- the final dag looks like this - notice how we need to provide the path to the socket file using `docker_url` -
```python
@dag(start_date=datetime(2022, 1, 1), schedule="@daily", catchup=False)
def docker_operator_example():

    @task()
    def _t1():
        pass

    t2 = DockerOperator(
        task_id="t2",
        image="python:3.9.17-slim-buster",
        command="python3 --version",
        docker_url="unix://var/run/docker.sock"
    )

    _t1() >> t2

dag = docker_operator_example()
```
- all arguments like cpu, memory, volume mounts, network mode, etc that we can expect to pass to a normal docker container, can be passed here as well -
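- a sketch of what that can look like (the values are illustrative, and the exact parameter names may vary across docker provider versions) -
```python
from docker.types import Mount
from airflow.providers.docker.operators.docker import DockerOperator

t2 = DockerOperator(
    task_id="t2",
    image="python:3.9.17-slim-buster",
    command="python3 --version",
    docker_url="unix://var/run/docker.sock",
    network_mode="bridge",                                            # attach to a specific docker network
    mem_limit="512m",                                                 # cap the container's memory
    mounts=[Mount(source="/tmp/data", target="/data", type="bind")]   # bind mount a host directory
)
```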
XCom
- now, we can also extract output from the docker container and put it into the “xcom” using `retrieve_output` and `retrieve_output_path`
- lets assume we build an image called docker-operator-example to achieve this. notice how we use the “pickle” library - app.py -
```python
import pickle

with open("/tmp/app.out", "wb") as file:
    pickle.dump({"result": 93}, file)
```
- dockerfile -
```dockerfile
FROM python:3.9.17-slim-buster

COPY app.py app.py

CMD [ "sh", "-c", "python app.py" ]
```
- and finally, the dag -
```python
from airflow.decorators import dag, task
from datetime import datetime
from airflow.providers.docker.operators.docker import DockerOperator

@dag(start_date=datetime(2022, 1, 1), schedule="@daily", catchup=False)
def docker_operator_example():

    @task()
    def _t1():
        pass

    t2 = DockerOperator(
        task_id="t2",
        image="docker-operator-example",
        docker_url="unix://opt/airflow/docker.sock",
        retrieve_output=True,
        retrieve_output_path="/tmp/app.out"
    )

    _t1() >> t2

dag = docker_operator_example()
```
- the xcom ui now -
Why Use Kubernetes Executor
- “celery executor” allows us to easily add workers as needed
- issues -
- extra infrastructure like a queue for queueing up the tasks, celery related infrastructure like flower for monitoring, etc
- we need to ensure that the worker is bootstrapped with the right set of dependencies
- wasting resources - the workers sit idle in absence of tasks
- using “kubernetes executor”, we run one task in one pod. this way, even if a crash happens, only that pod is impacted, and it can be restarted
- this also allows us more fine grained control in terms of resources and memory at the task level
- the dags can be added to the pod in one of three ways -
- init containers
- volumes
- baked into the image itself
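- a sketch of the task level resource control mentioned above, via `executor_config` with a pod override (this is an assumption based on the `pod_override` pattern and the `kubernetes` python client, not something from the course; the values are illustrative) -
```python
from kubernetes.client import models as k8s
from airflow.operators.python import PythonOperator

t1 = PythonOperator(
    task_id="t1",
    python_callable=lambda: print("processing"),
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # the container that runs the airflow task is named "base"
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "500m", "memory": "512Mi"},
                            limits={"cpu": "1", "memory": "1Gi"}
                        )
                    )
                ]
            )
        )
    }
)
```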
Templating and Macros
- assume we would like to extract data from directories which are named using dates
- shortcoming of the solution below - we cannot for e.g. rerun it for a specific date if we want to
```python
def process(extraction_date):
    print(f"executing for {extraction_date}")

with DAG("template_example", start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False) as dag:
    t1 = PythonOperator(
        task_id="t1",
        python_callable=process,
        op_kwargs={"extraction_date": date.today()}
    )
```
- “templates” - allow us to inject “dag run” and “task instance” metadata into tasks
```python
t1 = PythonOperator(
    task_id="t1",
    python_callable=process,
    op_kwargs={"extraction_date": "{{ ds }}"}  # ds is the dag run's logical date, injected via templating
)

t2 = BashOperator(
    task_id="t2",
    bash_command="echo executing for {{ ds }}"
)
```
- not all arguments in operators support templating. the documentation will mention things like “templated” for arguments that do
- we can look at the code too - every operator mentions whether or not they can be templated
- “template fields” - which operator parameters can use templated values
- “template ext” - which file extensions can use templated values
```python
class BashOperator(BaseOperator):
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash')
    # ...
```
- so, we can make an argument that is not templatable, templatable, by sub classing the operator and overriding `template_fields` -
```python
class CustomBashOperator(BashOperator):
    template_fields = ('bash_command', 'env', 'my_other_parameter')
    template_ext = ('.sh', '.bash')
    # ...
```
- the ui shows the rendered templates too, available in the rendered template section of the task details screen -
- now, using “macros”, we can modify the output of these “variables” as well -
```
{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}
```
Variables
- “variables” - allow us to store and retrieve data at runtime without exposing them inside the code
- we can create a variable using a “key”, “value” and optionally, a “description” of what the variable is about. go to admin -> variables in the ui
- we can also set airflow variables via the environment. my understanding - unlike above, they are not stored in the meta database. remember to set it on all the worker nodes
```bash
AIRFLOW_VAR_MY_VAR='{"key": "value"}'
```
- setting it via environment variables can also improve performance (discussed few points later)
- note - for handling secrets, variables in airflow can also integrate with “secret backends” like aws secrets manager, aws ssm parameter store, google cloud secrets manager, and hashicorp vault
- we can fetch the variable anywhere in our code as follows -
```python
from airflow.models import Variable

# ...

my_var = Variable.get("my_var", default_var="default_value")
```
- if the value is a json, we can pass `deserialize_json` as true to `Variable.get`. this way, the value returned is converted to a python dictionary i believe
- order of resolution for variables in airflow -
- secrets backend
- environment variable - this option does not involve a network request
- meta database
- now, two of the above options involve network requests. this can cause performance problems
- additionally, the dags are parsed every 30s by airflow (this is what helps reflect our changes?). this means if we use `Variable.get` outside a task, it might cause further issues with performance. this is why we can cache the variables. note - this caches variables only when parsing dags -
```bash
AIRFLOW__SECRETS__USE_CACHE=True
AIRFLOW__SECRETS__CACHE_TTL_SECONDS=900
```
- now, we can also use it in templating as follows -
```
{{ var.json.my_var.my_key }}
```
- to hide values of variables from the ui or logs, we use `hide_sensitive_var_conn_fields` (it is set to true by default)
- this masks all airflow variables containing names like secret etc (full list here)
- we can however add to this list using `sensitive_var_conn_names`
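- a sketch of setting these via environment variables (the extra names are illustrative) -
```bash
AIRFLOW__CORE__HIDE_SENSITIVE_VAR_CONN_FIELDS=True
AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES=my_token,my_api_key
```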
Dag Parameters
- TODO - add if not discussed elsewhere in other courses