Comment définir et utiliser un Définis par l'Utilisateur Fonction d'Agrégation dans Spark SQL?
Je sais comment écrire un fichier UDF Spark SQL:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Puis-je faire quelque chose de similaire pour définir une fonction d'agrégation? Comment est-ce fait?
Pour le contexte, je veux exécuter la requête SQL suivante:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Il doit retourner quelque chose comme
Row(span1, false, T0)
Je veux que la fonction d'agrégation de me dire si il y a des valeurs pour opticalReceivePower
dans les groupes définis par span
et timestamp
qui sont en dessous du seuil. Dois-je écrire mon udaf, nous différemment à l'UDF, j'ai collé ci-dessus?
- Connexes: stackoverflow.com/questions/33899977/...
- Peut-être que l'utilisation
reduceByKey
/foldByKey
que > recommandé par zero323
Vous devez vous connecter pour publier un commentaire.
Méthodes prises en charge
Spark >= 2.3
Vectorisé udf (Python uniquement):
Exemple d'utilisation:
Voir aussi L'application de fonctions définies par l'utilisateur sur GroupedData dans PySpark (avec le fonctionnement de python exemple)
Spark >= 2.0 (en option 1.6 mais légèrement différentes API):
Il est possible d'utiliser
Aggregators
sur typéDatasets
:Spark >= 1.5:
Spark 1.5 vous pouvez créer des udaf, nous aime, même s'il est plus probable une overkill:
Exemple d'utilisation:
Étincelle 1.4 solution de contournement:
Je ne sais pas si j'ai correctement comprendre vos besoins, mais aussi loin que je peux dire de la plaine de vieux agrégation doit être assez ici:
Spark <= 1.4:
Autant je sais, en ce moment (Étincelle 1.4.1), il n'y a pas de support pour les udaf, nous, autres que la Ruche ceux. Il devrait être possible avec Spark 1.5 (voir SPARK-3947).
Non pris en charge /méthodes internes
En interne Étincelle utilise un certain nombre de classes, y compris
ImperativeAggregates
etDeclarativeAggregates
.Il y a de prévu pour une utilisation interne, et peuvent changer sans préavis, de sorte qu'il n'est probablement pas quelque chose que vous souhaitez utiliser dans votre code de production, mais juste pour être complet,
BelowThreshold
avecDeclarativeAggregate
pourraient être mises en œuvre comme ceci (testé avec Spark 2.2-SNAPSHOT):Il doit être encore enveloppé d'un équivalent de
withAggregateFunction
.Aggregator
fonctionne avec les deuxgroupBy
etgroupByKey
(voir github.com/apache/spark/blob/master/sql/core/src/test/scala/org/...). Malheureusement,Aggregator
ne fonctionne pas sur windows où vous devez utiliser laUserDefinedAggregateFunction
.