This may seem like a silly question, but I can't see what is wrong with the types in my MapReduce code for Hadoop.
As stated in the title, the problem is that Hadoop expects an IntWritable key, but I pass a Text object to the collector's collect call in the reducer.
My job configuration has the following mapper output classes:
conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(IntWritable.class);
And the following reducer output classes:
conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);
My reducer class has the following definition:
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, Text, IntWritable>
with the required function:
public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter)
And then it fails when I call:
output.collect(new Text(),new IntWritable());
I'm new to MapReduce, but all the types seem to match. The code compiles, but it then fails at runtime on this line, saying that it expects IntWritable as the key for the reduce class. In case it matters, I am using Hadoop 0.21.
Here is my map class:
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, IntWritable, IntWritable> {

        private IntWritable node = new IntWritable();
        private IntWritable edge = new IntWritable();

        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, IntWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                node.set(Integer.parseInt(tokenizer.nextToken()));
                edge.set(Integer.parseInt(tokenizer.nextToken()));
                if (node.get() < edge.get())
                    output.collect(node, edge);
            }
        }
    }
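To make it clear what the mapper emits, here is a minimal Hadoop-free sketch of the same loop. The names EdgeFilterSketch and orderedEdges are hypothetical and exist only for illustration; plain ints stand in for IntWritable, and the returned list stands in for the OutputCollector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class EdgeFilterSketch {
    // Mirrors the loop in Map.map: read tokens in pairs and keep
    // only the pairs where node < edge.
    static List<int[]> orderedEdges(String line) {
        List<int[]> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            int node = Integer.parseInt(tokenizer.nextToken());
            int edge = Integer.parseInt(tokenizer.nextToken());
            if (node < edge) {
                out.add(new int[] {node, edge});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Three pairs on one line: (1,2), (3,1), (2,5).
        for (int[] pair : orderedEdges("1 2 3 1 2 5")) {
            System.out.println(pair[0] + "\t" + pair[1]);
        }
    }
}
```

Like the real mapper, this assumes an even number of integer tokens per line; an odd count makes nextToken() throw NoSuchElementException.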
and my reduce class:
    public static class Reduce extends MapReduceBase
            implements Reducer<IntWritable, IntWritable, Text, IntWritable> {

        IntWritable $ = new IntWritable(Integer.MAX_VALUE);
        Text keyText = new Text();

        public void reduce(IntWritable key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            ArrayList<IntWritable> valueList = new ArrayList<IntWritable>();
and my job configuration:
    JobConf conf = new JobConf(Triangles.class);
    conf.setJobName("mapred1");

    conf.setMapOutputKeyClass(IntWritable.class);
    conf.setMapOutputValueClass(IntWritable.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path("mapred1"));

    JobClient.runJob(conf);
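One detail in this configuration may be relevant: Reduce is registered as both the combiner and the reducer. As far as I understand, a combiner's output is fed back into the map output, so its keys would be checked against the map output key class (IntWritable here) and not against the final output key class (Text). The sketch below is a Hadoop-free model of that kind of runtime check, not Hadoop's actual code. CombinerTypeSketch, CheckedCollector and accepts are hypothetical names, and Integer and String stand in for IntWritable and Text.

```java
public class CombinerTypeSketch {
    // Hypothetical stand-in for a map-side buffer that only accepts
    // keys of the declared map output key class.
    static final class CheckedCollector {
        private final Class<?> expectedKeyClass;

        CheckedCollector(Class<?> expectedKeyClass) {
            this.expectedKeyClass = expectedKeyClass;
        }

        void collect(Object key, Object value) {
            // Generic type parameters are erased at runtime, so a mismatch
            // like this can compile cleanly and only show up here.
            if (key.getClass() != expectedKeyClass) {
                throw new IllegalStateException("wrong key class: "
                        + key.getClass().getName() + " is not "
                        + expectedKeyClass.getName());
            }
        }
    }

    // Returns true if a key of this runtime type passes the check.
    static boolean accepts(Class<?> declaredMapOutputKey, Object emittedKey) {
        try {
            new CheckedCollector(declaredMapOutputKey).collect(emittedKey, 0);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A combiner that emits the map output key type passes the check.
        System.out.println(accepts(Integer.class, 7));
        // A combiner that emits the final output key type does not.
        System.out.println(accepts(Integer.class, "7"));
    }
}
```

If this is what is happening, removing conf.setCombinerClass(Reduce.class), or using a separate combiner that emits IntWritable keys, would be the first thing to try.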