Il y a 7 ans -
Temps de lecture 15 minutes
Découvrez SparkR, la nouvelle API de Spark
Le 11 juin dernier la version 1.4.0 de Apache Spark a été publiée. Parmi toutes les nouveautés annoncées la plus importante était la venue d’une nouvelle API venant s’ajouter aux trois déjà existantes (Scala, Java et Python) : R. R est un langage d’analyse statistique particulièrement apprécié chez les statisticiens. Nous vous présentions sur ce blog il y a quelques mois ce langage et ses utilisations en data science. Le 9 septembre dernier, la sortie de la version 1.5.0 de Spark amenait plusieurs nouveautés avec notamment l’intégration de MLlib. Découvrez maintenant comment utiliser SparkR, de la manipulation de données à l’exécution d’algorithmes de machine learning.
Présentation
Pour vous présenter SparkR nous allons nous appuyer tout au long de l’article sur un jeu de données : Titanic. Il contient des informations sur tous les passagers du Titanic (nom, adresse, prix du billet, classe etc…) et notamment si le passager en question a survécu. Ce jeu de données est disponible sur le site Kaggle (kaggle.com), une plateforme regroupant des concours de data science. Il est proposé aux concurrents de prédire si oui on non les individus ont survécu en fonction des autres informations. Vous trouverez en fin d’article un lien pour récupérer le code et les données utilisées dans l’article. Dans la majeure partie de l’article nous allons découvrir l’API disponible dans la version 1.4 de Spark, à savoir les DataFrames. Pour finir nous utiliserons une nouveauté de la version 1.5 : l’intégration d’algorithmes de MLlib dans SparkR pour faire du machine learning. Si vous n’êtes pas familier avec cette librairie nous vous invitons à la découvrir à travers deux articles publiés sur ce blog : 1ère partie – 2ème partie.
Mise en place
Il est tout d’abord nécessaire d’installer R. Nous vous recommandons d’installer RStudio, un IDE très agréable et très utilisé dans la communauté. Vous devez également télécharger Spark (version 1.5.0).
Avant de commencer il est important de faire un petit point sur l’histoire de SparkR pour bien comprendre la suite de l’article. A l’origine SparkR était un projet de recherche à l’AMPLab (le laboratoire qui a créé Spark) dont la page web existe toujours. Ce projet a été développé à l’époque où les DataFrame n’existaient pas encore et se basait donc intégralement sur l’API des RDD. C’est ce qui fait que ce premier projet et le SparkR intégré à Spark n’ont en surface plus grand chose à voir. Le SparkR que nous allons manipuler se base intégralement sur l’API des DataFrame. Et ceci pose un petit problème au premier abord : par défaut il n’est possible de charger que des données étant sous format JSON ou Parquet. Seulement la plupart des données que vous aurez à manipuler sont sous un format « CSV-like » et ne peuvent donc pas être lues directement par SparkR. Il faut pour cela utiliser le package Spark spark-csv. L’intégration des packages Spark dans SparkR est disponible depuis la version 1.4.1 de Spark.
Une fois ceci dit et l’environnement mis en place, nous pouvons commencer !
Les DataFrames
Pour pouvoir utiliser SparkR depuis RStudio, quelques lignes de configurations doivent être exécutées :
[java]# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/path/to/spark/spark-1.5.0-bin-hadoop2.6")
Sys.setenv(‘SPARKR_SUBMIT_ARGS’=’"–packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"’)
# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
# Load the SparkR library
library(SparkR)[/java]
La première ligne fait pointer la variable d’environnement SPARK_HOME
vers le dossier où Spark est installé. La deuxième ligne permet de charger SparkR (on peut le considérer comme une sorte de « install.packages
» local). Et enfin la troisième ligne permet de charger toutes les fonctions de SparkR de la même façon que tous les autres packages de R.
Comme avec les autres API de Spark il est nécessaire de créer un SparkContext et un SQLContext :
[java]# Initialize SparkContext and SQLContext
sc <- sparkR.init(master="local[*]", appName="SparkR-DataFrame-example")
sqlContext <- sparkRSQL.init(sc)[/java]
Lors de la création du SparkContext
, nous précisons que c’est une application locale utilisant tous les cores disponibles, le nom de l’application et nous incluons le package spark-csv
. Tout est prêt pour lire les données :
[java]# Get the data
titanic <- read.df(sqlContext, path="data_titanic.csv", source="com.databricks.spark.csv", header="true")[/java]
Nous passons à la fonction read.df
le sqlContext
, le chemin vers le fichier, nous précisons que nous voulons utiliser le package spark-csv
pour lire les données et nous indiquons que le header est contenu dans le fichier.
Nous pouvons maintenant afficher les premières données :
[java]> showDF(titanic)
+——+——–+——————–+——+—+—–+—–+——–+——–+———–+——–+—-+—-+——————–+
|pclass|survived| name| sex|age|sibsp|parch| ticket| fare| cabin|embarked|boat|body| homeDest|
+——+——–+——————–+——+—+—–+—–+——–+——–+———–+——–+—-+—-+——————–+
| 1| 0|Allison, Miss. He…|female| 2| 1| 2| 113781|151.5500| C22 C26| S| | |Montreal, PQ / Ch…|
| 1| 0|Allison, Mr. Huds…| male| 30| 1| 2| 113781|151.5500| C22 C26| S| | 135|Montreal, PQ / Ch…|
| 1| 0|Allison, Mrs. Hud…|female| 25| 1| 2| 113781|151.5500| C22 C26| S| | |Montreal, PQ / Ch…|
| 1| 0|Andrews, Mr. Thom…| male| 39| 0| 0| 112050| 0.0000| A36| S| | | Belfast, NI|
| 1| 0|Artagaveytia, Mr….| male| 71| 0| 0|PC 17609| 49.5042| | C| | 22| Montevideo, Uruguay|
| 1| 0|Astor, Col. John …| male| 47| 1| 0|PC 17757|227.5250| C62 C64| C| | 124| New York, NY|
| 1| 0| Baumann, Mr. John D| male| | 0| 0|PC 17318| 25.9250| | S| | | New York, NY|
| 1| 0|Baxter, Mr. Quigg…| male| 24| 0| 1|PC 17558|247.5208| B58 B60| C| | | Montreal, PQ|
| 1| 0|Beattie, Mr. Thomson| male| 36| 0| 0| 13050| 75.2417| C6| C| A| | Winnipeg, MN|
| 1| 0| Birnbaum, Mr. Jakob| male| 25| 0| 0| 13905| 26.0000| | C| | 148| San Francisco, CA|
| 1| 0|Blackwell, Mr. St…| male| 45| 0| 0| 113784| 35.5000| T| S| | | Trenton, NJ|
| 1| 0|Borebank, Mr. Joh…| male| 42| 0| 0| 110489| 26.5500| D22| S| | |London / Winnipeg…|
| 1| 0|Brady, Mr. John B…| male| 41| 0| 0| 113054| 30.5000| A21| S| | | Pomeroy, WA|
| 1| 0| Brandeis, Mr. Emil| male| 48| 0| 0|PC 17591| 50.4958| B10| C| | 208| Omaha, NE|
| 1| 0|Brewe, Dr. Arthur…| male| | 0| 0| 112379| 39.6000| | C| | | Philadelphia, PA|
| 1| 0|Butt, Major. Arch…| male| 45| 0| 0| 113050| 26.5500| B38| S| | | Washington, DC|
| 1| 0|Cairns, Mr. Alexa…| male| | 0| 0| 113798| 31.0000| | S| | | |
| 1| 0|Carlsson, Mr. Fra…| male| 33| 0| 0| 695| 5.0000|B51 B53 B55| S| | | New York, NY|
| 1| 0|Carrau, Mr. Franc…| male| 28| 0| 0| 113059| 47.1000| | S| | | Montevideo, Uruguay|
| 1| 0|Carrau, Mr. Jose …| male| 17| 0| 0| 113059| 47.1000| | S| | | Montevideo, Uruguay|
+——+——–+——————–+——+—+—–+—–+——–+——–+———–+——–+—-+—-+——————–+[/java]
Plus d’informations sur les données sur le site de Kaggle.
Et nous pouvons également afficher le schéma de la DataFrame :
[java]> printSchema(titanic)
root
|– pclass: string (nullable = true)
|– survived: string (nullable = true)
|– name: string (nullable = true)
|– sex: string (nullable = true)
|– age: string (nullable = true)
|– sibsp: string (nullable = true)
|– parch: string (nullable = true)
|– ticket: string (nullable = true)
|– fare: string (nullable = true)
|– cabin: string (nullable = true)
|– embarked: string (nullable = true)
|– boat: string (nullable = true)
|– body: string (nullable = true)
|– homeDest: string (nullable = true)[/java]
Comme le fichier d’entrée est un CSV, tous les champs sont sous format String. Nous pouvons changer les types de cette façon :
[java]> # Change types
> titanic$age <- cast(titanic$age, "double")
> titanic$fare <- cast(titanic$fare, "double")
> titanic$sibsp <- cast(titanic$sibsp, "double")
> titanic$parch <- cast(titanic$parch, "double")
> titanic$body <- cast(titanic$body, "double")
> titanic$pclass <- cast(titanic$pclass, "long")
> titanic$survived <- cast(titanic$survived, "long")[/java]
En réaffichant le schéma on voit que les types ont bien été changés :
[java]> printSchema(titanic)
root
|– pclass: long (nullable = true)
|– survived: long (nullable = true)
|– name: string (nullable = true)
|– sex: string (nullable = true)
|– age: double (nullable = true)
|– sibsp: double (nullable = true)
|– parch: double (nullable = true)
|– ticket: string (nullable = true)
|– fare: double (nullable = true)
|– cabin: string (nullable = true)
|– embarked: string (nullable = true)
|– boat: string (nullable = true)
|– body: double (nullable = true)
|– homeDest: string (nullable = true)[/java]
Un certain nombre de fonctions sont disponibles pour manipuler les DataFrame. Comme la fonction select par exemple :
[java]> # Select one column
> name <- select(titanic,"name")
> showDF(name, 5)
+——————–+
| name|
+——————–+
|Allison, Miss. He…|
|Allison, Mr. Huds…|
|Allison, Mrs. Hud…|
|Andrews, Mr. Thom…|
|Artagaveytia, Mr….|
+——————–+[/java]
Vous pouvez également filtrer des lignes. Par exemple nous voulons savoir qui a payé son billet plus de 200$.
[java]> # Filter and select several columns
> rich <- filter(titanic, titanic$fare > 200)
> showDF(select(rich, c(rich$fare, rich$name)), 5)
+——–+——————–+
| fare| name|
+——–+——————–+
| 227.525|Astor, Col. John …|
|247.5208|Baxter, Mr. Quigg…|
|221.7792| Farthing, Mr. John|
| 263.0|Fortune, Mr. Char…|
| 263.0| Fortune, Mr. Mark|
+——–+——————–+[/java]
La première ligne permet de filtrer les lignes. Nous passons en paramètre de la fonction filter la DataFrame et la condition à valider. La deuxième ligne affiche une DataFrame où nous n’avons gardé que le nom et le prix du billet. Pour sélectionner plusieurs colonnes il suffit de les lister au sein d’un vecteur (que l’on définit avec la lettre clé c). Nous pouvons connaitre le nombre de personne ayant payé plus de 200$ grâce à la fonction count :
[java]> # Number of people who paid their ticket more than 200$
> count(rich)
[1] 38[/java]
Il est également possible de réaliser des opérations un peu plus complexe comme un groupBy. Nous voulons par exemple classer les individus par age :
[java]# GroupBy
> groupByAge <- groupBy(titanic, titanic$age)
> age <- summarize(groupByAge, count = count(titanic$age))
> showDF(age, 5)
+—-+—–+
| age|count|
+—-+—–+
| 7.0| 4|
|60.0| 7|
|35.0| 23|
|21.0| 41|
|24.0| 47|
+—-+—–+[/java]
Nous passons simplement à la fonction groupBy la DataFrame et la variable age. Pour compter le nombre d’individus nous utilisons la fonction summarize. Il peut être alors intéressant de tracer un histogramme des valeurs calculées. Pour cela nous allons utiliser un histogramme pondéré grâce au package weights :
[java]library(weights)
ageCollect <- collect(age)
wtd.hist(ageCollect$age, weight = ageCollect$count, breaks = 16, col = "lightblue", xlab = "Age", main = "Répartition des individus en fonction de l’âge")[/java]
Voila comment nous avons pu simplement réaliser une visualisation à partir de données Spark. Attention cependant à l’utilisation de collect, les données doivent impérativement pouvoir rentrer en mémoire.
Chaîner les opérations
Revenons sur le groupBy. Pour calculer et afficher ce que nous voulions nous avons exécuté 3 lignes de codes :
[java]groupByAge <- groupBy(titanic, titanic$age)
age <- summarize(groupByAge, count = count(titanic$age))
showDF(age, 5)[/java]
Nous aurions pu tout écrire en une seule ligne et nous épargner de devoir nommer deux variables que nous ne réutilisons pas par la suite :
[java]showDF(summarize(groupBy(titanic, titanic$age), count = count(titanic$age)), 5)[/java]
On ne peut pas vraiment dire que cela soit très lisible. Nous allons pour cela utiliser le package magrittr, qui permet de chaîner des opérations sans avoir à les imbriquer :
[java]# Pipeline
library(magrittr)
groupBy(titanic, titanic$age) %>% summarize(., count = count(titanic$age)) %>% showDF(., 5)[/java]
Le code est désormais tout à fait lisible et tient en une ligne.
Pour aller plus loin nous vous recommandons le très bon talk de Chris Freeman, un des principaux contributeurs à SparkR, lors du Spark Summit 2015 à San Francisco :
Utiliser MLlib
Depuis le 9 septembre et la sortie de la version 1.5.0 il est donc possible d’utiliser des algorithmes de MLlib avec SparkR. Pour l’instant seul l’algorithme GLM est disponible, vous ne pouvez donc réaliser que des régressions linéaires (prédire une variable quantitative) et logistiques (prédire une variable qualitative). Nous allons dans notre cas utiliser une régression logistique pour prédire si un individu a survécu ou non.
Comme nous vous l’expliquions dans les articles sur MLlib, lors de la construction d’un modèle de machine learning il est nécessaire d’utiliser deux jeux de données, l’un pour construire le modèle et l’autre pour mesurer son efficacité. Cela permet d’éviter un phénomène de surapprentissage. Dans les autres API de Spark il existe une fonction randomSplit qui permet de séparer un dataset en plusieurs sous datasets. A l’heure où nous publions cet article cette fonction n’est pas implémentée dans SparkR. Un ticket JIRA a été ouvert pour son implémentation. Pour notre exemple nous avons séparé à la main les données. Le jeu de données d’apprentissage représente 75% des données d’origine et le jeu de test 25%. On réitère le même processus que précédemment pour la préparation des données en gardant uniquement les variables que nous allons utiliser dans la régression :
[java]# MLlib
# Read train and test set
train <- read.df(sqlContext, path="data_titanic_train.csv", source="com.databricks.spark.csv", header="true")
test <- read.df(sqlContext, path="data_titanic_test.csv", source="com.databricks.spark.csv", header="true")
# Get the important variables
dataForGlmTrain <- select(train, "fare", "age", "survived", "pclass", "sex")
dataForGlmTest <- select(test, "fare", "age", "survived", "pclass", "sex")
# Change types for train set
dataForGlmTrain$age <- cast(dataForGlmTrain$age, "double")
dataForGlmTrain$fare <- cast(dataForGlmTrain$fare, "double")
dataForGlmTrain$pclass <- cast(dataForGlmTrain$pclass, "long")
dataForGlmTrain$survived <- cast(dataForGlmTrain$survived, "long")
# Change types for test set
dataForGlmTest$age <- cast(dataForGlmTest$age, "double")
dataForGlmTest$fare <- cast(dataForGlmTest$fare, "double")
dataForGlmTest$pclass <- cast(dataForGlmTest$pclass, "long")
dataForGlmTest$survived <- cast(dataForGlmTest$survived, "long")[/java]
Le dataset Titanic comporte beaucoup de données manquantes et les algorithmes de MLlib ne peuvent fonctionner que si les données sont complètes. Pour cela nous devons remplir les valeurs manquantes. Dans notre cas il manque des données dans les variables fare et age. Une approche naïve et très simple à mettre en place consiste à remplacer les données manquantes par la moyenne des autres valeurs. C’est que nous faisons ici :
[java]# Fill the null values by the average of the other values
dataWithoutNullTrain <- dataForGlmTrain %>% fillna(., 28, "age") %>% fillna(. , 14.45, "fare")
dataWithoutNullTest <- dataForGlmTest %>% fillna(., 28, "age") %>% fillna(. , 14.45, "fare")[/java]
Les données sont désormais prêtes pour la construction du modèle.
[java]# Building the model
model <- SparkR::glm(survived ~ sex + age + fare + pclass, family = "binomial", data = dataWithoutNullTrain)[/java]
La syntaxe est la même que pour la fonction glm de R. Le premier argument est une formule où nous précisons que nous voulons expliquer la variable survived en fonction des variables sex, age, fare et pclass. Nous précisons ensuite la famille de GLM que nous voulons : binomial pour une régression logistique et gaussian pour une régression linéaire. Et pour finir nous précisons les données sur lesquelles nous voulons travailler.
Le modèle est désormais construit et nous pouvons passer à l’étape de validation :
[java]# Make the prediction
predictionDF <- predict(model, newData = dataWithoutNullTest)[/java]
predictionDF est un DataFrame contenant les résultats de la prédiction dans une variable prediction (1 si l’algorithme a prédit que l’individu a survécu, 0 sinon). Dans cette DataFrame, les vrais résultats de la variable survived se trouvent désormais dans une variable nommée label. Pour déterminer la précision de notre prédiction nous créons la variable diff égale au carré de la différence entre prediction et label. De cette façon diff vaudra 0 si l’algorithme a correctement prédit et 1 sinon :
[java]# Create the variable diff : 0 if bad prediction, 1 if good prediction
predictionDF$diff <- (predictionDF$label – predictionDF$prediction)^2[/java]
En sommant diff nous pouvons donc connaitre le nombre de mauvaises prédictions. À partir de là nous pouvons donc calculer la proportion de bonnes prédictions :
[java]> # Compute the percentage of good prediction
> precision <- 1 – sum(collect(select(predictionDF, "diff")))/count(dataWithoutNullTest)
> precision
[1] 0.7969231[/java]
Nous obtenons donc quasiment 80% de bonnes prédictions, un plutôt bon résultat pour un algorithme comme la régression logistique.
L’intégralité du code de cet article ainsi que les données utilisées sont disponibles sur GitHub.
Pour aller plus loin avec SparkR et la data science en général sur Spark nous vous recommandons la formation Analyse de données et Machine Learning avec Spark, donnée par nos Data Scientists.
SparkR est donc un projet jeune mais prometteur qui deviendra de plus en plus intéressant dans les versions à venir. Un grand pas a déjà été fait avec l’intégration d’algorithmes de MLlib dans la version 1.5. Nul doute que les autres algorithmes de MLlib vont rapidement être intégrés. On aura alors entre les mains un projet permettant de faire facilement le lien entre l’outil préféré des statisticiens et un framework qui est en train de devenir une référence pour le calcul distribué. Affaire à suivre…
Commentaire
2 réponses pour " Découvrez SparkR, la nouvelle API de Spark "
Published by Découvrez SparkR, la nouvelle API de Spark | Blog Xebia France http://blog.engineering.publicissapient.fr/2015/09/30/decouvrez-sparkr-la-nouvelle-api-de-spark/ #bigdata #spark , Il y a 7 ans
[…] SparkR, la nouvelle API de Spark | Blog Xebia France http://blog.engineering.publicissapient.fr/2015/09/30/decouvrez-sparkr-la-nouvelle-api-de-spark/ #bigdata […]
Published by Amine , Il y a 7 ans
Merci pour cet article Phelip, franchement c’est super intéressant :)
Published by ait amrane , Il y a 4 ans
Bonjour à l’installation j’ai une erreur du type :
Error in sparkR.sparkContext(master, appName, sparkHome, convertNamedListToEnv(sparkEnvir), :
JVM is not ready after 10 seconds
est ce que vous pouvez m’aider ?
Merci :)