Luigi is a library written and open sourced by Spotify. It is intended as a batch job runner, but it also contains a wide range of complementary features that make it a very attractive tool for mission-critical data workflows and ETL pipelines.
The first thing to note is that Luigi is not a task scheduler; its designers recommend cron jobs to schedule tasks. This is a reasonable decision: cron is well supported by programming languages and deployment tools.
Some of the features Luigi provides, which we will cover below, are: dependency management between tasks, automatic retry of failed jobs, ready-made integrations with common data stores, a web interface for monitoring running tasks, and event hooks for notifications and logging.
Firstly, let's look at how we can start using Luigi and the way it structures tasks.
We can install Luigi normally with pip install luigi. With the library in our path we can import luigi like any other package. The installation will also provide a few command line executables, such as:
luigi: the primary job running command
luigid: which I assume stands for the Luigi daemon, as it runs a small web app that allows you to observe your tasks as they are being run.
luigi-deps-tree: which outputs each task name in the order they will be run. Very useful when you put together a complicated batch job!
The basic structure of a single luigi task is as follows:
```python
import luigi

class BatchJobTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        # tasks that should be run before this one
        ...

    def output(self):
        # where the result of your task should go
        ...

    def run(self):
        # the body of your task, the logic, what it should actually do
        ...
```
With this architecture you can organise an arbitrarily complex tree of tasks to achieve any level of data processing and movement. Let’s discuss in a little more detail each part of the above task.
```python
import luigi

class BatchJobTask(luigi.Task):
```

All independent tasks need to inherit from luigi.Task.
```python
date = luigi.DateParameter()
```

We can send variable arguments to tasks, which is useful when you want to pass data from the command line. These parameters are listed explicitly as class variables, which Luigi then assigns to your task, permitting you to reference those values in any part of your task.
Tasks can be organised to run before others, which enables us to construct a flow that fits our context. Luigi expects a single task, or a list of tasks, to be returned from the requires() method, such as return [TaskOne(), TaskTwo()]. Internally, Luigi will loop over the dependent tasks and execute them before the parent task is run.
Another important feature is the self.input() method inherited from luigi.Task. self.input() allows you to reference and use the outputs from any tasks listed in the requires() method. Data and results can therefore be passed between tasks, permitting large tasks to be broken into smaller ones that share their results.
Finally, the body of your task. Here you can either write task logic inline or reference other classes/methods that will touch your data. Within this method it is common to begin by retrieving data from self.input() and end by delivering the result to your output() target.
To round off this article I will highlight some of the additional features Luigi provides and why I think they may be interesting to you!
By chaining tasks through requires() we can make a deep and robust tree of interdependent tasks that ensures our data is collected, processed and deposited in the precise order it needs to be. For instance, a typical pipeline might look like:
ReadDataFromPostgres() -> ManipulateDataTask() -> BackupResultsToFileSystem() -> SendResultsToCloudStorage()
This feature is good on its own, but it becomes even more exciting when paired with the other features Luigi provides.
If one task fails, the following tasks will not be run. Luigi will then look back in time (how far back is configured by you) and retry any failed tasks it finds in that window. For example, perhaps we have a batch job that runs every day. On one unfortunate day our cloud service provider decides to do maintenance on their API at the same time as our job, doh! This results in our data download failing. No matter: Luigi will stop the task there and do no more work for the day. The next day Luigi will check its backlog, notice the previous day's job failed to finish and try again to run everything from that point forward. This means your data is caught up without any gaps and no ssh hackery required on your part!
If you wish to adopt Luigi wholeheartedly, it provides many conveniences for sending output results to the data stores or web services of your choice. Particularly useful in our case is the Postgres module, which gives us ready-made read/write tasks that can be dropped into your pipeline easily.
luigid was mentioned above. In your production environment luigid should be run as a background process because Luigi uses it during runtime. Most exciting, however, is that it also serves a small web app that provides a visual interface for monitoring your active tasks! This wonderful feature makes it easy to inspect your long running tasks, and it is another incentive for creating modular workflows: each task is described distinctly on the interface, giving you a very clear view of how your tasks are behaving.
Lastly, Luigi has been written with event hooks built in for any custom or pre-made tasks. Hooks for start, success and failure events let us drop in notifications and extra logging wherever we need them.
There you have it: Luigi, a niche library for a business critical use case. Or, I dunno, crawling the web every day for new cat pictures and storing them on your local NAS? Whatever your inclination, Luigi is a great tool for the regularly run batch job domain.