Spark: prendre la N par clé
Dire que j'ai un PairRDD en tant que tel (Évidemment beaucoup plus de données dans la vraie vie, assumer des millions d'enregistrements):
val scores = sc.parallelize(Array(
("a", 1),
("a", 2),
("a", 3),
("b", 3),
("b", 1),
("a", 4),
("b", 4),
("b", 2)
))
Quel est le moyen le plus efficace pour générer un CA avec les 2 premiers scores par clé?
val top2ByKey = ...
res3: Array[(String, Int)] = Array((a,4), (a,3), (b,4), (b,3))
OriginalL'auteur michael_erasmus | 2015-05-11
Vous devez vous connecter pour publier un commentaire.
Je pense que cela devrait être tout à fait efficace:
Modifié selon l'OP commentaires:
J'ai eu ce travail en adaptant user52045 réponse: val scores = sc.paralléliser(Array( ("a", 1), ("un", 2), ("a", 3), (b, 3), (b, 1), ("un", 4), ("b", 4), (b, 2) )) scores.mapValues(p => (p, p)).reduceByKey((u, v) => { val valeurs = Liste(u._1, u._2, v. _1, v. _2).triés(Commande[Int].inverse).distinctes (les valeurs(0), les valeurs(1)) }).collect()
Vous avez raison, il y a un bug dans mon code. Thx pour le fixer. Une chose que vous devez être prudent, car si tous les éléments de la liste sont les mêmes u obtiendrez outOfBoudException.
cette solution efficace pour les grands ensembles de données? Je veux dire: le tri tout pour obtenir seulement quelques éléments de semble trop épuisant
Vous pouvez obtenir nombre arbitraire d'éléments avec de légères modifications seulement. Remplacer la première carte avec
mapValues(p => Range(0, n).map(_ => p))
et d'utilisation n élément de la séquence au lieu de deux élément d'un tuple.OriginalL'auteur abalcerek
Depuis la version 1.4, il y a un moyen intégré pour ce faire à l'aide de MLLib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
OriginalL'auteur jbochi
Légèrement modifié vos données d'entrée.
Je vais expliquer comment le faire étape par étape:
1.Groupe par clé pour créer la matrice
Résultat:
Comme vous le voyez, chaque valeur elle-même est un tableau de nombres. CompactBuffer est juste optimisé tableau.
2.Pour chaque clé, à l'inverse de tri de la liste des numéros de cette valeur contient
Résultat:
3.Garder seulement les 2 premiers éléments de la 2e étape, ils vont être les 2 meilleurs scores de la liste
Résultat:
4.Plat à la carte pour créer des nouvelle paire de RDD pour chaque clé et haut score
Résultat:
5.Étape facultative - tri par clé si vous voulez
Résultat:
Espoir, cette explication a aidé à comprendre la logique.
OriginalL'auteur Ramin Orujov
Je crois que les scores.reduceByKey(_ + _) serait l'effondrement de toutes les paires avec la même clé, de sorte que vous vous retrouvez avec un seul (a, N) et (b, M) où N et M sont la somme de toutes les valeurs et les valeurs de b, respectivement. À ce stade, vous un seul (a, N), et aucun montant de la décomposition serait de retour (a, i) et (a, j) où i et j sont les deux valeurs les plus élevées pour tous les couples.
OriginalL'auteur Ning Guo