Du code, du communisme

Travail distribué en Python SANS Celery

[three_fourth_last]Quand on veut effectuer une même tâche un très grand nombre de fois, on peut utiliser une boucle comme un gros bourrin et effectuer la tâche dedans.
Mais il faut alors gérer:

  • le cas où le processus s’interrompt et sa reprise;
  • le parallélisme.

Le premier cas n’est guère que du travail en plus, et pas insurmontable. Le deuxième cas est plus compliqué: les threads sont inefficaces en Python pour tout ce qui n’est pas limité par l’IO et l’alternative d’utiliser des processus séparés est énormément de boulot.
Du coup généralement on ne le fait pas, et tout est traité en une fois, alors qu’on pourrait aller 4 fois plus vite avec 4 traitements en parallèle.

Kombu et Celery

Il existe des outils très puissants pour faire des queues en Python, notamment la library Celery, qui utilise elle-même la bibliothèque Kombu. Mais quand on veut juste faire un petit job rapide, les mettre en oeuvre, c’est pas mal de temps. Celery brille pour mettre dans une queue plein de tâches issues de processus externes concurrents, mais rapidement mettre en place un traitement de masse, ce n’est pas son point fort.
C’est pour ici qu’intervient un article de David Cramer, de l’équipe de chez Disqus. Ces gens sont doués, utilisent massivement Django et Python, et chez eux les bases de données ont des milliards d’entrées. Bref, ils ont plein de choses intéressantes à dire.
En l’occurrence, ils ont créé ce petit outil pour leurs migrations: le task master.

Capture d'écran de la sortie de taskmaster
Et en prime, on a un bel écran de progression

Utilisation du task master

Imaginez : vous avez besoin de transcoder des images, renommer des fichiers ou mettre à jour toutes les entrées de votre base de données. Ça va prendre 3 heures, et ça risque de planter. Ce cas de figure est exactement celui dans lequel brille le task master.
On installe ça (exemple sous Ubuntu) et avec pip:

sudo apt-get install libzmq-dev libevent-dev python-dev
pip install taskmaster

Puis on créé un fichier tasks.py:

def get_jobs(last=0):
    for valeurs_traiter in liste_de_valeurs:
        yield valeurs_traiter
def handle_job(valeurs_traiter):
    print valeurs_traiter

Les fonctions seront autodétectées si elles portent ce nom.
handle_job est la fonction qui va faire votre tâche. Par exemple mettre à jour un objet de votre base de données, encoder une vidéo, etc.
Attention, en cas de reprise des tâches, la première peut être exécutée deux fois. Il faut donc que votre fonction gère ce cas.
get_jobs doit retourner un itérable dont chaque élément sera passé à handle_job. Par exemple, yield retournera le nom d’une vidéo, ou d’un fichier.
Ensuite on lance le contrôleur:

tm-master tasks.py

On crée autant de workers que l’on souhaite, ici 4:

tm-spawn tasks.py 4

Dans ce cas, 4 processus vont vider l’itérable de get_jobs de la liste de valeurs à traiter, et appliquer handle_jobs dessus. Si vous arrêtez le controller ou les workers, vous pouvez les relancer plus tard : ils reprendront là où ils se sont arrêtés.
Si vous avez un processeur avec quatre cœurs, le travail est effectué presque 4 fois plus vite qu’avec un script tout simple et une boucle for. On peut aussi installer les workers sur des machines séparées, et les faire communiquer avec le controlleur en leur donnant son adresse IP et un port, distribuant ainsi le travail sur de nombreux ordinateurs.
Et en prime, contrairement à celery, toutes les tâches ne sont pas listées en mémoire, mais seulement une partie (que l’on peut définir en paramètre).