Comment accéder diffusé DataFrame Spark
J'ai créé deux dataframes qui sont de la Ruche tables(PC_ITM et ITEM_SELL) et de grande taille et je suis en utilisant ces
souvent dans la requête SQL en vous inscrivant en tant que tableau.Mais comme ce sont de gros, c'est prendre beaucoup de temps
pour obtenir le résultat de la requête.J'ai donc enregistré comme parquet fichier puis de les lire et enregistrés en tant que table temporaire.Mais encore, je ne suis pas d'obtenir de bonnes performances donc, j'ai diffusé ces données-cadres et alors enregistré comme les tableaux ci-dessous.
PC_ITM_DF=sqlContext.parquetFile("path")
val PC_ITM_BC=sc.broadcast(PC_ITM_DF)
val PC_ITM_DF1=PC_ITM_BC
PC_ITM_DF1.registerAsTempTable("PC_ITM")
ITM_SELL_DF=sqlContext.parquetFile("path")
val ITM_SELL_BC=sc.broadcast(ITM_SELL_DF)
val ITM_SELL_DF1=ITM_SELL_BC.value
ITM_SELL_DF1.registerAsTempTable(ITM_SELL)
sqlContext.sql("JOIN Query").show
Mais encore je ne peux pas atteindre la performance elle est prise en même temps que lors de ces blocs de données ne sont pas diffusés.
Si quelqu'un peut dire si c'est la bonne approche de la radiodiffusion et de l'utiliser?`
OriginalL'auteur Raghavendra Kulkarni | 2016-01-21
Vous devez vous connecter pour publier un commentaire.
Vous n'avez pas vraiment besoin de "l'accès" à la diffusion dataframe - vous venez de l'utiliser, et l'Étincelle mettra en œuvre la diffusion sous le capot. Le fonction de radiodiffusion fonctionne très bien, et fait plus de sens que le
sc.broadcast
approche.Il peut être difficile de comprendre où le temps est passé si vous évaluez tout à la fois.
Vous pouvez casser votre code en plusieurs étapes. La clé ici sera de l'exécution d'une action et la persistance de l'dataframes vous souhaitez diffuser avant vous les utilisez dans votre jointure.
Faire cela permettra d'assurer que le dataframe est
Lorsque vous exécutez maintenant
sqlContext.sql("JOIN Query").show
vous devriez maintenant voir une diffusion sur les jointure de hachage' dans l'onglet SQL de votre Étincelle de l'INTERFACE utilisateur.oui, j'en utilise tout le temps. L'avantage est que les données sont entièrement disponibles sur tous les nœuds - il n'est plus distribué, ce qui rend pour une meilleure performance lors de l'adhésion. Prenons l'exemple d'un DataFrame contenant chaque personne aux etats-unis et leur code postal, puis un tableau contenant le code zip -> etat. L'adhésion à ces nécessite des quantités massives de brassage. Diffusion le relativement petit zip->état dataframe à tous les nœuds supprime le besoin de mélanger.
Vous êtes à la diffusion de l'dataframe qui est conservé dans la mémoire, qui n'est pas distribué. Est-ce exact? Spark suggère d'ajouter un outil de partitionnement de vos données afin de réduire le montant de lecture aléatoire lors de l'adhésion. @kirkbroadhurt
Je dis Étincelle qui le dataframe doit être diffusée. Il reste "distribués" jusqu'à ce qu'il est nécessaire (par exemple pour une jointure), au moment où l'Étincelle de l'optimiseur de Catalyseur comprend que je veux envoyer une copie de la dataframe à chaque nœud.
OK, j'ai mal compris. C'est sous la condition où le dataframe s'inscrit dans la mémoire. Dans votre exemple de zipcodes, qui fonctionne très bien. Dans le cas d'un système distribué dataframe plus grande que la mémoire, la diffusion ne semble pas être la bonne approche. Dans ce cas, une combinaison de la persistance et le partitionnement des données serait une solution qui fonctionne avec n'importe quelle taille de données.
OriginalL'auteur Kirk Broadhurst
Je cache les rdd dans la mémoire. La prochaine fois qu'ils sont nécessaires, étincelle va lire le CA de la mémoire plutôt que de générer du CA à partir de zéro à chaque fois. Voici un lien vers le guide de démarrage rapide docs.
rdd.cache() est une abréviation pour
rdd.persist(StorageLevel.MEMORY_ONLY)
. Il y a quelques niveaux de persistance, vous pouvez choisir d'emballer vos données est trop grande pour la mémoire de la persévérance. Voici une liste de persistance des options. Si vous souhaitez supprimer manuellement les RDD à partir du cache, vous pouvez appelerrdd.unpersist()
.Si vous préférez la diffusion des données. Vous devez d'abord recueillir sur le pilote avant de le diffuser. Cela nécessite que votre RDD s'inscrit dans la mémoire de votre pilote (et des exécuteurs).
Il déclare que les données est grande et est fréquemment utilisé
légèrement s'égarer de la question d'origine:
RDD fits in memory
donc cela veux dire que je peux ne pas diffuser de données jusqu'à ce que je puisse le récupérer dans la mémoire principale du pilote? En général, je utiliser mon propre ordinateur portable en tant que pilote et maîtres/esclaves sur un cluster de grande taille. Donc, c'est une limitation que je peut faire face à bientôt?Selon @KirkBroadhurst vous pouvez diffuser de la RDD et les exécuteurs testamentaires de collecter les données lorsque cela est nécessaire.
OriginalL'auteur Alex Naspo
En ce moment, vous ne pouvez pas accéder diffusé bloc de données dans la requête SQL. Vous pouvez utiliser brocasted bloc de données seulement travers des trames de données.
Consulter: https://issues.apache.org/jira/browse/SPARK-16475
broadcast
fonction d'une table temporaire, puis appelez la table temporaire à l'intérieur d'une requête SQL.OriginalL'auteur Raju Bairishetti