How to send the output of one MapReduce job's reducer directly to another job's mapper without saving the output to HDFS

The problem is solved; see my solution at the bottom.


Recently I have been trying to run the recommender example from chapter 6 (listings 6.1-6.4) of Mahout in Action. I ran into a problem, and I have googled it, but I cannot find a solution.

Here is the problem: I have a mapper and a reducer:

    // WikipediaToItemPrefsMapper.java
    import java.io.IOException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.mahout.math.VarLongWritable;

    public final class WikipediaToItemPrefsMapper
            extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

        private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            Matcher m = NUMBERS.matcher(line);
            m.find(); // the first number on the line is the user ID
            VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
            VarLongWritable itemID = new VarLongWritable();
            while (m.find()) { // every following number is an item ID
                itemID.set(Long.parseLong(m.group()));
                context.write(userID, itemID);
            }
        }
    }

    // WikipediaToUserVectorReducer.java
    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.mahout.math.RandomAccessSparseVector;
    import org.apache.mahout.math.VarLongWritable;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    public class WikipediaToUserVectorReducer
            extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

        public void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context)
                throws IOException, InterruptedException {
            // collect all of this user's item IDs into one sparse preference vector
            Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
            for (VarLongWritable itemPref : itemPrefs) {
                userVector.set((int) itemPref.get(), 1.0f);
            }
            context.write(userID, new VectorWritable(userVector));
        }
    }

The reducer outputs the user ID and the userVector, which looks like this: 98955 {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0}

Then I want to use another mapper/reducer pair to process this data:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable; // package may vary by Mahout version
    import org.apache.mahout.math.VarLongWritable;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    public class UserVectorSplitterMapper
            extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

        public void map(VarLongWritable key, VectorWritable value, Context context)
                throws IOException, InterruptedException {
            long userID = key.get();
            Vector userVector = value.get();
            // emit one <itemIndex, (userID, preference)> pair per non-zero vector entry
            Iterator<Vector.Element> it = userVector.iterateNonZero();
            IntWritable itemIndexWritable = new IntWritable();
            while (it.hasNext()) {
                Vector.Element e = it.next();
                int itemIndex = e.index();
                float preferenceValue = (float) e.get();
                itemIndexWritable.set(itemIndex);
                context.write(itemIndexWritable, new VectorOrPrefWritable(userID, preferenceValue));
            }
        }
    }

When I try to run the job, it throws an error:

org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable

The first mapper-reducer pair writes its output to HDFS, and the second mapper then tries to read it; it can turn 98955 into a VarLongWritable, but it cannot convert {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0} into a VectorWritable. So I wonder whether there is a way for the first reducer to send its results directly to the second mapper, so that no data conversion would be needed. I searched Hadoop in Action and Hadoop: The Definitive Guide, but there seems to be no way to do this. Any suggestions?


Problem resolved

Solution: using SequenceFileOutputFormat, we can output and save the reduce result of the first MapReduce workflow to HDFS, and the second MapReduce workflow can then read that temporary file as its input by passing SequenceFileInputFormat as a parameter when creating the job driver. Because the vector is saved in a binary sequence file, which has a specific format, SequenceFileInputFormat can read it and convert it back into vector form.

Here is some sample code:

    confFactory ToItemPrefsWorkFlow = new confFactory(
            new Path("/dbout"),              // input file path
            new Path("/mahout/output.txt"),  // output file path
            TextInputFormat.class,           // input format
            VarLongWritable.class,           // mapper key format
            Item_Score_Writable.class,       // mapper value format
            VarLongWritable.class,           // reducer key format
            VectorWritable.class,            // reducer value format
            SequenceFileOutputFormat.class   // the reducer output format
    );
    ToItemPrefsWorkFlow.setMapper(WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();

    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory(
            new Path("/mahout/output.txt"),
            new Path("/mahout/UserVectorToCooccurrence"),
            SequenceFileInputFormat.class,   // note: the input format of the second workflow's mapper is now SequenceFileInputFormat.class
            IntWritable.class,
            IntWritable.class,
            IntWritable.class,
            VectorWritable.class,
            SequenceFileOutputFormat.class
    );
    UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
    UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

    JobClient.runJob(conf1);
    JobClient.runJob(conf2);
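Note that confFactory is a small helper class of my own, so the snippet above will not compile for anyone who does not have it. For reference, here is a minimal sketch of the same two-job wiring using only the stock org.apache.hadoop.mapreduce.Job API (the driver class name TwoJobDriver is made up, and the types follow the mapper/reducer code shown earlier rather than the custom Item_Score_Writable):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.mahout.math.VarLongWritable;
    import org.apache.mahout.math.VectorWritable;

    public class TwoJobDriver { // hypothetical driver class name
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Job 1: plain text in (default TextInputFormat),
            // <VarLongWritable, VectorWritable> sequence file out.
            Job job1 = new Job(conf, "WikipediaToUserVector");
            job1.setJarByClass(TwoJobDriver.class);
            job1.setMapperClass(WikipediaToItemPrefsMapper.class);
            job1.setReducerClass(WikipediaToUserVectorReducer.class);
            job1.setMapOutputKeyClass(VarLongWritable.class);
            job1.setMapOutputValueClass(VarLongWritable.class);
            job1.setOutputKeyClass(VarLongWritable.class);
            job1.setOutputValueClass(VectorWritable.class);
            job1.setOutputFormatClass(SequenceFileOutputFormat.class); // the crucial setting
            FileInputFormat.addInputPath(job1, new Path("/dbout"));
            FileOutputFormat.setOutputPath(job1, new Path("/mahout/output.txt"));
            if (!job1.waitForCompletion(true)) System.exit(1);

            // Job 2: reads the sequence file back as <VarLongWritable, VectorWritable>.
            Job job2 = new Job(conf, "UserVectorToCooccurrence");
            job2.setJarByClass(TwoJobDriver.class);
            job2.setInputFormatClass(SequenceFileInputFormat.class);   // the other crucial setting
            job2.setMapperClass(UserVectorToCooccurrenceMapper.class);
            job2.setReducerClass(UserVectorToCooccurrenceReducer.class);
            job2.setMapOutputKeyClass(IntWritable.class);
            job2.setMapOutputValueClass(IntWritable.class);
            job2.setOutputKeyClass(IntWritable.class);
            job2.setOutputValueClass(VectorWritable.class);
            job2.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileInputFormat.addInputPath(job2, new Path("/mahout/output.txt"));
            FileOutputFormat.setOutputPath(job2, new Path("/mahout/UserVectorToCooccurrence"));
            System.exit(job2.waitForCompletion(true) ? 0 : 1);
        }
    }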

If you have any problems with this, feel free to contact me

+6
3 answers

You need to explicitly configure the output of the first job to use SequenceFileOutputFormat and define the output key and value classes:

    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(VarLongWritable.class);
    job.setOutputValueClass(VectorWritable.class);

Without seeing your driver code, my guess is that you are using TextOutputFormat for the output of the first job and TextInputFormat as the input to the second, and that input format delivers pairs of <LongWritable, Text> to the second mapper.
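The matching change on the input side of the second job, which the snippet above does not show, would be (a sketch in the same old JobConf style; job2 is a hypothetical handle for the second job's configuration):

    // second job: read the binary sequence file back, so the mapper receives
    // <VarLongWritable, VectorWritable> pairs instead of <LongWritable, Text>
    job2.setInputFormat(SequenceFileInputFormat.class);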

+3

I am new to Hadoop and this is just my hunch about the answer, so please correct me or point out if this seems naive.

I think it is unwise to send output from a reducer straight to the next mapper without saving it to HDFS, because "which data split goes to which mapper" is elegantly designed to satisfy the data-locality criterion (a split goes to a mapper running on a node that stores that data locally).

If you do not store the intermediate output in HDFS, most likely all of the data would have to be transmitted over the network, which would be slow and could cause bandwidth problems.

+1

You must temporarily save the output of the first map-reduce job so that the second job can use it.

This may help you understand how the output of the first map-reduce job is passed to the second (it is based on Generator.java from Apache Nutch).

This is the temporary directory for the output of the first map-reduce job:

    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
            + "/job1-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

Setting up the first map-reduce job:

    JobConf job1 = new JobConf(getConf());
    job1.setJobName("job 1");
    FileInputFormat.addInputPath(...);
    job1.setMapperClass(...);
    FileOutputFormat.setOutputPath(job1, tempDir);
    job1.setOutputFormat(SequenceFileOutputFormat.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(...);
    JobClient.runJob(job1);

Note that the output directory is set in the job configuration. Use it in the second job:

    JobConf job2 = new JobConf(getConf());
    FileInputFormat.addInputPath(job2, tempDir);
    job2.setInputFormat(SequenceFileInputFormat.class); // read the sequence file written by job 1
    job2.setReducerClass(...);
    JobClient.runJob(job2);

Remember to clean up the temporary directory when you are done:

    // clean up
    FileSystem fs = FileSystem.get(getConf());
    fs.delete(tempDir, true);
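One small hardening step beyond the snippets above (my own suggestion, not from the original): put the cleanup in a finally block, so the intermediate directory is removed even when one of the jobs fails:

    Path tempDir = ...; // as created above
    try {
        JobClient.runJob(job1);
        JobClient.runJob(job2);
    } finally {
        // delete the intermediate data whether or not the jobs succeeded
        FileSystem fs = FileSystem.get(getConf());
        fs.delete(tempDir, true);
    }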

Hope this helps.

0
