Il y a 5 ans -
Temps de lecture 21 minutes
Spark : comprendre et corriger l’exception Task not serializable
Dans tous les langages, le debugging peut parfois s’avérer une tâche fastidieuse. C’est d’autant plus le cas lorsque l’on utilise un framework distribué avec beaucoup de concepts complexes sous-jacents, comme Spark. Cet article propose de revenir sur l’une des erreurs les plus souvent rencontrées lors du développement d’applications avec Spark, et qui n’est pas toujours comprise : la redoutable – et surtout redoutée – Task not serializable.
Quand se produit cette erreur ?
Cette erreur apparaît le plus souvent lors du découpage du code en plusieurs classes « métier » pour respecter les recommandations du software craftsmanship. Malheureusement, cette exception, si elle est mal comprise, peut vite devenir un frein au refactoring et au respect des bonnes pratiques de développement. Pire, elle peut se solder par rassembler tout le code dans une même classe, voir une même fonction, ou d’abuser de @transient
, sans vraiment analyser le fond du problème.
Pour aider les développeurs à mieux comprendre cette exception, depuis la version 1.3.0 de Spark, la serialization stack a été améliorée, pour trouver plus facilement la cause du problème. Lorsqu’une exception NotSerializableException est rencontrée, le debugger va désormais retrouver le chemin complet vers l’objet ne pouvant pas être sérialisé.
Cet article détaille le mécanisme interne de sérialisation de Spark, ainsi que les différentes solutions à cette exception, à travers le développement d’une application simple. Cela permettra aussi d’aborder de manière plus générale les bonnes pratiques à appliquer lors du développement avec Spark.
À la recherche de la Task not serializable
Le fil rouge de cet article est donc le développement d’une application développée en Scala 2.10 avec Spark 1.6.0. Cette application transforme le contenu d’un fichier et applique des transformations sur chacune des lignes.
Programme de base
Le programme de base est élémentaire. Il se contente de mettre chaque ligne d’un fichier en entrée en majuscules, et de sauvegarder le résultat sur disque.
[scala]object Main {
def main(args: Array[String]) = {
val conf = new SparkConf()
val sc = new SparkContext(conf)
sc.textFile(args.head).map(_.toUpperCase).saveAsTextFile(args(1))
sc.stop()
}
}[/scala]
Cette application, une fois packagée, peut être lancée via une commande similaire à la suivante :
[bash]spark-submit –name myapp –master yarn –deploy-mode client –class Main my-application.jar input.txt output[/bash]
Implémentation d’une nouvelle transformation
Une nouvelle transformation plus complexe va être ajoutée : en plus de mettre en majuscules chaque ligne, son contenu sera inversé et les voyelles remplacées par « ? ». L’idée ici est d’illustrer une transformation « non triviale ». L’implémentation d’une nouvelle transformation est aussi l’occasion de refactorer notre programme.
[scala]case class SparkProgram(inputPath: String, outputPath: String) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
def transformUpperCase() = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
// our new transformation
def transformUpperCaseReverseAndHideVowels() = sc.textFile(inputPath).map(line => {
line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}).saveAsTextFile(outputPath)
}
object Main {
def main(args: Array[String]) = {
val program = SparkProgram(args.head, args(1))
program.transformUpperCaseReverseAndHideVowels()
}
}[/scala]
En plus de la création de la nouvelle transformation, le code relatif à Spark a été séparé du Main
. Il s’agit d’une bonne pratique à adopter pour la clarté du code. Le fait de séparer la partie qui utilise Spark est aussi recommandé pour séparer les responsabilités.
Jusqu’ici, le programme ci-dessus fonctionne. Mais la nouvelle transformation étant « plus complexe », notre âme de craftsman va nous pousser à extraire le cœur de la méthode de transformUpperCaseReverseAndHideVowels
dans une fonction à part, pour pouvoir par exemple faire des tests unitaires sur celle-ci, sans forcément avoir besoin de lire un fichier en entrée (respect encore ici du principe de séparation des responsabilités). Une modification de la classe SparkProgram
est donc nécessaire pour isoler la transformation « complexe » :
[scala]case class SparkProgram(inputPath: String, outputPath: String) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
def transformUpperCase() = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
def transformUpperCaseReverseAndHideVowels() = sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
// our complex method extracted
def upperCaseReverseAndHideVowels(line: String) = line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}[/scala]
Mais patatras ! Le lancement de notre programme produit alors l’erreur tant redoutée :
[java]Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at fr.xebia.spark.serialization.SparkProgram.transformUpperCaseReverseAndHideVowels(Main.scala:17)
at fr.xebia.spark.serialization.Main$.main(Main.scala:25)
at fr.xebia.spark.serialization.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
– object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@1c9e07c6)
– field (class: fr.xebia.spark.serialization.SparkProgram, name: conf, type: class org.apache.spark.SparkConf)
– object (class fr.xebia.spark.serialization.SparkProgram, SparkProgram(Arguments(input.txt,output)))
– field (class: fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation2$1, name: $outer, type: class fr.xebia.spark.serialization.SparkProgram)
– object (class fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation2$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
… 21 more[/java]
Comment le simple fait d’isoler le contenu d’une méthode a-t-il pu faire échouer notre job ? C’est ce que nous allons essayer de comprendre.
La stack trace est claire, c’est le SparkConf
qui est coupable ici :
[java]Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
– object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@1c9e07c6)
– field (class: fr.xebia.spark.serialization.SparkProgram, name: conf, type: class org.apache.spark.SparkConf)
[/java]
L’erreur indique que l’instance de type SparkConf
ne peut pas être sérialisée.
Pourquoi Spark essaye-t-il de sérialiser l’instance de SparkConf
?
upperCaseReverseAndHideVowels
. Pour que cet appel soit possible, Spark va devoir sérialiser cette méthode. En effet, le fonctionnement distribué de Spark nécessite de distribuer l’exécution du code sur différents executors. Petit rappel sur le mécanisme interne de Spark :
Lorsque l’on appelle la méthode transformUpperCaseReverseAndHideVowels
, celle-ci va faire appel à
- Spark va tout d’abord décomposer l’ensemble des opérations faites sur les RDD en tâches
- avant d’exécuter ces tâches, Spark va calculer leur closure, c’est-à-dire l’ensemble des variables et des méthodes devant être visibles depuis l’executor pour pouvoir appliquer les transformations sur les RDD
- la closure va ensuite être sérialisée, puis envoyée vers chacun des executors
Dans notre cas, la méthode upperCaseReverseAndHideVowels
appartient à la closure et va donc devoir être sérialisée pour pouvoir être appelée sur chaque executor. Or, il est impossible de sérialiser une méthode seule : toute la classe SparkProgram
va devoir être sérialisée. Ses attributs conf
et sc
seront par conséquent aussi sérialisés, d’où l’erreur.
Mais pourquoi notre code fonctionnait avant ?
Avant la dernière modification, la transformation était faite dans une fonction anonyme : { line => line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?") }
. Dans ce cas, la classe SparkProgram
, et par extension ses attributs, n’avaient donc pas besoin d’être sérialisés, puisque l’ensemble du code de cette fonction anonyme était executée sur chaque executor, sans dépendance avec l’extérieur de cette fonction.
Je sais, il suffit de déclarer SparkProgram
sérialisable en ajoutant extends Serializable
!
C’est commun de penser à cette solution, mais c’est une idée fausse. De toute façon ici, SparkProgram
est une case class
, qui par défaut est sérialisable. Le problème vient du fait que l’un des attributs de SparkProgram
, SparkConf
en l’occurence, n’est pas sérialisable.
Les solutions
Pour corriger cette erreur, plusieurs solutions s’offrent à nous. Il existe deux catégories de solutions :
- les rustines
- les solutions craft
Les rustines
Ce ne sont pas de bonnes solutions d’un point de vue craft, mais elles seront tout de même présentées ici afin de détailler pourquoi leur utilisation peut être évitée.
CTRL+Z CTRL+Z CTRL+Z
Ce n’est pas une option ici :)
Séparer le contenu des méthodes est en effet une bonne solution de manière générale. Notre solution qui fonctionnait précédemment n’est pas acceptable d’un point de vue craft, surtout si l’on imagine avoir une multitude de transformations à implémenter.
Function literal
Pour éviter que SparkProgram
et ses attributs ne soient sérialisés, un des moyens est que l’appel de la méthode de la transformation ne nécessite pas la sérialisation complète de la classe. Pour cela, il est possible de déclarer en tant que val
la fonction upperCaseReverseAndHideVowels
: c’est ce qu’on appelle une function literal :
[scala]def transformUpperCaseReverseAndHideVowels() = {
sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
}
val upperCaseReverseAndHideVowels = (line: String) => line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")[/scala]
Pourquoi une fonction déclarée en tant que val
au lieu de def
fonctionne ?
upperCaseReverseAndHideVowels
n’est plus une fonction à proprement parler, mais plutôt une variable de type Function1
. Si on regarde le contenu de la case class à l’aide de javap
(le décompileur de classe Java) on obtient la confirmation :
[java]public scala.Function1<java.lang.String, java.lang.String> upperReverseAndReplaceVowels()[/java]
Or Function1
est bien sérialisable, comme le montre le test suivant facilement reproductible en console :
[scala]scala> val myFunction = (i:Int) => i
myFunction: Int => Int = <function1>
scala> myFunction.isInstanceOf[Serializable]
res0: Boolean = true[/scala]
Pourquoi ne pas déclarer la function literal à l’intérieur de transformUpperCaseReverseAndHideVowels
?
C’est tout à fait possible : de cette manière, l’appel de la function literal ne nécessitera pas la sérialisation. Cependant, cela n’a pas d’intérêt car on ne pourra pas tester unitairement la méthode upperCaseReverseAndHideVowels
.
Si la déclaration de la fonction upperCaseReverseAndHideVowels
à l’intérieur de transformUpperCaseReverseAndHideVowels
ne provoque pas l’erreur car ne nécessite pas la sérialisation de la classe, alors pourquoi ne pas simplement déclarer upperCaseReverseAndHideVowels
comme une vraie fonction (avec def
) ?
La déclaration d’une fonction avec def
, que ce soit à l’intérieur ou à l’extérieur d’une autre fonction, déclare toujours une « vraie » fonction. En la déclarant à l’intérieur, on observe à l’aide de javap
que la fonction public java.lang.String upperReverseAndHideVowels(java.lang.String);
est toujours présente. L’appel de transformUpperCaseReverseAndHideVowels
nécessitera donc toujours la sérialisation de la classe complète, bien que la fonction soit définie à l’intérieur d’elle.
Dans tous les cas, la déclaration d’une function literal ressemble plus à un contournement du problème. Il vaut donc mieux l’éviter.
Utilisation de @transient
Pour que conf
et sc
ne soient pas sérialisés, il est possible d’utiliser le mot-clé @transient
. Toutes les variables déclarées de cette manière ne seront donc pas sérialisées.
Cette solution pourrait convenir; cependant, il est préférable d’éviter l’utilisation de @transient
. Son utilisation n’est en soi pas mauvaise, mais résoudre toutes les exceptions Task Not Serializable par @transient
peut révéler dans certains cas un code smell, comme par exemple une mauvaise organisation du code. C’est pour cette raison qu’il faut veiller à ne pas abuser de cette annotation, celle-ci pouvant de plus provoquer des erreurs difficiles à débugger en mode distribué (il ne faut pas oublier qu’une variable @transient
prend une valeur par défaut lorsqu’elle est initialisée sur un executor). Un exemple de mauvaise utilisation de @transient
sera présenté plus tard dans l’article.
Utiliser un singleton pour le SparkContext
Il est possible de séparer l’instanciation du SparkContext
de SparkProgram
sans pour autant passer le SparkContext
de classe en classe. Pour cela, un singleton chargé d’instancier le SparkContext
peut être créé. Le SparkContext
instancié sera alors utilisé par toutes les méthodes ayant besoin d’accéder à un SparkContext.
[scala]// singleton for SparkContext
object Spark {
val conf = new SparkConf()
val sc = new SparkContext(conf)
}
case class SparkProgram(inputPath: String, outputPath: String) {
def transformUpperCase() = Spark.sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
def transformUpperCaseReverseAndHideVowels() = Spark.sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
def upperCaseReverseAndHideVowels(line: String): String = {
line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}
}
object Main {
def main(args: Array[String]) = {
val program = SparkProgram(args.head, args(1))
program.transformUpperCaseReverseAndHideVowels()
}
}[/scala]
L’avantage ici est qu’une seule instance du SparkContext
sera utilisée par l’ensemble du programme. Le SparkContext
sera instancié la première fois qu’une méthode appellera Spark.sc
.
Cependant, l’inconvénient majeur, si ce n’est rédhibitoire, est que le code utilisant ce SparkContext
ne pourra pas être testé facilement. En effet, comment remplacer le Spark.sc
par un SparkContext
de notre choix dans les tests ?
Les solutions craft
Elles sont opposées aux solutions dites « rustines » : ce sont en effet les solutions à recommander en priorité pour garder un code de qualité.
Séparer la méthode dans un object
Pour éviter la sérialisation du SparkContext
, la méthode upperCaseReverseAndHideVowels
peut être extraite dans un object
à part (LineProcessor
), qui se charge simplement d’opérer des transformations sur une ligne. Lorsque cette fonction sera appelée, c’est l’object
LineProcessor
qui sera sérialisé, et non le SparkProgram
(dans lequel on trouve conf
et sc
).
[scala]object LineProcessor {
def upperCaseReverseAndHideVowels(line: String): String =
line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}
case class SparkProgram(inputPath: String, outputPath: String) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
def transformUpperCase() = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
def transformUpperCaseReverseAndHideVowels() = sc.textFile(inputPath).map(LineProcessor.upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
}[/scala]
Ici, il n’y a aucun souci pour sérialiser LineProcessor
puisqu’il s’agit d’un object
: par défaut, tous les attributs d’un object
sont static
, et donc implicitement transient. Cependant, notons que dans ce cas, il aurait aussi été tout à fait possible de définir LineProcessor
comme une class
(ou case class
), puisque LineProcessor
n’a aucun attribut (donc aucun attribut qui n’est pas sérialisable !). Un exemple avec un attribut non sérialisable est présenté plus tard dans l’article.
Cette solution fonctionne mais n’est pas encore idéale, car nous avons ici une classe SparkProgram
qui définit nos transformations et qui est en charge d’instancier le SparkContext
. L’objectif est de séparer ces deux rôles, afin de respecter le principe de séparation des responsabilités (encore lui !).
Déléguer l’instanciation du SparkContext
Si l’instanciation des attributs qui posent problème (SparkConf
et SparkContext
) se fait à l’extérieur de SparkProgram
, ceux-ci ne seront donc plus sérialisés. Cependant, notre SparkProgram
aura toujours besoin d’un SparkContext
! Pour l’utiliser de manière simple et transparente, on peut alors utiliser les paramètres implicites :
[scala]case class SparkProgram(inputPath: String, outputPath: String) {
def transformUpperCase()(implicit sc: SparkContext) = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
def transformUpperCaseReverseAndHideVowels()(implicit sc: SparkContext) = sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
def upperCaseReverseAndHideVowels(line: String): String = {
line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}
}
object Main {
def main(args: Array[String]) = {
val conf = new SparkConf()
implicit val sc = new SparkContext(conf)
val program = SparkProgram(args.head, args(1))
program.transformUpperCaseReverseAndHideVowels()
sc.stop()
}
}[/scala]
L’instanciation du SparkContext
va être ici déléguée au Main
, et son utilisation va se faire de manière implicite dans SparkProgram
. Déclarer le SparkContext
dans le Main
n’est pas choquant ici, puisque tout notre programme se base sur Spark.
Pourquoi ne pas déclarer le paramètre implicit au niveau de la classe SparkProgram
(au même titre que inputPath
et outputPath
) ?
C’est possible, mais cela implique qu’un SparkContext
implicite doive exister dans le scope à l’instanciation d’un SparkProgram
, et ce même pour utiliser des fonctions n’utilisant pas le SparkContext
(upperCaseReverseAndHideVowels
par exemple). Pour tester unitairement cette fonction, il faudra donc obligatoirement déclarer un SparkContext
, qui ne sera finalement pas utilisé.
Y a-t-il une raison d’utiliser des implicits plutôt qu’un paramètre en dur spécifié à l’appel de la fonction ?
Non, ici les implicits servent juste à simplifier la lecture du code. C’est une pratique courante lorsqu’on utilise Spark de déclarer le SparkContext
en implicit
pour éviter de le passer dans toutes les fonctions. Attention cependant, dans le cas général, il faut utiliser les implicits avec parcimonie pour éviter de rendre difficile la compréhension du code.
Une solution possible
D’une manière générale, il vaut mieux minimiser l’adhérence avec Spark afin de pouvoir tester unitairement les fonctions ne nécessitant pas de SparkContext
. De plus, il convient d’être cohérent dans tout le code pour garder un code lisible (si on choisit d’utiliser @transient
– ce qui n’est pas recommandé cependant – alors il vaut mieux l’utiliser partout).
Une solution convenable pour notre problème, qui est un mix des deux solutions craft, consiste alors à :
- déléguer la création du
SparkContext
- séparer les méthodes de transformation dans un objet à part
[scala]object LineProcessor {
def upperCaseReverseAndHideVowels(line: String): String = {
line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}
}
case class SparkProgram(inputPath: String, outputPath: String)
(implicit sc: SparkContext) {
def withInputAndOutput(f: String => String) = {
sc.textFile(inputPath).map(f).saveAsTextFile(outputPath)
}
def transformUpperCase() = withInputAndOutput(_.toUpperCase)
def transformUpperCaseReverseAndHideVowels() = withInputAndOutput(LineProcessor.upperCaseReverseAndHideVowels)
}
object Main {
def main(args: Array[String]) = {
val conf = new SparkConf()
implicit val sc = new SparkContext(conf)
val program = SparkProgram(args.head, args(1))
program.transformUpperCaseReverseAndHideVowels()
sc.stop()
}
}[/scala]
Pour simplifier encore plus le corps des méthodes de transformation, on utilise ici le loaner pattern (design pattern bien connu des utilisateurs de Scala), puisque la lecture/écriture sera utilisée par toutes les transformations. De plus, puisque toutes les méthodes de SparkProgram
utilisent ici un SparkContext
, le paramètre implicite a pu être déclaré directement au niveau de la case class
.
Un autre cas de Task not serializable n’ayant pas pour origine le SparkContext
Nous allons maintenant voir une autre occurrence de Task Not Serializable, mais cette fois-ci n’ayant pas pour cause un composant de Spark.
Pour illustrer, une nouvelle transformation est implémentée, qui consiste à saler chaque ligne avec un préfixe aléatoire. Le but ici est d’introduire une classe avec un attribut d’un type non sérialisable. Ici, c’est Random
qui va être utilisé (jusqu’en Scala 2.10, la classe Random
n’était pas sérialisable).
Cet exemple avec Random
est seulement pris pour illustrer notre propos; une autre classe non sérialisable aurait pu être choisie, par exemple une classe que nous aurions nous-même définie.
[scala]
case class LineSalter(seed: Int) {
val random = new scala.util.Random(seed)
def salt(line: String) = random.nextString(10) + " " + line
}
case class SparkProgram(inputPath: String, outputPath: String)
(implicit sc: SparkContext) {
def withInputAndOutput(f: String => String) = {
sc.textFile(inputPath).map(f).saveAsTextFile(outputPath)
}
def transformUpperCase() = withInputAndOutput(_.toUpperCase)
def transformUpperCaseReverseAndHideVowels() = withInputAndOutput(LineProcessor.upperCaseReverseAndHideVowels)
def transformSaltLines() = {
val salter = LineSalter(1234)
withInputAndOutput(salter.salt)
}
}
object Main {
def main(args: Array[String]) = {
val conf = new SparkConf()
implicit val sc = new SparkContext(conf)
val program = SparkProgram(args.head, args(1))
program.transformSaltLines()
sc.stop()
}
}[/scala]
Au lancement du programme, on obtient :
[java]Serialization stack:
– object not serializable (class: scala.util.Random, value: scala.util.Random@468dda3e)
– field (class: fr.xebia.spark.serialization.LineSalter, name: random, type: class scala.util.Random)
– object (class fr.xebia.spark.serialization.LineSalter, LineSalter(1234))
– field (class: fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation3$1, name: salter$1, type: class fr.xebia.spark.serialization.LineSalter)
– object (class fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation3$1, <function1>)[/java]
Comme prévu, nous avons une erreur Task not serializable, causée par la sérialisation de l’instance de type scala.util.Random
.
C’est le moment d’utiliser @transient
pour la variable random
!
Déclarer random
@transient
corrige bien l’exception Task Not Serializable. En effet, grâce à @transient
, random
n’est pas sérialisé. Mais en contrepartie, lors de son instanciation sur un des executors, random
va prendre sa valeur par défaut, qui est null
. Donc notre exception Task Not Serializable va se transformer en NullPointerException
lorsqu’une méthode va être appelée (nextString
ici) !
Les solutions
Pour faire fonctionner notre code, plusieurs solutions s’offrent à nous.
Utiliser une fonction anonyme
Il est possible de déclarer l’instanciation du LineSalter
dans une fonction anonyme.
[scala]def transformSaltLines() = withInputAndOutput { line =>
LineSalter(1234).salt(line)
}[/scala]
Mais le sel généré sera toujours le même (même seed utilisé). De plus, dans le cas général, l’inconvénient de cette solution est qu’une instance de LineSalter
sera créée pour chaque ligne, ce qui n’est pas performant (on peut imaginer un cas plus complexe où l’instanciation d’une classe serait beaucoup plus lourde).
Privilégier les object
Dans ce cas précis (avec Random
), il serait possible d’appeler Random.nextString()
, qui est une méthode du companion object Random
. Plus aucun problème de sérialisation n’apparaîtrait (grâce au fait que Random
soit déclaré comme un object
), mais il sera en contrepartie impossible de modifier la seed pour la génération de nos sels. De plus, cette solution fonctionne dans ce cas précis, mais n’est pas applicable dans le cas où le problème vient d’une classe différente de Random
.
Dans le cas général, l’utilisation d’un object
peut être préférée. Il est possible de définir un wrapper object
autour d’une classe posant problème :
[scala]class MyClassNotSerializable(…) {
val notSerializableAttribute = …
def f() = …
}
object WrapperMyClassNotSerializable {
val notSerializableInstance = new MyClassNotSerializable(…)
def f() = notSerializableInstance.f()
}[/scala]
Cette solution n’est cependant pas très flexible si on souhaite disposer de n instances de MyClassNotSerializable
, avec des paramètres différents par exemple. L’utilisation de n wrappers autour de chaque instance n’est pas optimal pour la compréhension du code.
Utiliser une lazy val
Déclarer random
en lazy val
permet de sérialiser la classe LineSalter
sans erreur :
[scala]case class LineSalter(seed: Int) {
lazy val random = new scala.util.Random(seed)
def salt(line: String) = random.nextString(10) + " " + line
}[/scala]
La variable random
ne sera alors instanciée qu’une fois, au premier appel de la variable, donc initialisée sur chaque executor. L’instance du LineSalter
est initialisée sur le driver, mais pas la valeur de random.
Dans ce cas particulier, cette solution paraît la plus adaptée car seule elle permet d’instancier random
avec une seed définie. Attention cependant à ne pas abuser de cette solution (si il est possible de faire autrement), pour ne pas réduire la lisibilité du code.
Conclusion
L’exception Task Not Serializable est par nature liée au fonctionnement de Spark et de son caractère distribué. La correction d’un bug provoqué par cette exception n’est pas toujours aisé lorsqu’il apparaît, mais cette exception peut être évitée au maximum tout au long des étapes de développement en isolant le plus possible les portions de code nécessitant Spark. En diminuant l’adhérence à Spark, on diminue le risque de rencontrer cette erreur, en plus de rendre le code plus lisible et de faciliter les tests unitaires.
Cependant, même en respectant ces recommandations, cette exception peut apparaître, à cause de classes externes à Spark et non sérialisables. Dans le cas des classes définies par l’utilisateur, il est assez simple de les rendre sérialisables; dans le cas de classes appartenant à des librairies non modifiables (comme scala.util.Random
), l’utilisation des wrappers object
est conseillée, lorsque cela est possible.
Une solution qui fonctionne dans tous les cas n’existe pas, mais une refonte du code peut parfois s’avérer nécessaire pour contourner ce problème : par exemple, transformer des variables de classes non sérialisables en paramètres de fonction (lorsque les variables ne sont pas utilisées par toutes les fonctions), ou bien créer de petits object
, au lieu de class
avec des attributs non sérialisables.
Autres références sur le sujet
Using non serializable objects in Apache Spark
Demystifying Spark serialization errors
Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
Commentaire
1 réponses pour " Spark : comprendre et corriger l’exception Task not serializable "
Published by Boutiti , Il y a 5 ans
Je te remercie pour la qualité de ton blog .
Moi j’ai eu pas mal de fois ce problème , j’ai résolu ce problème en déclarant que des méthodes static pour les fonctions traitements spark.
Published by Axel , Il y a 5 ans
Très bon article, merci !