Problem solved: see my solution at the bottom.
Recently I have been trying to run the recommender example from chapter 6 (listings 6.1 ~ 6.4) of Mahout in Action. I ran into a problem, and I have googled it, but I cannot find a solution.
Here is the problem: I have a mapper and a reducer:
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public final class WikipediaToItemPrefsMapper
        extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        Matcher m = NUMBERS.matcher(line);
        // The first number on the line is the user ID...
        m.find();
        VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
        VarLongWritable itemID = new VarLongWritable();
        // ...and every following number is an item that user has a preference for.
        while (m.find()) {
            itemID.set(Long.parseLong(m.group()));
            context.write(userID, itemID);
        }
    }
}

public class WikipediaToUserVectorReducer
        extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

    @Override
    public void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // Collect all of one user's item preferences into a single sparse vector.
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int) itemPref.get(), 1.0f);
        }
        context.write(userID, new VectorWritable(userVector));
    }
}
The reducer outputs the user ID and the userVector, and it looks like this:

98955 {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0}
Then I want to use another mapper and reducer to process this data:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class UserVectorSplitterMapper
        extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    @Override
    public void map(VarLongWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();
        // Emit one (itemIndex, user/preference) pair per non-zero vector element.
        Iterator<Vector.Element> it = userVector.iterateNonZero();
        IntWritable itemIndexWritable = new IntWritable();
        while (it.hasNext()) {
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemIndexWritable.set(itemIndex);
            context.write(itemIndexWritable, new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}
When I try to run the job, it throws an error:
org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable
The first MapReduce job writes its output to HDFS, and the mapper of the second job tries to read that result; it can parse 98955 as a VarLongWritable, but it cannot convert {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0} into a VectorWritable. So I wonder whether there is a way for the first reducer to send its result directly to the second mapper, so that no data conversion would be needed. I have looked through Hadoop in Action and Hadoop: The Definitive Guide, and it seems there is no way to do this. Any suggestions?
Problem resolved
Solution: by using SequenceFileOutputFormat, the first MapReduce job can save its reduce output to HDFS in binary form, and the second MapReduce job can then read that intermediate file as its input by passing SequenceFileInputFormat as the input format when the job is created. Since the vector is stored in a binary sequence file with a specific format, SequenceFileInputFormat can read it back and convert it into vector form again.
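To convince yourself that the binary file really preserves the Writable types, you can read it back directly with SequenceFile.Reader outside of MapReduce. This is only an illustrative sketch of mine, and the part-00000 file name is an assumption about where the first job's reduce output lands:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class InspectUserVectors {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical path: one part file of the first job's reduce output.
        Path path = new Path("/mahout/output.txt/part-00000");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        VarLongWritable userID = new VarLongWritable();
        VectorWritable userVector = new VectorWritable();
        // Each record comes back with its original types intact,
        // e.g. 98955 => {590:1.0,22:1.0,9059:1.0,3:1.0,2:1.0,1:1.0}
        while (reader.next(userID, userVector)) {
            System.out.println(userID.get() + " => " + userVector.get());
        }
        reader.close();
    }
}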
Here is some sample code:
confFactory ToItemPrefsWorkFlow = new confFactory(
    new Path("/dbout"),              // input file path
    new Path("/mahout/output.txt"),  // output file path
    TextInputFormat.class,           // input format
    VarLongWritable.class,           // mapper key format
    Item_Score_Writable.class,       // mapper value format
    VarLongWritable.class,           // reducer key format
    VectorWritable.class,            // reducer value format
    SequenceFileOutputFormat.class   // the reducer output format: now SequenceFileOutputFormat
);
ToItemPrefsWorkFlow.setMapper(WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();

confFactory UserVectorToCooccurrenceWorkFlow = new confFactory(
    new Path("/mahout/output.txt"),
    new Path("/mahout/UserVectorToCooccurrence"),
    SequenceFileInputFormat.class,   // notice that the input format of the second job's mapper is now SequenceFileInputFormat.class
    IntWritable.class,
    IntWritable.class,
    IntWritable.class,
    VectorWritable.class,
    SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

JobClient.runJob(conf1);
JobClient.runJob(conf2);
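Note that confFactory is just a small helper class of my own that assembles a JobConf; it is not part of Hadoop. If you do not have such a wrapper, roughly the same wiring can be written against the standard org.apache.hadoop.mapreduce API. The sketch below is my reconstruction under that assumption (paths follow the sample above, and the map output types follow the mapper and reducer classes shown earlier); treat it as untested:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class TwoJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: parse the text input, build one vector per user,
        // and write the result as a binary SequenceFile.
        Job job1 = new Job(conf, "toUserVectors");
        job1.setJarByClass(TwoJobDriver.class);
        job1.setMapperClass(WikipediaToItemPrefsMapper.class);
        job1.setReducerClass(WikipediaToUserVectorReducer.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setMapOutputKeyClass(VarLongWritable.class);
        job1.setMapOutputValueClass(VarLongWritable.class);
        job1.setOutputKeyClass(VarLongWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class); // the key change
        FileInputFormat.addInputPath(job1, new Path("/dbout"));
        FileOutputFormat.setOutputPath(job1, new Path("/mahout/output.txt"));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: read the SequenceFile back; keys and values arrive
        // as VarLongWritable/VectorWritable instead of Text.
        Job job2 = new Job(conf, "userVectorToCooccurrence");
        job2.setJarByClass(TwoJobDriver.class);
        job2.setMapperClass(UserVectorToCooccurrenceMapper.class);
        job2.setReducerClass(UserVectorToCooccurrenceReducer.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);  // the other key change
        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setMapOutputValueClass(IntWritable.class);
        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(VectorWritable.class);
        job2.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job2, new Path("/mahout/output.txt"));
        FileOutputFormat.setOutputPath(job2, new Path("/mahout/UserVectorToCooccurrence"));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}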
If you have any problems with this, feel free to contact me.