SPARK DataFrame: la Façon la plus efficace de split dataframe pour chaque groupe basé sur la même colonne, les valeurs
J'ai un DataFrame générée comme suit:
df.groupBy($"Hour", $"Category")
.agg(sum($"value").alias("TotalValue"))
.sort($"Hour".asc,$"TotalValue".desc))
Les résultats ressemblent à:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
Je voudrais faire de nouvelles dataframes en fonction de chaque valeur unique de col("Hour")
, c'est à dire
- pour le groupe de l'Heure==0
- pour le groupe de l'Heure==1
- pour le groupe de l'Heure==2
et ainsi de suite...
De sorte que la sortie désirée serait:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
+----+--------+----------+
df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
+----+--------+----------+
et de la même façon,
df2 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
+----+--------+----------+
Toute aide est très appréciée.
EDIT 1:
Ce que j'ai essayé:
df.foreach(
row => splitHour(row)
)
def splitHour(row: Row) ={
val Hour=row.getAs[Long]("Hour")
val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))
val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")
val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))
mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
}
PROBLÈME AVEC CETTE STRATÉGIE:
Il a pris 8 heures lorsqu'il a été exécuté sur un dataframe df
qui avait plus de 1 million de lignes et de susciter le travail a été donné autour de 10 GO de RAM sur un seul nœud. Donc, join
s'avère être très efficace.
Mise en garde: je dois écrire chaque dataframe mydf
que le parquet a imbriqués schéma qui doit être maintenu (non aplati).
- Pourriez-vous faire
df.write.partitionBy("hour").saveAsTable("myparquet")
pour ce faire? - Merci, cela a fonctionné 60 fois plus rapide que ma stratégie! Mais il sauve la résultante de fichiers avec des noms comme
hour=0
,hour=1
,etc et je veux les fichiers à enregistrer en tant que0
,1
,etc. Pourriez-vous veuillez donner vos idées sur la façon de réaliser cet objectif? - Vous pouvez être en mesure d'utiliser hiveContext avec la configuration avec
hive.dynamic.partitioning.custom.pattern
mais l'un des avantages de le garder commehour=0
,hour=1
, etc. c'est que lorsque vous êtes en cours d'exécutionspark.read.parquet(...)
il sera automatiquement comprendre la dynamique sous-jacente des partitions. Une autre approche possible serait de renommer les dossiers à la suite (c'est à dire utilisermv
de commande), mais vous pouvez toujours exécuter le même problème queread.parquet
ne sera pas automatiquement de comprendre la dynamique des partitions.
Vous devez vous connecter pour publier un commentaire.
Comme indiqué dans mes commentaires, un potentiellement facile d'approche à ce problème serait d'utiliser:
Comme indiqué, la structure de dossier serait
myparquet/hour=1
,myparquet/hour=2
, ...,myparquet/hour=24
par opposition àmyparquet/1
,myparquet/2
, ...,myparquet/24
.À changer la structure des dossiers, vous pouvez
hcat.dynamic.partitioning.custom.pattern
au sein d'une explicite HiveContext; plus d'informations sur HCatalog DynamicPartitions.df.write.partitionBy.saveAsTable(...)
de commande avec quelque chose commefor f in *; do mv $f ${f/${f:0:5}/} ; done
qui supprimerait laHour=
texte à partir du nom du dossier.Il est important de noter que, en changeant le modèle de nommage pour les dossiers, lorsque vous exécutez
spark.read.parquet(...)
dans ce dossier, l'Étincelle ne sera pas automatiquement de comprendre la dynamique des partitions depuis sa manque le partitionKey (c'est à direHour
) de l'information.Ce qui a été répondu ici pour Spark (Scala):
Comment puis-je partager un dataframe en dataframes avec les mêmes valeurs des colonnes de la SCALA et de la bougie
et ici pour pyspark:
PySpark - Split/Filtre DataFrame par des valeurs de colonne
val byStateArray: Array[Any] = cnts.map(c => sorted.where($"cnt" <=> c))
2 problèmes: i) Elle retourne un Tableau[Tout] à l'encontre souhaité dataframe. ii) Il affiche l'erreur:CANNOT RESOLVE <=>