
A brief introduction to Airflow's scheduling mechanism (part one)

In streaming media, vast volumes of data are generated every minute. This data is a precious resource for analysis, so it needs to be carefully processed. Data engineers typically extract the data from the source, transform it into the appropriate format, then load it into a particular location so it can be analyzed and used. This process is called ETL (Extract, Transform, Load). Like a product moving along an assembly line, data should be processed in a certain order and at a certain time, which is why this kind of workflow is referred to as a “data pipeline.” The order of the tasks is particularly important to consider: it shapes the product that finally emerges at the end of the process.

 

Ensuring the pipeline works efficiently and consistently can be challenging, but fortunately there are various tools to help us schedule these tasks. In this blog post, I’m going to discuss the scheduling mechanism of Airflow.

 

 

Scheduling mechanism

 

A scheduling mechanism orchestrates tasks. It ensures they begin at a planned time and occur in a specific sequence. It also allocates the resources required for execution and handles running errors.

 

When we have lots of tasks scheduled, we normally want them to be executed in the order of task dependencies while workers work on them in parallel. 

 

For the Airflow scheduler, timing — when to execute a task — is everything. For an overview of the concepts and principles of Airflow, I recommend exploring the project documentation. For the rest of this post, I’ll be exploring how it works in a little more detail and demonstrating a number of ways it can be used to help you effectively schedule and manage data pipelines.

 

 

Timekeeper

 

Timekeeper isn’t an actual component of Airflow; it’s a name I’m using for a set of related scheduling features. I’ve grouped them together to make it easier to describe how they work.

 

I’ve divided it into four parts:

 

  1. Cron expression

  2. Setting parameters around DAG (directed acyclic graph) runs

  3. Important concepts

  4. Configurations about time

     

 

 

1. Cron expression

 

 

Airflow uses cron expressions or timedelta objects for scheduling. A cron expression is a good way to define when a DAG needs to be triggered. I'll give some practical examples of how to use them.

 

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                       7 is also Sunday on some systems)
# │ │ │ │ │ ┌───────────── year (1970 - 2099) (optional)
# │ │ │ │ │ │
# * * * * * *

 

Note that unlike crontab, Airflow takes only the expression itself; there is no command field. When scheduling a DAG with a cron expression, assign the expression to the schedule_interval parameter when initializing the DAG object. For example:

 

# initialize the DAG
dag = DAG(schedule_interval='10 * * * *', ...)

 

In this example, the DAG is scheduled to be triggered at the 10th minute of every hour. You can build more complex scheduling rules with cron expressions.

 

 

Advanced usage

 

  • Run at every nth interval: */20 * * * * runs every 20 minutes. (00:00, 00:20, 00:40, 01:00, 01:20, ...)

  • Run at multiple specific times: 0 8 1,3,10 * * runs at 08:00 on the 1st, 3rd and 10th day of every month. (Jan-01 08:00, Jan-03 08:00, Jan-10 08:00, Feb-01 08:00, ...)

  • Run over a range of days: 0 8 1-15 * * runs at 08:00 on every day from the 1st to the 15th of each month. (Jan-01 08:00 through Jan-15 08:00, then Feb-01 08:00, ...)

  • Run with the extra 'year' field: 0 0 1 1 * 2017/2 runs at midnight on January 1 every other year, starting from 2017. (2017-01-01 00:00:00, 2019-01-01 00:00:00, ...)

  • An online cron expression generator can help you build and test expressions; you can also verify one locally, as shown in the sketch below.
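
For a quick local check, here's a minimal sketch using the croniter package, which Airflow relies on under the hood for cron schedules. It prints the next few trigger times of the "every 20 minutes" expression above (this assumes croniter is installed, e.g. via pip install croniter):

from datetime import datetime
from croniter import croniter

# start counting schedule times from this point
base = datetime(2022, 1, 1, 0, 0)

# iterate over the trigger times of the expression
it = croniter('*/20 * * * *', base)
for _ in range(4):
    print(it.get_next(datetime))

# prints:
# 2022-01-01 00:20:00
# 2022-01-01 00:40:00
# 2022-01-01 01:00:00
# 2022-01-01 01:20:00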

 

Now let's look at how to use these expressions to schedule DAG runs.

 

 

2. Setting parameters around DAG runs

 

A DAG (directed acyclic graph) run represents one execution of a given workflow. When you define a DAG in Python, you create an object of the DAG class; each time that DAG is triggered, Airflow creates a DAG run, an instance of the DAG tied to a specific point in time.

 

Usually, we define a DAG as follows:

 

from datetime import datetime
from airflow import DAG

# default arguments applied to every task in the DAG;
# the contents here are a placeholder (in the original,
# this dict is assumed to be defined elsewhere)
DEFAULT_ARGS = {'owner': 'airflow'}

# initialize the DAG
dag = DAG(
    dag_id='demo_dag',
    start_date=datetime(2022, 1, 1, 0, 0, 0),
    end_date=datetime(2022, 12, 31, 23, 59, 59),
    schedule_interval='@daily',
    default_args=DEFAULT_ARGS,
    catchup=False
)

 

The parameters start_date, end_date, schedule_interval and catchup are related to time. I'll explain them one by one.

 

The start_date is the date at which your DAG starts being scheduled. Notice that it isn't necessarily the date your DAG first gets triggered; it's the date from which Airflow begins counting schedule intervals. Here I’ve used the first day of 2022 as an example: with the @daily schedule above, the first run covers the interval from 2022-01-01 to 2022-01-02 and is triggered once that interval has passed.

 

Next, if I want it to stop being scheduled at the end of 2022, the end_date should be set to '2022-12-31 23:59:59'.

 

  • end_date: a date beyond which your DAG won't run; leave it as None for open-ended scheduling

 

The schedule_interval defines the interval at which your DAG gets triggered. It's represented either by a cron expression or a timedelta object.
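
To illustrate the two forms, here's a minimal sketch of two DAGs with daily schedules, one driven by a cron expression and one by a timedelta (the dag_ids are made up for the example):

from datetime import datetime, timedelta
from airflow import DAG

# triggered at midnight every day, pinned to the wall clock
dag_cron = DAG(
    dag_id='daily_cron',
    start_date=datetime(2022, 1, 1),
    schedule_interval='0 0 * * *',
)

# triggered every 24 hours, counted from start_date
dag_delta = DAG(
    dag_id='daily_delta',
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
)

One subtle difference: a cron expression pins runs to calendar times, while a timedelta simply adds a fixed duration from one interval to the next, starting from start_date.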

 

And finally, catchup is a boolean value that decides whether the DAG should backfill past DAG runs. (I'll discuss this in more detail later.)
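
As a quick preview: with catchup=True and a start_date in the past, the scheduler creates one DAG run for every interval missed between start_date and now. A minimal sketch (the dag_id is made up):

from datetime import datetime
from airflow import DAG

# if this DAG is first deployed in, say, March 2022, the scheduler
# will backfill one run per missed daily interval since January 1st
dag = DAG(
    dag_id='demo_backfill',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',
    catchup=True,
)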

 

In part two, we’ll discuss some important rules and concepts of DAG and configurations of time.

Disclaimer: The statements and opinions expressed in this article are those of the author(s) and do not necessarily reflect the positions of Thoughtworks.
