Calculer les moyennes pour chaque CLÉ par Paires (K,V) RDD dans Spark avec Python
Je veux partager ce particulier Apache Spark avec Python solution parce que la documentation est assez pauvre.
Je voulais calculer la valeur moyenne de K/V paires (stockées dans des Paires RDD), par CLÉ. Voici ce que les données de l'échantillon ressemble à:
>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
Maintenant la séquence de code suivante est un moins optimale façon de le faire, mais il fonctionne. C'est ce que je faisais avant j'ai trouvé une meilleure solution. Il n'est pas terrible mais-comme vous le verrez dans la section réponse-il y a une plus concis, efficace.
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]
Vous devez vous connecter pour publier un commentaire.
Maintenant une bien meilleure façon de le faire est d'utiliser la
rdd.aggregateByKey()
méthode. Parce que cette méthode est très peu documentée dans la Apache Spark avec la documentation Python -- et c'est pourquoi j'ai écrit ce Q&Un -- jusqu'à récemment, j'avais été en utilisant le code ci-dessus de la séquence. Mais encore une fois, c'est moins efficace, donc éviter faire de cette façon, sauf si nécessaire.Voici comment faire la même chose à l'aide de la
rdd.aggregateByKey()
méthode (recommandé) ...Par la CLÉ, de calculer simultanément la SOMME (le numérateur pour la moyenne que nous voulons calculer), et le COMTE (le dénominateur de la moyenne que nous voulons calculer):
Où il est vrai à propos de la signification de chaque
a
etb
paire ci-dessus (de sorte que vous pouvez visualiser ce qui se passe):Enfin, calculer la moyenne pour chaque CLÉ, et de recueillir les résultats.
J'espère que cette question et la réponse avec
aggregateByKey()
aidera.À mon avis un plus lisible l'équivalent d'un aggregateByKey avec deux lambdas est:
De cette manière, l'ensemble du moyen de calcul serait:
Juste ajouter une remarque à propos d'une interface intuitive et plus court (mais une mauvaise) solution à ce problème. Le livre Sam's Teach Yourself Apache Spark dans les 24 Heures a expliqué ce problème bien dans le dernier chapitre.
À l'aide de
groupByKey
on peut facilement résoudre le problème comme ceci:De sortie:
C'est intuitif et attrayant, mais ne l'utilisez pas!
groupByKey
n'est pas n'importe quelle combinaison sur les mappeurs et apporte toutes les paires clé-valeur pour le réducteur.Éviter
groupByKey
autant que possible. Aller avec lereduceByKey
solution comme @pat.Une légère amélioration de la réponse de prismalytics.io.
Il pourrait y avoir des cas où le calcul de la somme pourrait dépassement de nombre, parce que nous sommes en additionnant grand nombre de valeurs. Nous avons pu, au lieu de garder les valeurs moyennes et de garder le calcul de la moyenne de la moyenne et des comtes de deux pièces se réduit.
Si vous avez deux parties moyenne et compte comme (a1, c1) et (a2, c2), la moyenne générale est:
total/compte = (total1 + total2)/(count1 + counts2) = (a1*c1 + a2*c2)/(c1+c2)
Si nous marque R = c2/c1, Il peut être ré-écrit plus loin que a1/(1+R) + a2*R/(1+R)
Si l'on marque Ri comme 1/(1+R), nous pouvons écrire que a1*Ri + a2*R*Ri
Cette approche peut être converti pour une clé-valeur en utilisant simplement mapValues au lieu de la carte et reduceByKey au lieu de les réduire.
C'est à partir de: https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2