La sérialisation d'Exception sur la spark
Je rencontre un problème très étrange sur la Spark sur la sérialisation.
Le code est comme ci-dessous:
class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
def infer(document: RDD[Document]): RDD[DocumentParameter] = {
val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
docs
}
}
où le Document est défini comme:
class Document(val tokens: SparseVector[Int]) extends Serializable
et DocumentParameter est:
class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable
object DocumentParameter extends Serializable
{
def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document,
Array.ofDim[Float](numOfTopics))
}
SparseVectoris une classe sérialisable dans breeze.linalg.SparseVector
.
C'est une carte simple procédure, et toutes les classes sont sérialisables, mais j'ai cette exception:
org.apache.spark.SparkException: Task not serializable
Mais quand j'enlève le numOfTopics
paramètre, c'est:
object DocumentParameter extends Serializable
{
def apply(document: Document) = new DocumentParameter(document,
Array.ofDim[Float](10))
}
et de l'appeler comme ceci:
val docs = documents.map(DocumentParameter.apply)
et cela semble OK.
Est de type Int pas sérialisable? Mais je vois que certains le code est écrit comme ça.
Je ne suis pas sûr de savoir comment corriger ce bug.
#Mis à JOUR#:
Merci @samthebest. Je vais ajouter plus de détails à ce sujet.
stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.topicmodel.PLSA.infer(PLSA.scala:13)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC.<init>(<console>:41)
at $iwC.<init>(<console>:43)
at <init>(<console>:45)
at .<init>(<console>:49)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 46 more
Comme la trace de la pile donne les informations générales de l'exception, je l'ai enlevé.
J'exécute le code dans l'étincelle de sa coquille.
//suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)
Pourriez-vous me donner quelques tutoriels ou des astuces sur sérialisable?
OriginalL'auteur superhan | 2014-12-20
Vous devez vous connecter pour publier un commentaire.
Fonctions anonymes sérialiser la classe qui les contient. Lorsque vous
map {doc => DocumentParameter(doc, numOfTopics)}
, la seule façon pour elle de donner que de la fonction d'accès ànumOfTopics
est pour sérialiser lesPLSA
classe. Et que la classe ne peut pas être sérialisé, parce que (comme vous pouvez le voir à partir de la stacktrace) il contient lesSparkContext
ce qui n'est pas sérialisable (Mauvaises Choses qui se passerait si les nœuds de cluster individuels ont eu accès au contexte et pourrait par exemple créer de nouveaux emplois dans un mappeur).En général, essayez d'éviter de stocker les
SparkContext
dans vos classes (edit: ou au moins, assurez-vous qu'il est très clair ce genre de classes contiennent lesSparkContext
et de ce genre ne le font pas); c'est mieux de passer comme une (peut-êtreimplicit
) paramètre de méthodes individuelles qui en ont besoin. Sinon, déplacer la fonction{doc => DocumentParameter(doc, numOfTopics)}
dans une classe différente dePLSA
, qui a vraiment un peut être sérialisé.(Comme plusieurs personnes l'ont suggéré, il est possible de garder le
SparkContext
dans la classe, mais marquée comme@transient
afin de ne pas être sérialisé. Je ne recommande pas cette approche; il signifie la classe "comme par magie" changer d'état lorsque sérialisé (perte de laSparkContext
), et donc, vous pourriez vous retrouver avec des Npe lorsque vous essayez d'accéder à laSparkContext
de l'intérieur d'un sérialisé travail. Il est préférable de maintenir une distinction claire entre les classes qui ne sont utilisées que dans le "contrôle" du code (et peut utiliser leSparkContext
) et les classes qui sont sérialisés exécuter sur le cluster (qui ne doit pas avoir laSparkContext
)).@transient
avantval sc : SparkContext
, puisSparkContext
va pas être sérialisé.Je ne suis pas d'accord que vous devriez éviter de stocker les
SparkContext
dans vos classes entièrement (mais upvoted néanmoins). Si vous n'avez pas les stocker dans la portée, alors vous pouvez finir par avoir paramètre de dilatation (ce qui est laid, même lors de l'utilisation implicite params). La seule alternative est de s'en tenir à quelques globale singleton quelque part qui crée des problèmes de son propre (redouté des pointeurs null).OriginalL'auteur lmm
C'est en effet bizarre, mais je crois que je peux deviner le problème. Mais d'abord, vous n'avez pas fourni le strict minimum pour résoudre le problème (je peux le deviner, parce que j'ai vu 100s de ces avant). Ici sont quelques-uns des problèmes avec votre question:
Cette méthode ne retourne pas de
RDD[DocumentParameter]
il retourneUnit
. Vous devez avoir copié et collé un code incorrect.D'autre part, vous n'avez pas fourni la totalité de la trace de la pile? Pourquoi? Il n'y a aucune raison de ne PAS fournir la trace de la pile, et le plein de trace de pile avec message est nécessaire de comprendre l'erreur - une erreur entier pour comprendre ce qu'est l'erreur. Habituellement, un pas sérialisable exception vous dit ce n'est pas sérialisable.
Troisièmement, vous n'avez pas dit nous où la méthode
infer
est, vous faites ça dans un shell? Qu'est-ce que le contenant de l'objet/classe/trait etc deinfer
?De toute façon, je vais deviner qu'en passant dans le
Int
votre provoquant une chaîne de choses pour obtenir sérialisé que vous ne vous attendez pas, je ne peux pas vous donner plus d'informations que jusqu'à ce que vous fournir le minimum de code, donc nous pouvons comprendre votre problème.OriginalL'auteur samthebest