ETL example

To demonstrate how the ETL principles come together with airflow, let’s walk through a simple example that implements a data flow pipeline adhering to these principles. I’m mostly assuming that people running airflow will have Linux (I use Ubuntu), but the examples should work for Mac OSX as well with a couple of simple changes.

First, let’s set up a simple postgres database with a little bit of data, so that the example can materialize in full and the processing becomes clear. You’ll see that the OLTP product and customer tables do not have a primary key; this is intentional, to simplify how data is ingested into the target environment. To get some nice dimension history in your data, you’d have to modify the OLTP database in between airflow DWH processing runs; this simulates updates happening over time.
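If you want to simulate such changes by hand, a few updates on the OLTP side between runs are enough. Below is a minimal sketch using psycopg2; the table and column names (customer, customer_name, address) are assumptions for illustration, so check the SQL scripts in the example repository for the actual schema, and use an account that may write to the database rather than the read-only account airflow uses:

import psycopg2

# Connect to the OLTP database with a user that has write access.
conn = psycopg2.connect(host="localhost", dbname="orders", user="postgres")
cur = conn.cursor()

# Change an attribute of an existing customer; on the next DWH run this
# should surface as a new version of the customer dimension record.
cur.execute("""
    UPDATE customer
       SET address = '42 New Street'
     WHERE customer_name = 'John Doe';
""")

conn.commit()
conn.close()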

Important

There are two ways to run this particular example: either by installing airflow on your host system, which gives you an idea of what is involved there, or by running it in a docker container.

Install airflow on host system

Install airflow

Before we begin on this more elaborate example, follow the tutorial to get acquainted with the basic principles. After you start the webserver, also start the scheduler. Play around with it for a while, follow the tutorial there, then come back to this tutorial to further contextualize your understanding of this platform.

Note that in order to complete this tutorial you need to install the extra postgres package as specified in the docs.

Clone example project

Go to the github project page of this documentation project, where you can download the example source code, DAGs, SQL and scripts to generate the databases and load them with data:


Documentation Github Project


Clone this project locally somewhere.

Install postgres

Next, install postgres on your machine. On Ubuntu, it can be installed using apt:

$ sudo apt-get install postgresql
$ sudo service postgresql restart

For Mac OSX, I highly recommend the package installer. After installation postgres will be running, and you can restart it after a reboot using the app in the launcher. You can log in through the postgresql menu at the top right.

Set up database

On Linux, go to the do-this-first directory in the examples directory of the cloned github project, then run the create_everything.sh script. For Mac OSX, you probably have to open the SQL scripts separately and run them in order from the command line.

$ cd examples/do-this-first
$ ./create_everything.sh

Now, let’s create some tables and populate them with some data.

$ ./load_data.sh

Configure airflow

We need to declare two postgres connections in airflow.

Go to the connections screen in the UI (through Admin) and create a new postgres connection called postgres_oltp. Specify conntype=Postgres, Schema=orders, login=oltp_read (same password) and port 5432, or whatever you’re using.

Then add another connection for Postgres, which connects to the data warehouse and call this “postgres_dwh”: conntype=Postgres, Schema=dwh, login=dwh_svc_account (same password) and port 5432.

You can check whether these connections work in the Ad-hoc query section of the Data Profiling menu: select the connection from the drop-down there and do a select on the order_info table:

SELECT * FROM order_info;
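If you prefer to verify this outside the UI, a small sketch from a Python shell on the same host works too, assuming airflow and the postgres extra are installed there:

from airflow.hooks.postgres_hook import PostgresHook

# A trivial query against both connections; if either one is misconfigured,
# this raises an error instead of printing a result.
for conn_id in ("postgres_oltp", "postgres_dwh"):
    hook = PostgresHook(postgres_conn_id=conn_id)
    print(conn_id, hook.get_records("SELECT 1"))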

Then add a pool to airflow (also under Admin) called postgres_dwh. Let’s give it 10 slots.

Finally, add a Variable in the Variables section that points to where the SQL templates are stored; these are the SQL files from the example repository. Create a new variable “sql_path” and set the value to that directory.
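To see how the DAGs consume this configuration, here is a minimal sketch (the dag id, task id, start date and SQL file name are illustrative, not taken from the repository): the sql_path variable feeds the DAG’s template search path, while the postgres_dwh connection and pool throttle access to the warehouse.

from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator

# Where the SQL templates live, as configured in the Variables screen.
sql_path = Variable.get("sql_path")

dag = DAG(
    dag_id="example_dwh_load",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    template_searchpath=sql_path)

load_dwh = PostgresOperator(
    task_id="load_dwh",
    postgres_conn_id="postgres_dwh",
    pool="postgres_dwh",          # at most 10 concurrent tasks hit the DWH
    sql="some_template.sql",      # resolved relative to sql_path
    dag=dag)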

Drop dags into airflow

In a real setup you’d use continuous integration to update DAGs and dependencies in airflow after changes, but for now we’re going to drop the lot straight into the DAG directory for simplicity.

$ cd etl-example/dags
$ cp -R * $AIRFLOW_HOME/dags
$ mkdir $AIRFLOW_HOME/sql
$ cd etl-example/sql
$ cp *.sql $AIRFLOW_HOME/sql

Run airflow from docker

There’s a docker compose file in the main directory of the repository that does everything.

You may prefer to run the docker-compose process in the foreground first, so that you become aware of issues that may pop up during installation. The postgres database needs some initialization, and this is only applied the first time the container is initialized. This is how you start the containers the first time, with the output going to the console:

docker-compose -f docker-compose-LocalExecutor.yml up --abort-on-container-exit

This is how you can clear the containers, so that you can run the install again after resolving any issues:

docker-compose -f docker-compose-LocalExecutor.yml down

And this is how you’d typically run the container if everything is ready (as a daemon in the background):

docker-compose -f docker-compose-LocalExecutor.yml up -d

Configure airflow

We need to declare two postgres connections in airflow, a pool resource and one variable. The easiest way to do this is to run the init_docker_example DAG that was created; it applies the settings that you’d otherwise make by hand. Activate the DAG by setting it to ‘on’.
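For reference, the kind of bootstrap code such a DAG runs looks roughly like the sketch below. The connection details match the ones listed further down; the sql_path value is an assumption, so check the actual init_docker_example DAG in the repository for the real settings.

from airflow import settings
from airflow.models import Connection, Pool, Variable

session = settings.Session()

# The two database connections described below.
session.add(Connection(
    conn_id="postgres_oltp", conn_type="postgres", host="postgres",
    schema="orders", login="oltp_read", password="oltp_read", port=5432))
session.add(Connection(
    conn_id="postgres_dwh", conn_type="postgres", host="postgres",
    schema="dwh", login="dwh_svc_account", password="dwh_svc_account",
    port=5432))

# The pool that throttles concurrent access to the data warehouse.
session.add(Pool(pool="postgres_dwh", slots=10))
session.commit()

# Where the SQL templates end up inside the container (adjust if different).
Variable.set("sql_path", "/usr/local/airflow/sql")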

To do this by hand:

Go to the connections screen in the UI (through Admin) and create a new postgres connection and call this postgres_oltp. Then specify conntype=Postgres, host=postgres, Schema=orders, login=oltp_read, password=oltp_read and port 5432.

Then add another connection for Postgres, which connects to the data warehouse and call this postgres_dwh: conntype=Postgres, Schema=dwh, login=dwh_svc_account, password=dwh_svc_account and port 5432.

You can check whether these connections work in the Ad-hoc query section of the Data Profiling menu. If the connections are not in the connection drop-down, they are failing because of a dependency issue or a typo. If they do show up, select the postgres_oltp connection and do a select on the order_info table:

SELECT * FROM order_info;

Then add a pool to airflow (also under Admin) called postgres_dwh. Let’s give it 10 slots.

Finally, add a Variable in the Variables section that points to where the SQL templates are stored; these are the SQL files from the example repository. Create a new variable “sql_path” and set the value to that directory.

Run it

In the airflow UI, refresh the main DAG page and the new DAGs should be listed:

  • orders_staging
  • customer_staging
  • product_staging
  • process_dimensions
  • process_order_fact

DAGs are inserted in a non-active state, so activate the DAGs and the scheduler will start running the jobs. The staging processes copy data from a toy OLTP data store: order_info, orderline, customer and product. process_dimensions processes the product and customer dimensions using Slowly Changing Dimensions Type 2 logic, and process_order_fact processes the fact tables.

How it works

Two databases are created (on the same server): one simulates a remote OLTP system, the other is a simplistic data warehouse. The OLTP system only has a couple of rows for orders, orderlines and some customer and product info.

The *_staging processes extract data from the OLTP database and ingest it into the staging tables in the staging schema, taking care to make this process repeatable. Repeatable means first removing the data for the date window under consideration, then reinserting it with a select that only returns the data that applies to that window.
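The pattern can be sketched with two hooks and the execution date macros. This is not the repository’s actual operator, just an illustration of the repeatable window; table and column names are made up, and the function would be wired up as the python_callable of a PythonOperator with provide_context=True so that ds and tomorrow_ds are passed in.

from airflow.hooks.postgres_hook import PostgresHook

def stage_order_info(ds, tomorrow_ds, **_):
    # Clear yesterday's window at the destination, then re-select it.
    src = PostgresHook(postgres_conn_id="postgres_oltp")
    dst = PostgresHook(postgres_conn_id="postgres_dwh")

    # Remove whatever a previous run may have inserted for this window.
    dst.run("DELETE FROM staging.order_info "
            "WHERE created_dtm >= %s AND created_dtm < %s",
            parameters=(ds, tomorrow_ds))

    # Re-extract exactly that window from the source and insert it again.
    rows = src.get_records(
        "SELECT * FROM order_info "
        "WHERE created_dtm >= %s AND created_dtm < %s",
        parameters=(ds, tomorrow_ds))
    dst.insert_rows(table="staging.order_info", rows=rows)

Run twice for the same execution date, this produces the same result, which is exactly the idempotency constraint from the principles.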

The first thing you’d do when staging data is present is to process your dimensions. The process_dimensions DAG updates the customer and product dimensions in the data warehouse. Dimensions should be present before fact tables, because there are foreign keys linking facts to dimensions and you need data to be there before you can link to it.

It is set up with the depends_on_past parameter set to True, because dimensions should be updated in a specific sequence. This does have the effect of slowing down scheduling, because the task instances are no longer parallelized.
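A sketch of how such a DAG can be set up follows; the dag id and the external task id are illustrative, the real definitions are in the repository’s dags directory.

from datetime import datetime
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    "owner": "airflow",
    "depends_on_past": True,   # force the dimension runs into sequence
}

dag = DAG(
    dag_id="process_dimensions_sketch",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily")

# Wait until the staging DAG has finished for the same execution date.
wait_for_customer_staging = ExternalTaskSensor(
    task_id="wait_for_customer_staging",
    external_dag_id="customer_staging",
    external_task_id="stage_customer",   # illustrative task id
    dag=dag)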

The process_order_fact DAG processes the order and orderline data and associates each record with the correct surrogate key in the dimension tables, based on the date and time the dimension records were active and, usually, the business key.
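The lookup itself is plain SQL; an illustrative version, embedded here as a Python template string, could look like the sketch below. Column names (customer_dim_key, effective_from, effective_to and so on) are assumptions, not the repository’s actual schema.

# Illustrative surrogate key lookup for the customer dimension: match on
# the business key and on the validity window of the dimension record.
FACT_CUSTOMER_LOOKUP = """
SELECT f.order_id,
       d.customer_dim_key
  FROM staging.order_info  f
  JOIN dwh.dim_customer    d
    ON d.customer_id  = f.customer_id        -- business key
   AND f.created_dtm >= d.effective_from     -- record active at order time
   AND f.created_dtm <  d.effective_to
"""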

Also notice how the dimension table update doesn’t delete data for a specific window. Because of the existing facts and how everything links together, that would be very dangerous to do! Instead, running the dimension load multiple times results in no-ops, unless extra data was added in the source, which leads to new records. Deletion of source records is not implemented in this scenario; handling it would mean closing off all versions of an entity by giving them a specific end date.

Proof of principles compliance

If we set principles for ourselves, we need to verify that we comply with them. This section documents how the principles are implemented in the full example.

The PostgresToPostgresOperator uses a hook to acquire a connection to the source and the destination database. It transfers the data corresponding to the execution date, which here covers the start of yesterday up to the most recent midnight (from the perspective of airflow, that upper bound is “tomorrow” relative to the execution date). There’s code available in the example to work with partitioned tables at the destination, but to keep the example concise and easily runnable I decided to comment it out. Uncomment it and adjust the operators to put this back. The principle Partition ingested data is not demonstrated by default for that reason; see the Important note below for more information about the practice.
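Stripped of the templating of destination tables, bulk loading and partition handling, the core of such an operator boils down to roughly the following sketch. This is not the repository’s actual code, just an illustration of the two-hook pattern it is built on.

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class SimplePostgresToPostgres(BaseOperator):
    # Select rows from one postgres connection, insert them into another.
    template_fields = ("sql",)

    @apply_defaults
    def __init__(self, sql, src_conn_id, dest_conn_id, dest_table,
                 *args, **kwargs):
        super(SimplePostgresToPostgres, self).__init__(*args, **kwargs)
        self.sql = sql
        self.src_conn_id = src_conn_id
        self.dest_conn_id = dest_conn_id
        self.dest_table = dest_table

    def execute(self, context):
        src = PostgresHook(postgres_conn_id=self.src_conn_id)
        dest = PostgresHook(postgres_conn_id=self.dest_conn_id)
        # The sql is templated, so it selects only the execution date window.
        rows = src.get_records(self.sql)
        dest.insert_rows(table=self.dest_table, rows=rows)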

Satisfied principles (those not listed are not applicable):

  • Load data incrementally : extracts only the newly created orders of the day before, not the whole table.
  • Process historic data : it’s possible to rerun the extract processes, but downstream DAGs have to be started manually.
  • Enforce the idempotency constraint : every DAG cleans out data if required and possible. Rerunning the same DAG multiple times has no undesirable side effects like duplication of the data.
  • Rest data between tasks : The data is in persistent storage before and after the operator.
  • Pool your resources : All task instances in the DAG use a pooled connection to the DWH by specifying the pool parameter.
  • Manage login details in one place : Connection settings are maintained in the Admin menu.
  • Develop your own workflow framework : A subdirectory in the DAG code repository contains a framework of operators that are reused between DAGs.
  • Sense when to start a task : The processing of dimensions and facts has external task sensors that wait until all processing of the upstream DAGs has finished for the required day.
  • Specify configuration details once : The location of the SQL templates is configured as an Airflow Variable and looked up as a global parameter when the DAG is instantiated.

Important

The commented-out code shows how to use the partition manager to keep the last 90 days in partitions and then move partitions out to the master table as a retention strategy. Partition management is done through another scheduled function that runs daily, moving partitions around and creating new ones when required. What’s not demonstrated is archiving, which happens after that and depends on the accepted archiving policy for your organization.

The benefit of partitioning is that rerunning ingests is very easy and the DB engine can parallelize tasks better, so ingest jobs get in each other’s way less. The downside is that there are many more tables and files to manage, which can hurt performance if used too heavily. So it’s a good fit for the largest tables, like orderline and invoiceline, but other tables should probably stick to a single master table.

You do not want to reload data older than 90 days in that case, so another operator or function should be added that checks whether today minus the execution_date is greater than 90 days and prohibits execution if that’s the case. Not doing that would mean truncating a non-existent table. An alternative is to follow a different path in the DAG that uses DELETE FROM on the master table instead.
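One way to implement such a guard is a ShortCircuitOperator that skips the downstream load when the run falls outside the retention window. The 90 days mirror the retention strategy above; everything else here (dag id, task id) is illustrative.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import ShortCircuitOperator

def within_retention(ds, **_):
    # ds is the execution date as a 'YYYY-MM-DD' string.
    age = datetime.now() - datetime.strptime(ds, "%Y-%m-%d")
    return age <= timedelta(days=90)   # False skips all downstream tasks

dag = DAG(
    dag_id="orders_staging_sketch",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily")

check_window = ShortCircuitOperator(
    task_id="check_retention_window",
    python_callable=within_retention,
    provide_context=True,
    dag=dag)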

Issues

  • There is currently an issue with max_active_runs, which only respects the setting in the first run. When backfill is run or tasks get cleared to be rerun, the setting is not respected:

    https://issues.apache.org/jira/browse/AIRFLOW-137

  • What is not demonstrated is a better strategy to process a large backfill if the desired regular schedule is 1 day. 2 years of data leads to 700+ days and thus 700+ runs. This eventually consumes a lot of time, because the scheduler runs at a particular interval, jobs need to start, etc. Usually source systems can handle larger date windows at week or month level. More about that in the other examples.

  • When pooling is active, scheduling takes a lot more time. Even with a pool size of 10 and only 7 task instances, it takes longer for the instances to actually run.