À l'aide de la scala de vidage résultat traitées par une Étincelle dans HDFS
Je suis un peu confus pour trouver la bonne façon de sauvegarder des données dans HDFS après le traitement avec de l'étincelle.
C'est ce que je suis en train de faire. Je suis calcul du minimum, maximum et écart-type de champs numériques. Mes fichiers d'entrée ont des millions de lignes, mais la sortie a seulement environ 15-20 champs. Donc, la sortie est une valeur unique(scalaire) pour chaque champ.
Par exemple: je vais charger toutes les lignes de CHAMP1 dans un EDR, et à la fin, je vais avoir 3 valeurs pour le CHAMP 1(MIN, MAX, écart-type). J'ai concaténé ces trois valeurs dans la chaîne temporaire. En fin de compte, je vais avoir 15 à vingt lignes, contenant 4 colonnes dans ce format suivant
FIELD_NAME_1 MIN MAX SD
FIELD_NAME_2 MIN MAX SD
C'est un extrait du code:
//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))
val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev
Donc, j'ai 3 variables, min_value, max_value et SD que je veux stocker retour à la sf.
La Question 1:
Depuis la sortie sera plutôt petite, dois-je l'enregistrer localement sur le serveur? ou devrais-je faire un dump de HDFS. Me semble que le dumping localement le fichier fait plus de sens.
La Question 2:
Dans spark, je peux juste appeler pour enregistrer un RDD dans un fichier texte
some_RDD.saveAsTextFile("hdfs://namenode/path")
Comment puis-je faire la même chose dans une variable de Chaîne qui n'est pas un RDD dans scala? dois-je paralléliser mon résultat dans un CA d'abord et ensuite appeler saveAsTextFile?
OriginalL'auteur user2773013 | 2014-06-30
Vous devez vous connecter pour publier un commentaire.
Enregistrer localement viens de faire
some_RDD.collect()
Puis d'enregistrer le résultat dans un tableau avec quelque chose comme de ce question. Et oui, si le jeu de données est petit, et peut s'adapter facilement dans la mémoire, vous devez collecter et de l'amener au pilote du programme. Une autre option si les données sont un peu grande pour stocker dans la mémoire est juste
some_RDD.coalesce(numParitionsToStoreOn)
. Gardez à l'espritcoalesce
prend également une valeur booléenneshuffle
, si vous faites des calculs sur les données avant de coalescence, vous devez mettre cette valeur à true pour obtenir plus de parallélisme sur les calculs. Fusionner permettra de réduire le nombre de nœuds qui stockent les données lorsque vous appelezsome_RDD.saveAsTextFile("hdfs://namenode/path")
. Si le fichier est tout petit, mais vous avez besoin sur hdfs, appelrepartition(1)
, qui est le même quecoalesce(1,true)
, cela permettra d'assurer que vos données ne sont enregistrées que sur un nœud.Mise à JOUR:
Donc, si tout ce que vous voulez faire est de sauver trois valeurs dans HDFS vous pouvez le faire.
sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")
Fondamentalement, vous êtes juste de mettre les 3 vars dans un tuple, étirable dans une Liste et de régler le parallélisme de l'un, puisque les données sont très petites
êtes-vous sûr de ne pas dire qu'une seule valeur, le type d'unité en scala moyens de nullité, même si vous avez seulement une valeur, vous pouvez toujours utiliser recueillir
si votre résultat final n'est pas un EDR, comment avez-vous arriver à ce point, votre question est assez détaillées de l'onu. J'ai mis à jour pour montrer combien vous pourriez économiser de hdfs, mais seulement sur un nœud
merci une tonne aaronman. Désolé d'être floues. J'ai mis à jour la question. Hoepfully c'est un peu plus clair.
consultez la section mise à jour, je pense que c'est exactement ce que vous voulez
OriginalL'auteur aaronman
Réponse 1: vous avez juste besoin de plusieurs scalaires, je tiens à dire à leur stockage dans un système de fichiers local. Vous pouvez tout d'abord faire
val localValue = rdd.collect()
, qui permettra de recueillir toutes les données à partir de travailleurs à maîtriser. Et puis vous appelez java.io à écrire des choses sur le disque.Réponse 2: Vous pouvez faire de sc.paralléliser(yourString).saveAsTextFile("hdfs://hôte/monfichier"). La volonté d'écrire les choses de la partie-000*. Si vous voulez avoir tous les éléments dans un seul fichier,
hdfs dfs -getmerge
est là pour vous aider.OriginalL'auteur Chong Tang