Mapreduce Combiner
J'ai un simple code mapreduce avec mappeur, réducteur et combiner.
La sortie de mapper est passé à combiner. Mais pour le réducteur, au lieu de la sortie de combiner,de sortie de mapper est passé.
Aide gentiment
Code:
package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class AverageSalary
{
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] empDetails= value.toString().split(",");
Text unit_key = new Text(empDetails[1]);
DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
context.write(unit_key,salary_value);
}
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text>
{
public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
{
String val;
double sum=0;
int len=0;
while (values.iterator().hasNext())
{
sum+=values.iterator().next().get();
len++;
}
val=String.valueOf(sum)+":"+String.valueOf(len);
try {
context.write(key,new Text(val));
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Reduce extends Reducer<Text,Text, Text,Text>
{
public void reduce (final Text key, final Text values, final Context context)
{
//String[] sumDetails=values.toString().split(":");
//double average;
//average=Double.parseDouble(sumDetails[0]);
try {
context.write(key,values);
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String args[])
{
Configuration conf = new Configuration();
try
{
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Main <in> <out>");
System.exit(-1); }
Job job = new Job(conf, "Average salary");
//job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.setJarByClass(AverageSalary.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : -1);
} catch (ClassNotFoundException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Il est vraiment difficile de deviner ce qui se passe sans un peu de code.
le code a été ajouté
le code a été ajouté
OriginalL'auteur user2401464 | 2013-11-26
Vous devez vous connecter pour publier un commentaire.
Il semble que vous avez oublié propriété importante d'un combineur:
Vous ne pouvez pas prendre dans un
Text/DoubleWritable
et retourner unText/Text
. Je vous suggère d'utiliserText
au LieuDoubleWritable
, et de faire une analyse à l'intérieur deCombiner
.OriginalL'auteur user987339
Le n ° 1 de la règle de Multiplexeurs sont: ne présumez pas que le viseur sera exécuté. Traiter les combiner seulement comme une optimisation.
Le Viseur n'est pas garanti sur l'ensemble de vos données. Dans certains cas, lorsque les données n'a pas besoin d'être renversé sur le disque, MapReduce va sauter en utilisant le Viseur entièrement. Notez également que le Viseur peut être exécuté plusieurs fois sur des sous-ensembles de données! Ça va fonctionner une fois par déversement.
Dans votre cas, vous êtes cette mauvaise hypothèse. Vous devriez être en train de faire la somme dans le Viseur ET le Réducteur.
Aussi, vous devez suivre @user987339 de réponse. L'entrée et la sortie du combineur doit être identique (Texte,Double -> Texte,Double) et qu'il doit correspondre avec la sortie de la Mappeur et l'entrée du Réducteur.
OriginalL'auteur Donald Miner
Si une moissonneuse-batteuse fonction est utilisée, il est de la même forme que la fonction de réduction (et est
une mise en œuvre de Réducteur), à l'exception de ses types de sorties sont l'intermédiaire de la clé et de
types de valeur (K2, V2), afin qu'ils puissent nourrir la fonction de réduction:
carte: (K1, V1) → list(K2, V2)
combiner: (K2, liste(V2)) → list(K2, V2)
réduire: (K2, liste(V2)) → list(K3, V3)
Souvent, les combiner et à réduire les fonctions sont les mêmes, dans ce cas, le K3 est le même que
K2, et V3 est le même que V2.
OriginalL'auteur Narsireddy
Combiner
ne fonctionnera pas toujours lorsque vous exécutezmapreduce
.Si il y a au moins trois déversement de fichiers (sortie de correspondance écrite locale-disque), le viseur sera exécuter de sorte que la taille de fichier peut être réduite de sorte qu'il peut être facilement transféré à réduire nœud.
Le nombre de déversements pour qui un multiplexeur besoin pour exécuter peut être définie par
min.num.spills.for.combine
propriétéOriginalL'auteur jintocvg