Lorsque les accumulateurs sont vraiment fiables?
Je veux utiliser un accumulateur à recueillir des statistiques sur les données, je suis la manipulation sur une Étincelle d'emploi. Idéalement, je voudrais faire tout le travail calcule les transformations nécessaires, mais depuis Étincelle re-calculer les tâches dans les différents cas, les accumulateurs ne rendrait pas compte de vraies mesures. Voici comment la documentation décrit ceci:
Pour accumulateur mises à jour effectuées à l'intérieur des actions seulement, Étincelle
garantit que chaque tâche de mise à jour de l'accumulateur ne seront
appliquer une fois, c'est à dire redémarré tâches ne seront pas de mise à jour de la valeur. Dans
les transformations, les utilisateurs doivent être conscients que chaque tâche de mise à jour peut
être appliqué plus d'une fois si les tâches ou étapes sont ré-exécuté.
Ceci est source de confusion puisque la plupart des actions ne pas permettre l'exécution de code personnalisé (où les accumulateurs peuvent être utilisés), ils ont surtout prendre les résultats de précédentes transformations (paresseusement). La documentation indique également ceci:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
//Here, acc is still 0 because no actions have cause the `map` to be computed.
Mais si l'on ajoute data.count()
à la fin, serait-ce la garantie d'être correct (ont pas de doublons) ou pas? Clairement acc
n'est pas utilisée "à l'intérieur que des actions", que la carte est une transformation. Donc, il ne devrait pas être garanti.
D'autre part, la discussion sur les tickets Jira parler de "résultat des tâches" plutôt que des "actions". Par exemple ici et ici. Cela semble indiquer que le résultat serait en effet garanti pour être correct, puisque nous sommes à l'aide de acc
immédiatement avant et à l'action, et devrait donc être calculée comme une seule étape.
Je suppose que cette notion de "résultat de la tâche de" a à voir avec le type d'exploitation concerné, étant le dernier, qui comprend une action, comme dans cet exemple qui montre comment plusieurs opérations sont divisés en étapes (en magenta, image prise à partir de ici):
Donc, hypothétiquement, une count()
action à la fin de cette chaîne serait partie de la même étape finale, et je voudrais être garanti que les accumulateurs utilisés sur la dernière carte n'inclut aucune des doublons?
Clarification autour de cette question serait génial! Merci.
- Eh bien, bounty période terminée et je n'ai toujours pas vraiment connaître la vraie réponse, de sorte que l'attribution à la plus commentée de réponse à ce jour :-S
- les données.le comte ne s'exécutera pas de données.carte(...) mais cela va faire >>>val data2 = données.carte(x => acc += x; f(x)) >>>données2.count()
Vous devez vous connecter pour publier un commentaire.
Pour répondre à la question "Quand les accumulateurs vraiment fiable ?"
Réponse : Quand ils sont présents dans une Action fonctionnement.
Que par la documentation dans l'Action de la Tâche, même si tout redémarré tâches sont présents, il fera la mise à jour de l'Accumulateur qu'une seule fois.
Et d'Action permettent d'exécuter du code personnalisé.
Pour Ex.
Mais, Pourquoi Carte+Action viz. Résultat de la Tâche opérations sont pas fiable pour un Accumulateur de l'opération?
Verdict : Manipulé Correctement
Verdict : ce n'est Pas manipulé dans le Résultat de la Tâche.Accumulateur donnera mauvaise sortie.
Verdict : ce n'est Pas manipulé.Accumulateur donnera mauvaise sortie.
Verdict : ce n'est Pas manipulé.Accumulateur donnera mauvaise sortie.
Il peut donc arriver même fonction peut exécuter plusieurs fois sur les mêmes données.Si l'Étincelle ne fournit aucune garantie pour accumulateur obtenir des mises à jour en raison de la manipulation de la Carte.
Il est donc préférable d'utiliser l'Accumulateur dans l'Action de l'opération dans Spark.
Pour en savoir plus sur l'Accumulateur et de ses problèmes de renvoyer cette Post De Blog Par Imran Rashid.
sc.readFile(...).map(...<accumulator use here> ...).saveAsFile(...)
chaque tâche sera calculée qu'une seule fois et les accumulateurs seront fiables, comme l'ensemble de l'opération va se passer comme une seule unité en vertu de l'évaluation différée, sans aucun intermédiaire des résultats qui pourraient être rediffusion (dans une optique de spéculation ou pas). Mon expérience de refléter cette mesure, c'est pourquoi je cherchais une réponse faisant autorité.val a = sc.readFile(...); val b = a.map(...<accumulator used here>...);b.saveAsTextFile(...);val c = b.map(....);println(c.count());
.Maintenant, si vous vérifiez la valeur du double de l'accumulateur original de la valeur.Tant que cette carte est utilisée deux fois pour faire l'opération. Et si vous avez utilisé de l'accumulateur dans l'action.Ensuite, vous serez sûr qu'il sera appelé une fois.Et je ne prends pas le problème de cache ici comme si RDD est expulsé.Il en reprise de la carte à nouveau.save
etcount
.Accumulateur mises à jour sont envoyées vers le conducteur lorsqu'une tâche est terminée avec succès. Si votre accumulateur résultats sont garantis pour être correcte lorsque vous êtes certain que chaque tâche sera exécutée une seule fois et chaque tâche fait comme vous l'aviez prévu.
Je préfère compter sur
reduce
etaggregate
au lieu d'accumulateurs, car il est assez difficile d'énumérer toutes les façons, les tâches peuvent être exécutées.Cela dit, il y a beaucoup de cas simples où les accumulateurs peuvent être entièrement fiable.
Oui, si spéculative exécution est désactivé. Le
map
et lacount
sera d'une seule étape, donc, comme vous le dites, il n'existe aucun moyen d'une tâche peut être exécutée avec succès plus d'une fois.Mais un accumulateur est mis à jour comme un effet secondaire. Donc, vous devez être très prudent lorsque vous pensez à la façon dont le code sera exécuté. Considérez ceci au lieu de
accumulating.count
:Cela permettra également de créer une tâche pour chaque partition, et chaque tâche sera garanti pour exécuter exactement une fois. Mais le code
map
ne sera exécutée sur tous les éléments, seulement le premier dans chaque partition.L'accumulateur est comme une variable globale. Si vous partagez une référence à la RDD qui peut incrémenter la valeur de l'accumulateur, puis un autre code (les autres threads), peuvent être la cause d'un accroissement de trop.
Je pense que Matei répondu à la visée de la documentation: