Spark DataFrame: ne groupBy après orderBy maintenir l'ordre?
J'ai une Spark 2.0 dataframe example
avec la structure suivante:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
Il contient 24 entrées pour chaque id (une pour chaque heure de la journée) et est commandé par id, heure à l'aide de la orderBy fonction.
J'ai créé un Agrégateur groupConcat
:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override def bufferEncoder: Encoder[String] = Encoders.STRING
override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
Il me permet de concaténer des colonnes dans les chaînes pour obtenir cette finale dataframe:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
Ma question est, si je ne example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count")
, ne fait que garantir que l'horaire de compte sont classés correctement dans leurs seaux?
J'ai lu que ce n'est pas forcément le cas de Rdd (voir Étincelle de tri par clé, puis en groupe pour faire commandés itératif?), mais peut-être que c'est différent pour DataFrames ?
Si non, comment puis-je contourner cela ?
OriginalL'auteur Ana Todor | 2016-09-15
Vous devez vous connecter pour publier un commentaire.
groupBy après orderBy ne pas maintenir l'ordre, comme d'autres l'ont souligné. Ce que vous voulez faire est d'utiliser une fonction de Fenêtre--partition sur l'id et l'ordre par heures. Vous pouvez collect_list plus de cela, puis prendre le max (la plus grande) de la liste finale, car ils passent de façon cumulative (c'est à dire de la première heure ne se dans la liste, la deuxième heure aura 2 éléments dans la liste, et ainsi de suite).
Exemple de code complet:
Ce qui nous garde dans le DataFrame monde. J'ai aussi simplifié l'UDF code de l'OP était de l'utiliser.
De sortie:
OriginalL'auteur Adair
J'ai un cas où l'ordre n'est pas toujours tenue: parfois oui, surtout pas.
Mon dataframe a 200 partitions en cours d'exécution sur l'Étincelle de 1,6
pour vérifier la commande je compare les valeurs de retour de
et
donnant par exemple (à gauche: sort_array(collect_list()); à droite: collect_list())
La colonne de gauche est toujours triée, tandis que la colonne de droite ne se compose que de tri de blocs.
Pour les différentes exécutions de prendre(), l'ordre des blocs dans la colonne de droite est différente.
orderBy(times, group_key).groupBy(group_key)
. Avez-vous essayé?OriginalL'auteur Kat
Si vous voulez un travail autour de la mise en œuvre en Java (Scala et Python doit être similaire):
OriginalL'auteur Shyam
ordre peut ou peut ne pas être la même, en fonction du nombre de partitions et de la distribution des données. Nous pouvons résoudre en utilisant des rdd.
Par exemple::
J'ai sauvé la ci-dessous des exemples de données dans un fichier et l'a chargé dans hdfs.
et exécuté la commande ci-dessous:
de sortie:
Qui est, nous avons regroupé les données par type, par la suite, triées par prix, et la concaténation de l'ids par "~" comme séparateur.
La commande ci-dessus peut être brisé comme ci-dessous:
on peut alors prendre un groupe en particulier par l'aide de la commande
de sortie:
OriginalL'auteur Ashish
La réponse courte est Oui, le salaire horaire de compte, permettra de maintenir le même ordre.
De généraliser, il est important que vous sorte avant de vous groupe. Aussi le tri doit être le même que le groupe + la colonne pour laquelle vous voulez vraiment le tri.
Un exemple serait:
Je n'ai pas de docs officielles, mais j'ai cette article qui explique un peu mieux le mécanisme de bzhangusc.wordpress.com/2015/05/28/... .Les commentaires sont intéressants aussi.
Fait intéressant, même Sean Owen lui-même déclare que les commandes peuvent ne pas être conservés (issues.apache.org/jira/browse/...)
Quelqu'un a lu l'article et les commentaires, j'ai ajouté le 7 juin 2017?
OriginalL'auteur Interfector