Modifier la collection dans un Spark RDD foreach
Je suis en train d'ajouter des éléments à une carte lors de l'itération les éléments d'un EDR. Je ne reçois pas toutes les erreurs, mais les modifications ne sont pas passe.
Tout fonctionne bien de l'ajouter directement ou itération autres collections:
scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()
scala> myMap("test1")="test1"
scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)
scala> List("test2", "test3").foreach(w => myMap(w) = w)
scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
Mais quand j'essaie de faire la même chose à partir d'un RDD:
scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)
scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
J'ai essayé de l'impression du contenu de la carte, comme elle l'était avant le foreach assurez-vous que la variable est la même, et il s'imprime correctement:
fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...
J'ai aussi imprimé la modification de l'élément de la carte à l'intérieur de la boucle foreach code et de l'impression que modifiée, mais lorsque l'opération est terminée, la carte semble inchangé.
scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
La conversion de la RDD dans un tableau (recueillir) fonctionne aussi très bien:
fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)
Est-ce un contexte problème? Suis-je accéder à une copie des données qui est en cours de modification quelque part d'autre?
source d'informationauteur palako
Vous devez vous connecter pour publier un commentaire.
Il devient plus clair lors de l'exécution sur une Étincelle de cluster (pas une seule machine). Le RDD est maintenant réparti sur plusieurs machines. Lorsque vous appelez
foreach
vous dites à chaque machine de quoi faire avec le morceau de la RDD. Si vous vous référez à toutes les variables locales (commemyMap
), ils se sont sérialisés et envoyé à la machine, de sorte qu'ils peuvent l'utiliser. Mais rien ne revient. Si votre copie originale demyMap
est pas affectée.Je pense que cela répond à votre question, mais il est évident que vous essayez d'accomplir quelque chose et vous ne serez pas en mesure d'y arriver de cette façon. N'hésitez pas à expliquer ici ou dans une autre question de ce que vous essayez de faire, et je vais essayer de l'aider.