It’s helpful to have an audit record of the data that you ingested from external systems. These external systems can already be in a file format (FTP), an HTTP/SOAP/API connection with json or xml output, or perhaps even by connecting to an external database directly.
Storing the data that you ingested this way is helpful for troubleshooting. If you ingest the data and then immediately attempt to store that in an on-premises database, you need to rely on the workflow logs to be able to figure out what happened if a problem occurred. With a record of the file that you attempted to ingest, you achieve two things:
- You can easily debug why some source data could not be ingested by a target system.
- You can easily rerun parts of your workflow that start with picking up the file from an archive location.
- You can historically reprocess all data that you picked up from days, weeks and months before without having to go back to this source system. This is very important, because source systems do not always keep this archive, may not support parametrization to do this effectively (for example, only gives you data from ‘yesterday’) and other reasons.
So, establishing that keeping a record of all ingested data is a good thing, let’s go about implementing this in airflow. FYI, I’m borrowing these ideas from a presentation given by IndustryDive, available from the Airflow links page.
Rather than hand-crafting file locations or passing files around, start with developing a predictable method for locating data files. Tasks that ingest data dump data files in those locations and tasks that read data dynamically determine these locations to read from.
The example that I devised shows the development of two types of operators. The FileToPredictableLocationOperator shows you how to develop an operator that dumps a file to a predictable location. The PredictableLocationToFinalLocationOperator is the operator that consumes from this location and processes the file to its final location.
Assuming that you still have the etl-with-airflow project cloned somewhere (see Documentation Github Project), I left a simple bash script to create some empty files in a particular sample structure so that you can see how this is supposed to work in general. Here’s how to do that:
$ cd examples $ cd file-ingest $ ./create_files.sh $ cp -R * $AIRFLOW_HOME/dags
Now simply run the airflow webserver and the airflow scheduler as before, activate the file_ingest dag and it should start processing the files you just created. Look into file_ingest.py and acme/operators/file_operators.py to see how the example works.
Extending from the example¶
The above example uses a local filesystem to reduce the dependency on more complex connections and external environments like AWS, Google Cloud or whatever storage you may have available. This is to keep the example simple. In real world scenario’s, you’d probably write a bunch of operators to operate this way. The more incremental your process is, the more
In the source code under contrib, you can see a number of example operators that move data from one system to another. The closest example is a FileToGoogleCloudStorageOperator in file_to_gcs.py. What you need to do yourself is determine for which these intermediate predictable “archive” locations apply.
Remember: An operator in airflow moves data from A to B. In that sense, it’s just an abstraction component over two (types of) hooks that need to cooperate together and achieve something in a sensible way. Operators are a great way to introduce these design choices and introduce more generic behavior at almost zero cost.
You’ll probably have a lot of people who concern themselves with moving data from A to B. Make sure that you have some solid platform engineers who know how to generalize the existing solutions to build consistency in your daily routines, otherwise you’ll find that people make arbitrary choices everywhere, which reduces the effectiveness of your overall platform.