What is Apache Airflow & how did I dive into it? is an open-source tool that allows you to design, plan, and monitor complex workflows. To put it simply, this tool allows you to launch a variety of tasks on schedule. Nowadays, Airflow is used widely in the (and you often can see Airflow as a required skill in job descriptions 🙂) Apache Airflow data industry The Airflow UI looks like this I discovered Airflow on my first job in IT, I was a Machine Learning engineer intern at the bank. To complete my tasks successfully and contribute to the development of the data platform, I needed to learn how to use Apache Airflow. It was more complicated, like discovering Python - it was less information and explanation of key concepts. By the way, agrees with me 😂. Here is the quote: ChatGPT “Apache Airflow can be difficult to master due to its complexity, steep learning curve, lack of documentation, requirement for programming knowledge, and challenges with debugging issues in a distributed system.” In this article, I will share my experience with Airflow, give you some tips and tricks, explain key concepts, and give a list of useful resources. is the basic entity of the Airflow. Official Apache Airflow documentation defines that structure as “a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG structure (tasks and their dependencies) as code”. DAG and its important features\DAG (Directed Acyclic Graph) To simplify, it is a schema of all tasks and dependencies between them, which can be run by Airflow Here is an example of the DAG code on Python default_args = { 'owner': 'surname_name', 'start_date': datetime(2024, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=15), 'email': 'firstname.secondname@domain.com', 'email_on_failure': False, } dag_id = 'dag_example' with DAG( dag_id=dag_id, default_args=default_args, tags=['project_name', 'surname'], schedule_interval='0 12 ***', catchup=True, max_active_runs=1 ) as dag: # tasks list next The definitions and detailed explanations of all parameters are given , I want to highlight some important tips, that were discovered during my experience with Airflow. here Do not change the value of dag_id If you change this parameter, a new dag entity will appear in Airflow UI, although the code file remains the same. I struggled with that a lot. So, I recommend being careful and attentive. How to schedule the interval of DAG runs. The schedule_interval parameter specifies the start time in cron notation. For convenience, it is better to set the interval on this site https://crontab.guru/ Airflow also has ready presets, like @hourly, @daily Value None also exists. It is used when DAG shouldn't be triggered by a specific time value. More scheduling information can be discovered here Catchup = True + start_date = datetime(year, month, day) means to execute all past DAG starts that would have been scheduled if the DAG had been created and started at the date specified in the start_date parameter value. Catchup = True In this case, the dag will be triggered for each interval, starting from and ending with the current moment. Let's imagine that you have a DAG with start_date = (2015, 1, 1) with scheduling_interval = @daily. Can you imagine how many launches will happen? DAG will be run for (2015, 1, 1), then for (2015, 1, 2), and so on. start_date By default, , so it’s vital to pay attention to this parameter. catchup = True How to beautify your DAG Above all, DAG has a parameter, which allows you to write DAG docs, and it’ll appear above the dag. It supports markdown and links to external content. A simple and efficient example can be found doc_md here Tags parameter Tags allow you to find dags in the Airflow UI easily. Put your name/surname or project name for easy navigation. tags=['project_name', 'surname'] # code snipped In UI, it looks like this. Information about tags in the airflow docs I am not a big fan of writing docs, but spending several minutes could save tons of nerve cells in the future 😅 Tasks and their statuses A task is one work unit, a DAG node. Within a task, it is defined what code will be executed and what data will be passed to the following tasks. Typical task code looks like this. # Task 1: Bash operator - execute bash command task1 = BashOperator( task_id='task1', bash_command='echo -e "def \nfoo "’ dag=dag ) # Task 2: Python operator - executes Python code task2 = PythonOperator( task_id='task2', # some_python_function - python code to execute python_callable=some_python_function, ) # task order looks like this. Read more on task orders # https://docs.astronomer.io/learn/managing-dependencies task1 >> task2 Some of the task types will be reviewed later Task statuses In Airflow, tasks have different statuses. Here is the list of the most common ones the task has not yet been queued up no_status: waiting for the finishing of previous tasks in the dag scheduled: waiting queued: the task is in progress running: the task is finished successfully success: the task is finished with an error failed: the task was skipped skipped: parent tasks have finished with an error and the task cannot be started upstream_failed: All statuses are listed in the docs Trigger rule The trigger rule determines at what state of the parent task the execution of the next task starts. The easiest option is that all previous tasks have been completed successfully. “all success” is the default value of the parameter. In Python, that value looks like this. task1 = BashOperator( task_id='task1', bash_command='echo -e "def \nfoo "’, trigger_rule='all_success', dag=dag ) But there are several more options. Here are the most popular ones: - the task will be started if the parent tasks end with an error All-failed - the task will be started if the parent tasks have finished (success, error, and task skip are allowed) All-done - the task will start as soon as one of the tasks is completed successfully One-success More about trigger rules (with clear explanations and pictures) can be found here Airflow Operators We’ve discussed tasks in Airflow. Operators, in general - like preset for execution of a different task. For example, bash command, python code, SQL query, and many more. Here is my classification of operators (Empty Operator, Dummy Operator) Decorative (Python Operator, Bash Operator) Universal (For example, Postgre Operator, MsSql Operator) For databases (For example, SparkSubmit Operator, KuberPod Operator) Specific Decorative The name “decorative” refers to Empty and Dummy operators. I used empty operators for testing - for example if the Airflow works correctly. Dummy operators, according to the , are used for defining the beginning and end of the group of tasks. docs The code for both of them is pretty simple. task_1 = EmptyOperator(task_id="task_1", dag=dag) task_2 = DummyOperator(task_id='task_2', dag=dag) Universal operators I named them universal because they are used widely and suitable for a variety of tasks. It's a Bash Operator and Python Operator. Python Operator launches Python code, whereas Bash Operator launches Bash script or command. Can you imagine how many things can be done with Python or bash? Code examples can be found above in the task entity explanation Operators for work with databases These types of operators are used to interact with different databases. Code example for PostgresOperatorI(used for execution of queries in PostgreSQL database) from official docs create_pet_table = PostgresOperator( task_id="create_pet_table", # code for SQL query sql=""" CREATE TABLE IF NOT EXISTS pet ( pet_id SERIAL PRIMARY KEY, name VARCHAR NOT NULL, pet_type VARCHAR NOT NULL, birth_date DATE NOT NULL, OWNER VARCHAR NOT NULL); """, # database connection id (possible to add in the Airflow UI) postgres_conn_id="postgres_default") Based on my experience, I don’t recommend using this operator for executing “heavy operations.” It could slow DAG drastically. Specific operators They are used to execute specific tasks. In my work, I had experience with KubernetesPodOperator ( ) (greetings for the DevOps team for answering tons of questions ) and SparkSubmit Operator. SparkSubmit Operator is used for launching Spark applications - convenient for working with large amounts of data. The best thing about it is that you can transfer spark settings as arguments. docs Code example with spark-submit operator listed below spark_task = SparkSubmitOperator( task_id='spark_job', application='/path/to/your/spark/job.py', conn_id='spark_default', # name of the Spark connection executor_memory='4g', num_executors=2, name='airflow-spark-example', verbose=False, dag=dag, ) Sensors A sensor is a type of operator that monitors at a certain time interval and checks if a criterion is met or not. If yes, it completes successfully, if no, it retries until time runs out. Here are several types of sensors waits for the specified date and time to pass. Useful when you need to execute tasks from the same DAG at different times DateTimeSensor waits for an Airflow task to be completed. Useful when you need to implement dependencies between DAGs in the same Airflow environment ExternalTaskSensor waits for the called Python object to return True PythonSensor waits for data to appear in a SQL table. SqlSensor waits for API availability. HttpSensor On my job, I used an SQL sensor to check whether the data had been delivered. When the newest data is uploaded, the ML model starts scoring. Simple sensor code example waiting_task = FileSensor( task_id='waiting_task', poke_interval=120, timeout=60 * 30 * 30, mode='reschedule' ) Explanation of the key parameters - how many seconds the sensor checks the condition (by default - 30 sec) Poke_interval - during what time the sensor works (it is highly desirable to specify) Timeout Mode parameters have two values: poke and reschedule - task occupies a slot during operation (task in active status) poke (default) - occupies the working slot only during the check itself and sleeps during the remaining times. Between sensor checks, the task status becomes up to reschedul). Recommended for use with a large time interval reschedule If you want to learn more… If you want to understand Airflow better, I highly recommend reading . The book allows you to dive deeply into Airflow concepts. There are still many interesting things to learn; this article just covered basic entities. Data Pipelines with Apache Airflow by Bas Harenslak and Julian de Ruiter I hope learning Apache Airflow has become much easier with this article. An understanding of giving concepts is enough to start writing DAGs or understanding how DAGs work.