Tuesday, 22 April 2014

Aggregations In Hadoop MapReduce


Aggregation functions are sum,min,max,count etc.These aggregations are really useful in statictics and can be done in Hadoop MapReduce also.If aggregation functions are to be done on a large data we can do it in MapReduce also.
Below is the code for finding Min() and Max() for each columns of a csv file in MapReduce.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 */
public class ColumnAggregator {

 public static class ColMapper extends
   Mapper<Object, Text, Text, DoubleWritable> {
  /*
   * Emits column Id as key and entire column elements as Values
   */
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String[] cols = value.toString().split(",");
   for (int i = 0; i < cols.length; i++) { 
    context.write(new Text(String.valueOf(i + 1)),new DoubleWritable(Double.parseDouble(cols[i])));
   }

  }
 }

 public static class ColReducer extends
   Reducer<Text, DoubleWritable, Text, DoubleWritable> {
  /*
   * Reducer finds min and max of each column
   */

  public void reduce(Text key, Iterable<DoubleWritable> values,
    Context context) throws IOException, InterruptedException {
   double min = Integer.MAX_VALUE, max = 0;
   Iterator<DoubleWritable> iterator = values.iterator(); //Iterating 
   while (iterator.hasNext()) {
    double value = iterator.next().get();
    if (value < min) { //Finding min value
     min = value;
    }
    if (value > max) { //Finding max value
     max = value;
    }
   }
   context.write(new Text(key), new DoubleWritable(min));
   context.write(new Text(key), new DoubleWritable(max));
  }
 }
 public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();

  Job job = new Job(conf, "Min and Max");
  job.setJarByClass(ColumnAggregator.class);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(args[1]))) {
   fs.delete(new Path(args[1]), true);
  }
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);

  job.setMapperClass(ColMapper.class);
  job.setReducerClass(ColReducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Explanation


For any MapReduce program there are 3 classes

1.Driver Class for Configuration

2.Mapper

3.Reducer


Mapper:


Map receives Offset of the file and each line as key value pair.Map generates an id for each column and emit the id and entire column to Reducer.


Reducer:


Reducer recieves each column Id and List of values as key value pair and finds min and max for each key and emit column id as key and min and max as values

Here ,If only 1 reducer is used ,then we will be stressing the Reducer for finding min and max.There is a better idea that can be done in Map()
We have setup() and cleanup() functions.

 

setup() executes before all map() and 

cleanup() executes after all map().



It is better to add min and max finding code in cleanup()

Map()
{
         /*No emit*/  
}
cleanup()
{
         emit(colId(,min,max))
}

Again in reducer we need to find Min and Max
reducer()
{
        emit(colId,(min,max))
}

Now the Reducer need to calculate only some combinations of min amd max.This way we can reduce the stress given to reducer.

Happy Hadooping.

2 comments:

  1. I Think the question should be modified.
    It should be solving a single word position presence in single line with Minimum Position[index] and Maximum Position[Index]. Here, Index of the word in a single line presence].

    At last the MR Job output the a unique word presence in every line comparison and emits the Index in Total.

    ReplyDelete