Multiple Hadoop Inputs

I am using Hadoop MapReduce and want to process two files. My first MapReduce iteration gives me a file of ID/number pairs like this:

    A 30
    D 20

My goal is to use the ID from this file to join against another file and produce a triple containing the ID, the name and the number, for example:

    A ABC 30
    D EFGH 20

But I'm not sure whether MapReduce is the best way to do this. Would it be better, for example, to use a FileReader to read the second input file and look up the name by ID? Or can I do this with MapReduce?

If so, I'm trying to figure out how. I tried the MultipleInputs approach:

    MultipleInputs.addInputPath(job2, new Path(args[1] + "-tmp"), TextInputFormat.class, FlightsByCarrierMapper2.class);
    MultipleInputs.addInputPath(job2, new Path("inputplanes"), TextInputFormat.class, FlightsModeMapper.class);

But I can't come up with any way to combine the two and get the desired result. The approach I am taking now just gives me a list like this example:

    A ABC
    A 30
    B ABCD
    C ABCDEF
    D EFGH
    D 20

After my last reduce, I get the following:

    N125DL 767-332 N125DL 7 ,
    N126AT 737-76N N126AT 19 ,
    N126DL 767-332 N126DL 1 ,
    N127DL 767-332 N127DL 7 ,
    N128DL 767-332 N128DL 3

What I want is this: N127DL 7 767-332. I also do not want the records that have no match.

And this is my reducer class:

    public class FlightByCarrierReducer2 extends Reducer<Text, Text, Text, Text> {

        String merge = "";

        protected void reduce(Text token, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int i = 0;
            for (Text value : values) {
                if (i == 0) {
                    merge = value.toString() + ",";
                } else {
                    merge += value.toString();
                }
                i++;
            }
            context.write(token, new Text(merge));
        }
    }

Update:

The data set I am using is from http://stat-computing.org/dataexpo/2009/the-data.html.

I am trying to take TailNum and Cancelled (which is 1 or 0) and get the model name corresponding to each TailNum. My model file has TailNum, Model and other fields. My current output:

N193JB ERJ 190-100 IGW

N194DN 767-332

N19503 EMB-135ER

N19554 EMB-145LR

N195DN 767-332

N195DN 2

The key comes first and the model second. For keys that have cancelled flights, the count appears on a separate line below the model.

I would like a triple of key, model and number of cancellations, because I want the number of cancellations per model.

2 answers

You can join them by using the ID as the key in both mappers. You can write your map task something like this:

    public void map(LongWritable k, Text value, Context context)
            throws IOException, InterruptedException {
        // Get the line and split it to separate the ID from the rest.
        // For "A 30":  word1 = A, word2 = 30
        // For "A ABC": word1 = A, word2 = ABC
        String[] parts = value.toString().split(" ");
        Text word1 = new Text(parts[0]);
        Text word2 = new Text(parts[1]);
        context.write(word1, word2);
    }

I think you can use the same map task for both inputs. Then write a common reducer: the Hadoop framework groups the data by key, so the reducer receives the ID as its key. You can then cache one of the values and concatenate it with the other.

    Text valEmit = new Text();
    String merge = "";

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int i = 0;
        for (Text value : values) {
            // Cache the first value, then append the remaining ones.
            if (i == 0) {
                merge = value.toString() + ",";
            } else {
                merge += value.toString();
            }
            i++;
        }
        valEmit.set(merge);
        context.write(key, valEmit);
    }

Finally, you can write a driver class

    public int run(String[] args) throws Exception {
        Configuration c = new Configuration();
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path p1 = new Path(files[0]);
        Path p2 = new Path(files[1]);
        Path p3 = new Path(files[2]);
        FileSystem fs = FileSystem.get(c);
        if (fs.exists(p3)) {
            fs.delete(p3, true);
        }
        Job job = new Job(c, "Multiple Job");
        job.setJarByClass(MultipleFiles.class);
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class, MultipleMap2.class);
        job.setReducerClass(MultipleReducer.class);
        .
        .
    }

You can find an example HERE

Hope this helps.


UPDATE

Input1

    A 30
    D 20

Input2

    A ABC
    D EFGH

Output

    A ABC 30
    D EFGH 20

Mapper.java

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * @author sreeveni
     *
     */
    public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
        Text keyEmit = new Text();
        Text valEmit = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String parts[] = line.split(" ");
            keyEmit.set(parts[0]);
            valEmit.set(parts[1]);
            context.write(keyEmit, valEmit);
        }
    }

Reducer.java

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * @author sreeveni
     *
     */
    public class ReducerJoin extends Reducer<Text, Text, Text, Text> {
        Text valEmit = new Text();
        String merge = "";

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String character = "";
            String number = "";
            for (Text value : values) {
                // ordering output
                String val = value.toString();
                char myChar = val.charAt(0);
                if (Character.isDigit(myChar)) {
                    number = val;
                } else {
                    character = val;
                }
            }
            merge = character + " " + number;
            valEmit.set(merge);
            context.write(key, valEmit);
        }
    }
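One caveat with the reducer above: it tells the two sources apart by checking whether the value starts with a digit, which would misclassify a model name such as 767-332 from the question. It also emits keys that appear in only one input. A possible alternative is to have each mapper prefix its value with a source tag and let the reducer drop keys that lack either side. The following is only a plain-Java simulation of that idea, without Hadoop; the class name, the tags `N:` and `C:`, and the helper methods are hypothetical and chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of a tagged reduce-side inner join (no Hadoop involved). */
final class TaggedJoinSketch {

    // Hypothetical tags: each mapper would prefix its value with one of these.
    static final String NAME_TAG = "N:";
    static final String COUNT_TAG = "C:";

    /** Mimics the shuffle: collects every tagged value under its key, sorted by key. */
    static Map<String, List<String>> shuffle(String[][] taggedPairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : taggedPairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    /** Mimics reduce(): returns "key name count", or null when one side is missing. */
    static String reduce(String key, List<String> values) {
        String name = null;
        String count = null;
        for (String v : values) {
            // The tag, not the content or the arrival order, identifies the source.
            if (v.startsWith(NAME_TAG)) {
                name = v.substring(NAME_TAG.length());
            } else if (v.startsWith(COUNT_TAG)) {
                count = v.substring(COUNT_TAG.length());
            }
        }
        if (name == null || count == null) {
            return null; // inner join: drop keys without a match
        }
        return key + " " + name + " " + count;
    }

    /** Runs the simulated shuffle and reduce, keeping only matched keys. */
    static List<String> join(String[][] taggedPairs) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : shuffle(taggedPairs).entrySet()) {
            String line = reduce(e.getKey(), e.getValue());
            if (line != null) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] pairs = {
            {"A", "N:ABC"}, {"A", "C:30"}, {"B", "N:ABCD"},
            {"C", "N:ABCDEF"}, {"D", "C:20"}, {"D", "N:EFGH"}
        };
        // Prints "A ABC 30" and "D EFGH 20"; B and C are dropped.
        join(pairs).forEach(System.out::println);
    }
}
```

In a real job the same logic would sit in two separate mapper classes (one per tag) and in the `reduce` method, with `context.write` in place of the returned string.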

Driver class

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * @author sreeveni
     *
     */
    public class Driver extends Configured implements Tool {

        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            // checking the arguments count
            if (args.length != 3) {
                System.err.println("Usage : <inputlocation> <inputlocation> <outputlocation> ");
                System.exit(0);
            }
            int res = ToolRunner.run(new Configuration(), new Driver(), args);
            System.exit(res);
        }

        @Override
        public int run(String[] args) throws Exception {
            // TODO Auto-generated method stub
            String source1 = args[0];
            String source2 = args[1];
            String dest = args[2];
            Configuration conf = new Configuration();
            // changing default delimiter to user input delimiter
            conf.set("mapred.textoutputformat.separator", " ");
            FileSystem fs = FileSystem.get(conf);
            Job job = new Job(conf, "Multiple Jobs");

            job.setJarByClass(Driver.class);
            Path p1 = new Path(source1);
            Path p2 = new Path(source2);
            Path out = new Path(dest);
            MultipleInputs.addInputPath(job, p1, TextInputFormat.class, Mapper1.class);
            MultipleInputs.addInputPath(job, p2, TextInputFormat.class, Mapper1.class);
            job.setReducerClass(ReducerJoin.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setOutputFormatClass(TextOutputFormat.class);

            /*
             * delete if exist
             */
            if (fs.exists(out))
                fs.delete(out, true);

            TextOutputFormat.setOutputPath(job, out);
            boolean success = job.waitForCompletion(true);

            return success ? 0 : 1;
        }
    }
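If the end goal is the number of cancellations per model, as the question's update says, one possible follow-up is a second pass keyed by model that sums the counts. The sketch below simulates that aggregation in plain Java rather than as a Hadoop job. The class name and the assumed line layout (tail number first, count last, model in between) are illustrative assumptions; the sample rows reuse the values from the question's output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of summing joined counts per model (no Hadoop involved). */
final class CancellationsPerModelSketch {

    /**
     * Assumes each line is "tailNum model count" with the count as the last field.
     * The model may contain spaces (e.g. "ERJ 190-100 IGW"), so the middle
     * fields are rejoined to form it.
     */
    static Map<String, Integer> sumPerModel(List<String> joinedLines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : joinedLines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length < 3) {
                continue; // skip lines that are not a full triple
            }
            int count = Integer.parseInt(parts[parts.length - 1]);
            String model = String.join(" ", Arrays.copyOfRange(parts, 1, parts.length - 1));
            // Equivalent to a reducer that receives the model as key and sums its values.
            totals.merge(model, count, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> joined = Arrays.asList(
            "N125DL 767-332 7",
            "N126AT 737-76N 19",
            "N126DL 767-332 1",
            "N127DL 767-332 7",
            "N128DL 767-332 3");
        // Prints "737-76N 19" and "767-332 18".
        sumPerModel(joined).forEach((model, total) -> System.out.println(model + " " + total));
    }
}
```

In MapReduce terms, the mapper of that second job would emit the model as key and the count as value, and the reducer would add up the values for each key.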

Your reducer has a map method, but it needs a reduce method that takes an Iterable of values, which you then merge. Since you don't have a reduce() method, you get the default behavior, which simply passes every key/value pair through unchanged.
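To make the difference concrete, here is a small plain-Java sketch (not Hadoop code) that contrasts the pass-through behavior with a reduce step that merges the values of one key. The class and method names are hypothetical, and the sample key and values are taken from the question's current output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Contrasts an identity-style reduce with a merging reduce for a single key. */
final class IdentityVsMergeSketch {

    /** What an un-overridden reduce effectively does: one output line per value. */
    static List<String> identityReduce(String key, List<String> values) {
        List<String> out = new ArrayList<>();
        for (String v : values) {
            out.add(key + " " + v);
        }
        return out;
    }

    /** What a join needs: all values of the key combined into one output line. */
    static List<String> mergeReduce(String key, List<String> values) {
        List<String> out = new ArrayList<>();
        out.add(key + " " + String.join(" ", values));
        return out;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("767-332", "2");
        // Prints "N195DN 767-332" and "N195DN 2" on separate lines.
        identityReduce("N195DN", values).forEach(System.out::println);
        // Prints "N195DN 767-332 2".
        mergeReduce("N195DN", values).forEach(System.out::println);
    }
}
```

Annotating the real `reduce` method with `@Override` is a cheap way to catch this: if the signature does not match the one in `Reducer`, the compiler reports an error instead of silently falling back to the default.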


Source: https://habr.com/ru/post/1208547/

