Dans Apache Spark, pourquoi ne RDD.de l'union de ne pas préserver l'outil de partitionnement?
Comme tout le monde le sait partitioners Spark ont un énorme impact sur les performances sur un "large" des opérations, de sorte qu'il est généralement personnalisé dans les opérations. J'ai essayé avec le code suivant:
val rdd1 =
sc.parallelize(1 to 50).keyBy(_ % 10)
.partitionBy(new HashPartitioner(10))
val rdd2 =
sc.parallelize(200 to 230).keyBy(_ % 13)
val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)
val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)
Je vois que, par défaut, cogroup()
donne toujours un EDR avec l'outil de partitionnement personnalisé, mais union()
ne le fait pas, il sera toujours en revenir à la valeur par défaut. C'est contre-intuitif que nous avons l'habitude de supposer qu'un PairRDD devrait user de son premier élément clé de partition. Est-il un moyen de "forcer" l'Étincelle de fusionner 2 PairRDDs à utiliser la même clé de partition?
Vous devez vous connecter pour publier un commentaire.
union
est très efficace, car il ne bouge pas toutes les données. Sirdd1
a 10 partitions etrdd2
a 20 partitions puisrdd1.union(rdd2)
aura 30 partitions: les partitions des deux RDDs mis les uns après les autres. C'est juste une tenue de la comptabilité de changement, il n'y a pas de shuffle.Mais forcément, il ignore l'outil de partitionnement. Un outil de partitionnement est construit pour un nombre donné de partitions. Le résultant CA a un certain nombre de partitions qui est différent des deux
rdd1
etrdd2
.Après la prise de l'union, vous pouvez exécuter
repartition
aléatoire les données, organisation des données par clé.Il existe une exception à ce qui précède. Si
rdd1
etrdd2
ont le même outil de partitionnement (avec le même nombre de partitions),union
se comporte différemment. Il rejoindra les partitions des deux Rdd par paires, en lui donnant le même nombre de partitions que chacune des entrées avaient. Cela peut nécessiter de déplacer les données (si les partitions ne sont pas co-situé), mais n'impliquera pas la lecture aléatoire. Dans ce cas, l'outil de partitionnement est conservé. (Le code est dans PartitionerAwareUnionRDD.scala.)df.partitioner
etdf.partitions
pour voir ce qui se passe.df.rdd.partitioner
etdf.rdd.getNumPartitions
. N'ai aucune idée de pourquoi mon DFs n'ont pas d'outil de partitionnement (Aucun) même quand je suis repartitionnement eux?union
peut être complètement différente pour eux que pour les Rdd. Désolé pour vous induire en erreur.oneRowDF
n'a en fait qu'une ligne, peut-être que vous pourriez essayer de recueillir tous les de local et de la construction d'une taille raisonnable DF d'eux avant d'aller de l'union. (Je n'ai pas essayé.)Ce n'est plus vrai. Iff deux Rdd ont exactement le même programme de partitionnement et le nombre de partitions, le
union
ed CA va aussi avoir les mêmes partitions. Cela a été introduit dans https://github.com/apache/spark/pull/4629 et incorporé dans Spark 1.3.