Il y a 11 ans -
Temps de lecture 8 minutes
Les dessous de Cascalog, Cascading
On peut critiquer la verbosité de l’API MapReduce, mais cette problématique n’est pas tant liée à la syntaxe du langage hôte qu’au manque d’abstraction. Dans un précédent article, vous avez pu voir Cascalog au travers d’une approche hands-on et remarquer qu’il est possible d’exprimer succinctement des traitements de données complexes. Si vous avez creusé un peu plus, vous avez dû vous retrouver face à Cascading, la brique servant d’intermédiaire entre Cascalog et l’API MapReduce. Dans cet article, je vous propose de voir le positionnement de Cascading dans l’écosystème Hadoop et ce qu’elle apporte.
Cascading, pourquoi?
MapReduce est l’approche que vous devez utiliser pour exprimer vos traitements sur le système de fichiers distribué fourni par Hadoop (HDFS). C’est une première abstraction qui a du sens puisqu’elle vous permet de déléguer au cluster le soin de calculer le nombre de tâches nécessaires pour traiter le volume de données que vous souhaitez attaquer, vérifier la disponibilité des nœuds, distribuer les tâches et enfin récupérer les résultats. Vous pouvez vous concentrer sur le traitement en lui-même. Pour autant, cette abstraction a un coût : vous devez penser en MapReduce. C’est un pli à prendre. Pour les cas simples, comme compter les occurrences des mots dans un ensemble de documents, le résultat est assez élégant. Mais si vous souhaitez utiliser Hadoop, c’est a priori pour réaliser des traitements plus complexes (et non pas seulement compter des mots). Et qui dit traitements complexes, dit enchainements de passes MapReduce, que cela soit en séquentiel ou en parallèle. Plus ce nombre de passes sera élevé, plus il vous sera difficile de créer, mais également de maintenir vos traitements de données.
Ce n’est certes pas une problématique nouvelle. Et c’est pour cela que des solutions comme Pig et Hive ont vu le jour. Ces deux solutions ont quelques différences mais leur point commun est de fournir un nouveau langage pour exprimer plus facilement vos requêtes. C’est un pas dans la bonne direction et il peut suffire à votre contexte. Mais l’utilisation d’un nouveau langage apporte également son lot de problèmes. Lorsque vous allez vouloir créer vos requêtes, vous allez être assis entre deux mondes. D’un coté, ce nouveau langage pour exprimer dans les grandes lignes votre requête. Et de l’autre, votre environnement (java par exemple) pour définir les opérateurs spécifiques à votre contexte, votre métier. Les traitements complexes vont demander des aller-retours entre ces deux environnements. Bref, ce n’est pas une situation agréable. De plus, vous avez l’habitude d’avoir à votre disposition des outils (héritage, composition …) qui devraient vous permettre de factoriser et créer dynamiquement vos requêtes. S’agissant d’un langage externe, ils sont peu utiles. Bien sûr, on pourrait résoudre ce problème en passant par de la concaténation de morceaux de requêtes. Cela paraît simple mais ce n’est pas si évident. C’est avec cette logique que les injections SQL sont devenues aussi communes formant ainsi la faille la plus sérieuse dans vos applications selon l’OWASP. Il faut donc mieux faire attention.
Cascading n’est pas touchée par ces problèmes car il s’agit d’une ‘simple’ surcouche au dessus de l’API MapReduce. Elle fournit en effet une nouvelle API Java permettant aux développeurs d’être plus productifs sans passer explicitement par la gymnastique du découpage d’un traitement en Map/Reduce. Certes, on perd la lisibilité d’un DSL. Mais si cela vous est nécessaire, plusieurs DSLs existent pour Cascading dans différents langages, dont Cascalog pour Clojure. Que votre choix soit de passer par un DSL ou non, comprendre le fonctionnement et l’intérêt de Cascading reste un plus.
Vos données sont des tuples
MapReduce est une API s’articulant autour de clefs et de valeurs. En tant que javaiste, votre premier réflexe peut être de créer des objets et donc des classes pour formaliser les concepts que vous allez manipuler. Cela peut être pertinent mais a souvent pour conséquence de rigidifier inutilement votre code. Vous êtes soit contraint de créer de nombreuses classes, ou alors de transmettre des informations inutiles à droite et à gauche dans votre cluster lors de l’exécution de vos requêtes. Cascading reprend une approche courante pour les langages de requêtage en modélisant vos données sous la forme d’une liste de valeurs nommées. Ainsi si votre traitement de données concerne vos utilisateurs, vous pouvez commencer par un utilisateur seulement défini par son login, puis au milieu de votre traitement par un identifiant technique et sa date de naissance puis en fin de traitement par son prénom, son nom et son âge actuel. En utilisant des tuples, vous pouvez alors manipuler seulement les informations qui sont pertinentes dans ce traitement en particulier.
Lisez et écrivez vos données grâce aux Taps
Un Tap définit où et comment vos données sont stockées et peut être à la fois une source et un collecteur (sink) de données. Par défaut, Cascading vous fournit l’essentiel pour accéder aux systèmes de fichiers (local et distribué). Et de nombreuses extensions existent vous offrant plus de choix quant à la méthode de sérialisation ou au data store : Avro, HBase, JDBC, JSON, Kryo, Memchached, Membase, ElasticSearch, MongoDB, SimpleDB, Solr, Thrift…
Voici, par exemple, comment créer une source capable de lire et écrire, ligne par ligne, des fichiers sur le système de fichier de votre installation d’Hadoop (local ou distribué). Pour chaque entrée, il créera un tuple avec un champ unique « line ».
Tap tap = new Hfs( new TextLine( new Fields( "line" ) ), path );
Utilisez Each, GroupBy, CoGroup et Every à la place de Map et Reduce
Un traitement de données se décrit comme un flux de données (Flow) reliant des Taps sources à des Taps collecteurs par des tuyaux (Pipe). Each va vous permettre d’itérer sur tous les tuples pour appliquer une fonction ou les filtrer ; GroupBy, de regrouper les tuples par la valeur de leurs champs ; CoGroup, d’effectuer des jointures et enfin Every d’appliquer un agrégateur en sortie d’un GroupBy ou CoGroup. Et bien sûr, vous pouvez les assembler et réutiliser les assemblages ainsi formés (SubAssembly) dans différents contextes.
Voici à quoi pourrait ressembler l’un de vos traitement de données.
Et en voici sa représentation écrite :
Pipe top = new Pipe( "top" ); top = new Each( top, new SomeFunction() );
Pipe bottom = new Pipe( "bottom" ); bottom = new Each( bottom, new SomeFunction() ); bottom = new Each( bottom, new SomeFilter() );
Pipe join = new CoGroup( top, bottom ); join = new Every( join, new SomeAggregator() ); join = new GroupBy( join ); join = new Every( join, new SomeAggregator() ); join = new Each( join, new SomeFunction() );
Et faites confiance à Cascading
À aucun moment, le développeur ne doit exprimer ses fonctions Maps et Reduces explicitement. Cascading est responsable de la découpe en passes MapReduce et de l’exécution du traitement en entier. Voici la découpe de l’exemple fourni ci-dessus.
La continuité des données entre les différentes passes de MapReduce est assurée par des fichiers temporaires. Optimiser les performances, tout en gardant la même logique dans votre traitement, revient à réduire le nombre de passes MapReduces afin de solliciter vos disques durs et votre réseau de façon minimale. Avec cet exemple, nous pouvons déjà voir quelques règles en action. Si je souhaite appliquer plusieurs fonctions à mes données successivement, je peux utiliser une unique phase Map utilisant une chaine de fonctions au lieu d’utiliser une chaine de phase Map utilisant chacune une seule fonction. Les jointures et autres regroupements profitent généralement de la phase Sort en amont de la phase Reduce. Et enfin, si je souhaite appliquer à la fin de mon job une dernière fonction à l’ensemble de mes données, il est en effet inutile de devoir rajouter une autre phase Map car il est tout à fait possible de le faire à la fin de la dernière phase Reduce.
Si le temps d’exécution des traitements parait aberrant, il est peut être necessaire de regarder sous le capot de Cascading. Même si vous ne trouverez pas de listes exhaustives des règles d’optimisations en dehors du code, il est possible de vérifier le résultat des optimisations en demandant à Cascading d’écrire dans un fichier le graphe des operations de votre traitement. Cela peut vous aider à détecter rapidement vos erreurs mais également à mieux comprendre les difficultés que pose votre traitement et éventuellement à reformuler celui-ci.
La suite ?
Au travers cet article, vous avez pu comprendre d’avantage le positionnement de Cascading dans l’écosystème Hadoop. L’objectif était également de démystifier un peu la magie de Cascalog et vous montrer que ce DSL reposait sur une base en Java, réalisant une partie critique du travail. Cette base peut tout à fait être utilisée en tant que telle pour construire vos propres outils puisque, contrairement à Pig ou Hive, elle ne force pas l’utilisation d’un nouveau langage spécifique. Le code et les représentations graphiques proviennent de la documentation Cascading. Vous y trouverez de nombreux détails que j’ai omis car ceux-ci étaient non vitaux pour comprendre les grands principes.
Un point qui reste ouvert cependant est bien sûr la performance de la planification en MapReduce fourni par Cascading. On peut se poser la question dans l’absolu mais également en comparaison avec Pig ou Hive. C’est un sujet complexe, des benchmarks peuvent aider mais encore faut-il que ceux-ci ne soient pas biaisés. Si c’est un sujet qui vous tient à coeur, je vous invite à vous rapprocher du Hadoop User Group France pour poser vos questions ou encore proposer une présentation sur ce sujet.
Commentaire
4 réponses pour " Les dessous de Cascalog, Cascading "
Published by Sebastien Lorber , Il y a 11 ans
Et pour ceux qui ont du mal avec Clojure comme moi on peut toujours regarder l’intégration Hadoop/Cascading de Spring en Java, ou Scalding en Scala :)
Published by Grégoire Marabout , Il y a 11 ans
Et pour ceux qui sont plus à l’aise avec le langage Ruby : cascading.jruby (https://github.com/gmarabout/cascading.jruby)
Published by Bertrand Dechoux , Il y a 11 ans
@Sebastien Lorber
Parmi les DSL pour Cascading, Scalding est en effet un que l’on ne peut pas ignorer. Son premier avantage est de fournir une API plus fluide. On peut certes essayer de faire pareil en Java (http://corner.squareup.com/2012/08/pump-a-faster-way-to-write-cascading-flows.html) mais tant que Java ne supportera pas les expressions lambda cela restera encore assez verbeux.
Un point très intéressant concernant Scalding est que l’intégration de Cascading2 est complète. Il est ainsi possible de choisir d’exécuter ses traitements localement en mémoire au lieu d’utiliser un cluster Hadoop. Cela permet entre autre d’avoir une batterie de tests d’intégrations complète et rapide.
Il est possible de faire de même en java mais le support est un peu moins transparent. J’ai justement publié un projet pour résoudre ce problème.
https://github.com/BertrandDechoux/cascading.plumber
http://conjars.org/cascading.plumber/plumber
En attendant, éventuellement, que le tout soit intégré à cascading.bind.
https://github.com/cwensel/cascading.bind
@Grégoire Marabout
Il y a effectivement un autre DSL pour ruby et il est également mentionné sur le site de Cascading. Par contre, même si je ne connais pas le projet, je conseillerais aux lecteurs de plutôt suivre le lien fourni par le site de Cascading : https://github.com/etsy/cascading.jruby
A priori, Esty maintiendrait la version la plus à jour.
Published by Grégoire Marabout , Il y a 11 ans
J’ai écrit cascading.jruby il y a qq années… Depuis, c’est Etsy qui le maintient, mais il s’agit bien du même DSL ;-)