Hadoop ChainMapper, ChainReducer
Je suis relativement nouveau sur Hadoop et à essayer de comprendre comment par programmation de la chaîne de l'emploi (plusieurs cartographes, réducteurs) avec ChainMapper, ChainReducer. J'ai trouvé un peu partielle des exemples, mais pas un seul compléter et de travail.
Mon code de test est
public class ChainJobs extends Configured implements Tool {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken().concat("Justatest"));
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
@Override
public int run(String[] args) {
Configuration conf = getConf();
JobConf job = new JobConf(conf);
job.setJobName("TestforChainJobs");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
JobConf map1Conf = new JobConf(false);
ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);
JobConf map2Conf = new JobConf(false);
ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);
JobConf reduceConf = new JobConf(false);
ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
JobClient.runJob(job);
return 0;
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
System.exit(res);
}
Mais il échoue avec
MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost\.localdomain" ERROR="java\.lang\.RuntimeException: Error in configuring object
at org\.apache\.hadoop\.util\.ReflectionUtils\.setJobConf(ReflectionUtils\.java:106)
at org\.apache\.hadoop\.util\.ReflectionUtils\.setConf(ReflectionUtils\.java:72)
at org\.apache\.hadoop\.util\.ReflectionUtils\.newInstance(ReflectionUtils\.java:130)
at org\.apache\.hadoop\.mapred\.MapTask\.runOldMapper(MapTask\.java:389)
at org\.apache\.hadoop\.mapred\.MapTask\.run(MapTask\.java:327)
at org\.apache\.hadoop\.mapred\.Child$4\.run(Child\.java:268)
at java\.security\.AccessController\.doPrivileged(Native Method)
at javax\.security\.auth\.Subject\.doAs(Subject\.java:396)
Tous conseils ou un très simple exemple de travail très apprécié.
- Vous havee des problèmes avec le pilote de code qui je peux un peu comprendre. Configuration de l'objet n'a pas été déclaré corectly. Veuillez consulter le code fourni par moi dans la réponse ci-dessous et spécialement le code de pilote pour savoir ce qui n'allait pas. Merci 🙂
Vous devez vous connecter pour publier un commentaire.
J'ai codé un wordcount poste basé sur une chaîne de mapper. Le code a été écrit sur la nouvelle API et sa fonctionne bien 🙂
la partie de la production a été montré ci-dessous
Vous pourriez obtenir pour voir spéciales ou de caractères indésirables puisque je n'ai pas utilisé de nettoyage afin d'éliminer les signes de ponctuation. Je viens de ont mis l'accent sur le travail d'une chaîne mappeur.
Merci 🙂
Ce Stackoverflow question contient la réponse :
https://stackoverflow.com/a/10470437/1008310