I have a simple application that reads a CSV file, splits each line on "," and counts occurrences of the first field.
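For example, with a hypothetical input like

A,x,y
A,z,w
B,q,r

the expected output is one count per distinct first field:

A	2
B	1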
Below is the code.
package com.bluedolphin;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyJob extends Configured implements Tool {
    private final static LongWritable one = new LongWritable(1);

    public static class MapClass extends Mapper<Object, Text, Text, LongWritable> {
        private Text word = new Text();

        public void map(Object key,
                        Text value,
                        OutputCollector<Text, LongWritable> output,
                        Reporter reporter) throws IOException, InterruptedException {
            String[] citation = value.toString().split(",");
            word.set(citation[0]);
            output.collect(word, one);
        }
    }
    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(
                Text key,
                Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }
    public static class Combiner extends Reducer<Text, IntWritable, Text, LongWritable> {
        public void reduce(
                Text key,
                Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MyJob");
        job.setJarByClass(MyJob.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        // job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);
        // job.setInputFormatClass(KeyValueInputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(KeyValueOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
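The job is launched with the standard hadoop jar command (the jar name and HDFS paths here are placeholders):

hadoop jar myjob.jar com.bluedolphin.MyJob /user/me/input /user/me/output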
This is the error:
11/12/16 22:16:58 INFO mapred.JobClient: Task Id: attempt_201112161948_0005_m_000000_0, Status: FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1013)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)