Ce qui est une tâche Spark? Comment l'Étincelle travailleur d'exécuter le fichier jar?
Après la lecture d'un document sur http://spark.apache.org/docs/0.8.0/cluster-overview.html, j'ai quelques question que je tiens à préciser.
Prendre cet exemple de l'Étincelle:
JavaSparkContext spark = new JavaSparkContext(
new SparkConf().setJars("...").setSparkHome....);
JavaRDD<String> file = spark.textFile("hdfs://...");
//step1
JavaRDD<String> words =
file.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
}
});
//step2
JavaPairRDD<String, Integer> pairs =
words.map(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
//step3
JavaPairRDD<String, Integer> counts =
pairs.reduceByKey(new Function2<Integer, Integer>() {
public Integer call(Integer a, Integer b) {
return a + b;
}
});
counts.saveAsTextFile("hdfs://...");
Donc, disons que j'ai 3 nœuds de cluster, et le nœud 1 exécute en tant que maître, et au-dessus de programme pilote a été correctement jared (dire application-test.jar). Alors maintenant, je suis en cours d'exécution de ce code sur le nœud maître et je crois que juste après la SparkContext
en cours de création, l'application-test.jar le fichier sera copié dans les nœuds de travail (et de chaque travailleur va créer un dir pour cette application).
Alors maintenant ma question:
Sont etape 1, etape 2 et etape 3 dans l'exemple des tâches qui sont envoyés sur les travailleurs? Si oui, alors comment le travailleur doit-il exécuter? Comme java -cp "application-test.jar" step1
et ainsi de suite?
Vous devez vous connecter pour publier un commentaire.
Lorsque vous créez le
SparkContext
, chaque travailleur commence un exécuteur. C'est un processus distinct (JVM), et elle charge votre bocal de trop. Les exécuteurs testamentaires se connecter de nouveau à votre programme de pilote. Maintenant, le conducteur peut envoyer les commandes, commeflatMap
,map
etreduceByKey
dans votre exemple. Lorsque le conducteur s'arrête, les exécuteurs testamentaires de l'arrêter.Rdd sont un peu comme les grands tableaux qui sont divisés en partitions, et chaque interprète peut tenir certaines de ces partitions.
Un tâche est une commande envoyée par le conducteur d'un exécuteur testamentaire par la sérialisation de votre
Function
objet. L'exécuteur désérialise la commande (ce qui est possible parce qu'il a chargé votre jar), et l'exécute sur une partition.(C'est une vue d'ensemble conceptuelle. Je suis abstraction de certains détails, mais j'espère que c'est utile.)
Pour répondre à votre question: Non, un nouveau processus n'est pas lancé pour chaque étape. Un nouveau processus est démarré sur chaque travailleur lors de la
SparkContext
est construit.rdd.map(myFunc)
, une tâche est créée pour chaque partition. Les exécuteurs testamentaires de ramassage et d'exécuter les tâches. Chaque tâche dans ce cas vamyFunc
sur une partition. Certaines opérations, commereduceByKey
sont plus complexes, mais c'est l'idée de base.rdd.map
ne fait rien jusqu'à ce qu'il en a besoin. Si vous nerdd.filter(...).map(...).collect()
, lefilter
etmap
fonctions de s'exécuter uniquement sur les travailleurs lorsque vous appelezcollect
. Mais la plupart du temps vous n'avez pas besoin de penser à ce sujet.SparkContext.addJar
sera copié sur tous les nœuds du travailleur.sc.textFile
, le pilote de l'application sera d'abord obtenir la liste de "divisions" (des morceaux du fichier) à partir du système de fichiers. Puis l'exécuteur fils de chaque charge d'un split à un moment en parallèle.load
méthode peut charger à partir d'un répertoire, donc le fait que chaque partition est un fichier distinct est la plupart du temps cachés.Pour obtenir un clair aperçu de la façon dont les tâches sont créés et programmés, nous devons comprendre comment le modèle d'exécution de travaux dans Spark. Peu de temps de parler, une application spark est réalisée en trois étapes :
De votre mot-comte exemple, le RDD graphique est plutôt simple, c'est quelque chose comme suit :
fichier -> lignes -> mots -> par le nombre de mots -> global nombre de mots -> sortie
Basé sur ce graphique, deux étapes sont créés. L'étape de création de la règle est fondée sur l'idée de pipeline nombreuses et étroites transformations que possible. Dans votre exemple, l'étroitesse de la transformation se termine à chaque mot compte. Par conséquent, vous obtenez deux étapes
Une fois que les étapes sont compris, étincelle va générer des tâches à partir d'étapes. La première étape sera de créer ShuffleMapTasks et la dernière étape sera de créer ResultTasks parce que dans la dernière étape, une action de l'opération est inclus afin de produire des résultats.
Le nombre de tâches à être généré dépend de la façon dont vos fichiers sont distribués. Supposons que vous ayez 3 trois fichiers différents dans trois différents nœuds, la première étape sera de générer des 3 tâches : une tâche par partition.
Par conséquent, vous ne devriez pas la carte à vos mesures pour les tâches directement. Une tâche appartient à une étape, et est liée à une partition.
Généralement, le nombre de tâches a couru pour un stade est exactement le nombre de partitions de la finale de la RDD, mais depuis Rdd peuvent être partagées (et donc
ShuffleMapStages
) leur nombre varie en fonction de la RDD/stade de partage. Veuillez vous référer à Comment DAG fonctionne sous les couvertures dans les RDD?