SPARK DataFrame: la Façon la plus efficace de split dataframe pour chaque groupe basé sur la même colonne, les valeurs

J'ai un DataFrame générée comme suit:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc,$"TotalValue".desc))

Les résultats ressemblent à:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+

Je voudrais faire de nouvelles dataframes en fonction de chaque valeur unique de col("Hour") , c'est à dire

  • pour le groupe de l'Heure==0
  • pour le groupe de l'Heure==1
  • pour le groupe de l'Heure==2
    et ainsi de suite...

De sorte que la sortie désirée serait:

df0 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
+----+--------+----------+

df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
+----+--------+----------+

et de la même façon,

df2 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
+----+--------+----------+

Toute aide est très appréciée.

EDIT 1:

Ce que j'ai essayé:

df.foreach(
  row => splitHour(row)
  )

def splitHour(row: Row) ={
    val Hour=row.getAs[Long]("Hour")

    val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))

    val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")

    val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))

    mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
  }

PROBLÈME AVEC CETTE STRATÉGIE:

Il a pris 8 heures lorsqu'il a été exécuté sur un dataframe df qui avait plus de 1 million de lignes et de susciter le travail a été donné autour de 10 GO de RAM sur un seul nœud. Donc, join s'avère être très efficace.

Mise en garde: je dois écrire chaque dataframe mydf que le parquet a imbriqués schéma qui doit être maintenu (non aplati).

  • Pourriez-vous faire df.write.partitionBy("hour").saveAsTable("myparquet") pour ce faire?
  • Merci, cela a fonctionné 60 fois plus rapide que ma stratégie! Mais il sauve la résultante de fichiers avec des noms comme hour=0, hour=1,etc et je veux les fichiers à enregistrer en tant que 0,1,etc. Pourriez-vous veuillez donner vos idées sur la façon de réaliser cet objectif?
  • Vous pouvez être en mesure d'utiliser hiveContext avec la configuration avec hive.dynamic.partitioning.custom.pattern mais l'un des avantages de le garder comme hour=0, hour=1, etc. c'est que lorsque vous êtes en cours d'exécution spark.read.parquet(...) il sera automatiquement comprendre la dynamique sous-jacente des partitions. Une autre approche possible serait de renommer les dossiers à la suite (c'est à dire utiliser mv de commande), mais vous pouvez toujours exécuter le même problème que read.parquet ne sera pas automatiquement de comprendre la dynamique des partitions.