Hadoop ChainMapper, ChainReducer

I am relatively new to Hadoop and am trying to figure out how to chain jobs programmatically (multiple mappers and reducers) with ChainMapper and ChainReducer. I have found a few partial examples, but not a single complete, working one.

My test code is:

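import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainJobs extends Configured implements Tool {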

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().concat("Justatest"));
            output.collect(word, one);
        }
    }
}

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

@Override
public int run(String[] args) throws Exception {

    Configuration conf = getConf();
    JobConf job = new JobConf(conf);

    job.setJobName("TestforChainJobs");
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobConf map1Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);

    JobConf map2Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);

    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    JobClient.runJob(job);
    return 0;

}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
    System.exit(res);
}

}

But it fails with:

MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost\.localdomain" ERROR="java\.lang\.RuntimeException: Error in configuring object
    at org\.apache\.hadoop\.util\.ReflectionUtils\.setJobConf(ReflectionUtils\.java:106)
    at org\.apache\.hadoop\.util\.ReflectionUtils\.setConf(ReflectionUtils\.java:72)
    at org\.apache\.hadoop\.util\.ReflectionUtils\.newInstance(ReflectionUtils\.java:130)
    at org\.apache\.hadoop\.mapred\.MapTask\.runOldMapper(MapTask\.java:389)
    at org\.apache\.hadoop\.mapred\.MapTask\.run(MapTask\.java:327)
    at org\.apache\.hadoop\.mapred\.Child$4\.run(Child\.java:268)
    at java\.security\.AccessController\.doPrivileged(Native Method)
    at javax\.security\.auth\.Subject\.doAs(Subject\.java:396)

Any hints or a very simple working example would be much appreciated.

  • You have problems in the driver code, which I can partly understand. The Configuration object was not declared correctly. Please see the code I provided in the answer below, especially the driver code, to see what went wrong. Thanks 🙂
Original author: moodywoody | 2012-10-17
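
Separately from the configuration error in the stack trace, the data flow through the chain in the question (Map, then Map2, then Reduce) can be checked without a cluster. The sketch below is plain Java and does not use the Hadoop API: `ChainFlowSketch`, `map1`, `map2AsPosted`, `map2OnKey`, and `runChain` are hypothetical names, and `String`/`Integer` stand in for `Text`/`IntWritable`. It assumes Map2 was meant to tag each word. As posted, Map2 tokenizes `value.toString()`, which is the count, so every record collapses onto one key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Plain-Java stand-in for the chain in the question; no Hadoop classes are used.
public class ChainFlowSketch {

    // Mirrors Map: split a line into tokens and emit (token, 1).
    static List<Map.Entry<String, Integer>> map1(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            out.add(new SimpleEntry<>(tokenizer.nextToken(), 1));
        }
        return out;
    }

    // Mirrors Map2 as posted: it tokenizes the VALUE (the count), not the key.
    static List<Map.Entry<String, Integer>> map2AsPosted(String key, int value) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(Integer.toString(value));
        while (tokenizer.hasMoreTokens()) {
            out.add(new SimpleEntry<>(tokenizer.nextToken().concat("Justatest"), 1));
        }
        return out;
    }

    // Assumed intent (hypothetical): tag the incoming key and pass the count through.
    static List<Map.Entry<String, Integer>> map2OnKey(String key, int value) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        out.add(new SimpleEntry<>(key.concat("Justatest"), value));
        return out;
    }

    // Feeds each record from map1 into the second mapper, then sums per key
    // the way the Reduce class in the question does.
    static Map<String, Integer> runChain(
            List<String> lines,
            BiFunction<String, Integer, List<Map.Entry<String, Integer>>> second) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map1(line)) {
                for (Map.Entry<String, Integer> kv2 : second.apply(kv.getKey(), kv.getValue())) {
                    sums.merge(kv2.getKey(), kv2.getValue(), Integer::sum);
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b a");
        // prints {1Justatest=3}
        System.out.println(runChain(lines, ChainFlowSketch::map2AsPosted));
        // prints {aJustatest=2, bJustatest=1}
        System.out.println(runChain(lines, ChainFlowSketch::map2OnKey));
    }
}
```

This only illustrates which key/value pairs travel between the chained stages. It says nothing about the "Error in configuring object" failure itself, which the comment above attributes to the driver.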