Spark SQL diffusion de jointure de hachage
Je suis en train de réaliser une émission de jointure de hachage sur dataframes à l'aide de SparkSQL, comme indiqué ici: https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html
Dans cet exemple, la (petite) DataFrame
est conservé par saveAsTable et puis, il y a une jointure via spark SQL (via sqlContext.sql("..."))
Le problème que j'ai c'est que j'ai besoin d'utiliser la sparkSQL API pour construire mes SQL (je suis de gauche se joindre à ~50 tables avec une liste d'ID, et ne veux pas écrire du SQL à la main).
How do I tell spark to use the broadcast hash join via the API? The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.
Vous devez vous connecter pour publier un commentaire.
Vous pouvez marquer explicitement la
DataFrame
comme suffisamment petit pour la radiodiffusionà l'aide de
broadcast
fonction:Python:
ou de diffusion de l'indice (Spark >= 2.2):
Scala:
ou de diffusion de l'indice (Spark >= 2.2):
SQL
Vous pouvez utiliser les indices (Spark >= 2.2):
ou
ou
R (SparkR):
Avec
hint
(Spark >= 2.2):Avec
broadcast
(Spark >= 2.3)Note:
Diffusion de jointure est utile si l'une des structures est relativement faible. Sinon, il peut être beaucoup plus cher qu'un plein shuffle.
smallDF.join(largeDF)
ne pas faire une émission de jointure de hachage, maislargeDF.join(smallDF)
n'.spark.sql.autoBroadcastJoinThreshold
etbroadcast
fonction peut être appliquée sur n'importe quelle position:broadcast(largeDF).join(smallDF, Seq("foo"))
shuffledHashJoin :
toutes les données pour l'Inde sera mélangée dans seulement 29 touches pour chacun des états.
Problèmes:
inégale de la fragmentation.
Limitée parallélisme avec 29 sortie des partitions.
broadcaseHashJoin:
diffusion de la petite RDD à tous les nœuds du travailleur.
le parallélisme de la grande rdd est toujours maintenu et shuffle n'est même pas
nécessaire.
PS: l'Image peut laid, mais instructif.
Avec une diffusion joindre un côté de la jointure équation est matérialisé et de l'envoyer à tous les utilisateurs. Il est donc considéré comme une carte-jointure côté.
Que l'ensemble de données est prise en matérialisée et de les envoyer sur le réseau, il ne fait qu'apporter une amélioration significative des performances, si considérables petit.
Donc, si vous essayez d'effectuer smallDF.join(largeDF)
Attendre..!!! une autre contrainte est qu'il faut également tenir entièrement dans la mémoire de chaque interprète.Il doit également s'insérer dans la mémoire du Pilote!
De diffusion variables sont partagées entre les exécuteurs en utilisant le protocole Torrent je.e.Peer-to-Peer protocole et l'avantage du protocole Torrent est que les pairs, le partage de blocs d'un fichier parmi les uns des autres en ne s'appuyant pas sur une entité centrale chargée de la tenue de tous les blocs.
Exemple mentionné ci-dessus est suffisante pour commencer à jouer de la diffusion de la rejoindre.
Remarque:
Ne peut pas modifier la valeur après la création.
Si vous essayez, le changement va être sur une seule&nœud