MapReduce Example

I read about MapReduce, and I'd like to understand a specific scenario. Say we have several files (fileA, fileB, fileC, for example), each of which contains several integers. If we wanted to sort the numbers from all files to produce something like this:

 23 fileA
 34 fileB
 35 fileA
 60 fileA
 60 fileC

How would the map and reduce steps work?

This is currently what I have, but it is not entirely correct:

  • (fileName, fileContent) -> (map to) (Number, fileName)

  • sort the intermediate (key, value) pairs and get (Number, (list of) {fileName1, fileName2, ...})

  • reduce those pairs and get

     (Number, fileName1) (Number, fileName2) 

    etc.

The problem is that the file names may not be in alphabetical order at the sort stage, so the reduce step will not generate the correct output. Can someone suggest the right approach to this scenario?

1 answer

The best way to achieve this is through secondary sort. You need to sort both the keys (the numbers, in your case) and the values (the file names, in your case). In Hadoop, the mapper's output is sorted by key only.

This can be achieved with a composite key: a key that combines both the number and the file name. E.g. for the first record, the key would be (23, fileA) rather than just (23).

You can read about secondary sorting here: https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch01.html

You can also look at the "Secondary Sort" section in Hadoop: The Definitive Guide.
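To illustrate the composite-key ordering outside Hadoop, here is a hypothetical plain-Java sketch (not part of the original answer, and the class name `NumberFileKey` is my own): a key that compares by number first and by file name second, which is the ordering a `WritableComparable` composite key would give you during the shuffle sort.

```java
import java.util.*;

public class CompositeKeyDemo {

    // Composite key: compares by number first, then by file name.
    // In a real Hadoop job this comparison would live in a
    // WritableComparable's compareTo(), so the shuffle sorts by it.
    static final class NumberFileKey implements Comparable<NumberFileKey> {
        final int number;
        final String fileName;

        NumberFileKey(int number, String fileName) {
            this.number = number;
            this.fileName = fileName;
        }

        @Override
        public int compareTo(NumberFileKey other) {
            int cmp = Integer.compare(number, other.number);
            return (cmp != 0) ? cmp : fileName.compareTo(other.fileName);
        }

        @Override
        public String toString() {
            return number + " " + fileName;
        }
    }

    public static void main(String[] args) {
        // The sample records from the question, in arrival order.
        List<NumberFileKey> keys = new ArrayList<>(Arrays.asList(
                new NumberFileKey(34, "fileB"),
                new NumberFileKey(35, "fileA"),
                new NumberFileKey(60, "fileC"),
                new NumberFileKey(60, "fileA"),
                new NumberFileKey(23, "fileA")));

        // The framework's shuffle sort would do this for composite keys.
        Collections.sort(keys);

        for (NumberFileKey k : keys) {
            System.out.println(k);  // 23 fileA, 34 fileB, 35 fileA, 60 fileA, 60 fileC
        }
    }
}
```

With this ordering, equal numbers are tie-broken by file name, so the output matches the layout asked for in the question.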

For simplicity, I wrote a program that achieves the same result.

In this program, the keys are sorted by the framework by default; I added logic to sort the values on the reducer side. Together, this takes care of sorting both keys and values and produces the desired output.

The following is the program:

 package com.myorg.hadooptests;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 import java.io.IOException;
 import java.util.*;

 public class SortedValue {

     public static class SortedValueMapper
             extends Mapper<LongWritable, Text, Text, IntWritable> {

         public void map(LongWritable key, Text value, Context context)
                 throws IOException, InterruptedException {
             // Each input line is "<number> <fileName>"; emit (fileName, number).
             String[] tokens = value.toString().split(" ");
             if (tokens.length == 2) {
                 context.write(new Text(tokens[1]),
                         new IntWritable(Integer.parseInt(tokens[0])));
             }
         }
     }

     public static class SortedValueReducer
             extends Reducer<Text, IntWritable, IntWritable, Text> {

         Map<String, ArrayList<Integer>> valueMap =
                 new HashMap<String, ArrayList<Integer>>();

         public void reduce(Text key, Iterable<IntWritable> values, Context context)
                 throws IOException, InterruptedException {
             // Collect all numbers for this file name, sort them,
             // then emit (number, fileName) pairs in sorted order.
             String keyStr = key.toString();
             ArrayList<Integer> storedValues = valueMap.get(keyStr);

             for (IntWritable value : values) {
                 if (storedValues == null) {
                     storedValues = new ArrayList<Integer>();
                     valueMap.put(keyStr, storedValues);
                 }
                 storedValues.add(value.get());
             }

             Collections.sort(storedValues);

             for (Integer val : storedValues) {
                 context.write(new IntWritable(val), key);
             }
         }
     }

     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         Job job = Job.getInstance(conf, "CompositeKeyExample");

         job.setJarByClass(SortedValue.class);
         job.setMapperClass(SortedValueMapper.class);
         job.setReducerClass(SortedValueReducer.class);

         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(IntWritable.class);
         job.setOutputKeyClass(IntWritable.class);
         job.setOutputValueClass(Text.class);

         FileInputFormat.addInputPath(job, new Path("/in/in1.txt"));
         FileOutputFormat.setOutputPath(job, new Path("/out/"));

         System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
 }

Mapper logic:

  • Parses each line. It is assumed that the number and file name are separated by a space character (" ").
  • If the string contains 2 tokens, it emits (file name, integer value). E.g. for the first record, it emits (fileA, 23).

Reducer logic:

  • It puts the (key, value) pairs in a HashMap, where the key is the file name and the value is the list of integers for that file. E.g. for fileA, the stored values will be 35, 60 and 23.

  • Finally, it sorts the values for a given key and, for each value, emits (value, key) from the reducer. E.g. for fileA, the output is: (23, fileA), (35, fileA) and (60, fileA).

I ran this program for the following input:

 34 fileB
 35 fileA
 60 fileC
 60 fileA
 23 fileA

I got the following output:

 23 fileA
 35 fileA
 60 fileA
 34 fileB
 60 fileC

Source: https://habr.com/ru/post/1238054/

