Spark: Comment traduire count(distinct(valeur)) dans Dataframe de l'API
Je suis en train de comparer les différentes façons d'agréger mes données.
C'est mes données d'entrée avec 2 éléments de la page,le visiteur):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
De travail avec une commande SQL dans Spark SQL avec ce code:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
J'obtiens ce résultat:
(PAG1,3) //PAG1 has been visited by 3 different visitors
(PAG2,2) //PAG2 has been visited by 2 different visitors
Maintenant, je voudrais obtenir le même résultat à l'aide Dataframes et thiers API, mais je ne peux pas obtenir le même résultat:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
En fait, c'est ce que j'obtiens en sortie:
[PAG1,8] //just the simple page count for every page
[PAG2,4]
C'est probablement quelque chose de stupide, mais je ne peux pas le voir en ce moment.
Merci d'avance!
FF
Vous devez vous connecter pour publier un commentaire.
Ce que vous avez besoin est le DataFrame fonction d'agrégation
countDistinct
:org.apache.spark.sql.functions
, de les importer :), éditer fait.org.apache.spark.sql.functions._
agg()
, voir modifierfrom pyspark.sql.functions import *
Vous pouvez utiliser dataframe de
groupBy
commande deux fois pour le faire. Ici,df1
est votre entrée.Cette commande produit le résultat suivant:
Puis utilisez le
groupBy
à nouveau la commande pour obtenir le résultat final.Final de sortie: