Mapreduce Combiner

J'ai un simple code mapreduce avec mappeur, réducteur et combiner.
La sortie de mapper est passé à combiner. Mais pour le réducteur, au lieu de la sortie de combiner,de sortie de mapper est passé.

Aide gentiment

Code:

package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class AverageSalary
{
public static class Map extends  Mapper<LongWritable, Text, Text, DoubleWritable> 
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
{    
String[] empDetails= value.toString().split(",");
Text unit_key = new Text(empDetails[1]);      
DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
context.write(unit_key,salary_value);    
}  
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text> 
{
public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
{
String val;
double sum=0;
int len=0;
while (values.iterator().hasNext())
{
sum+=values.iterator().next().get();
len++;
}
val=String.valueOf(sum)+":"+String.valueOf(len);
try {
context.write(key,new Text(val));
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Reduce extends Reducer<Text,Text, Text,Text> 
{
public void reduce (final Text key, final Text values, final Context context)
{
//String[] sumDetails=values.toString().split(":");
//double average;
//average=Double.parseDouble(sumDetails[0]);
try {
context.write(key,values);
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String args[])
{
Configuration conf = new Configuration();
try
{
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();    
if (otherArgs.length != 2) {      
System.err.println("Usage: Main <in> <out>");      
System.exit(-1);    }    
Job job = new Job(conf, "Average salary");    
//job.setInputFormatClass(KeyValueTextInputFormat.class);    
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    
job.setJarByClass(AverageSalary.class);    
job.setMapperClass(Map.class);    
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);    
job.setOutputKeyClass(Text.class);    
job.setOutputValueClass(Text.class);    
System.exit(job.waitForCompletion(true) ? 0 : -1);
} catch (ClassNotFoundException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
//TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}

}

Il est vraiment difficile de deviner ce qui se passe sans un peu de code.
le code a été ajouté

OriginalL'auteur user2401464 | 2013-11-26