Tuesday, 22 April 2014

Aggregations In Hadoop MapReduce


Aggregation functions are sum,min,max,count etc.These aggregations are really useful in statictics and can be done in Hadoop MapReduce also.If aggregation functions are to be done on a large data we can do it in MapReduce also.
Below is the code for finding Min() and Max() for each columns of a csv file in MapReduce.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 */
public class ColumnAggregator {

 public static class ColMapper extends
   Mapper<Object, Text, Text, DoubleWritable> {
  /*
   * Emits column Id as key and entire column elements as Values
   */
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String[] cols = value.toString().split(",");
   for (int i = 0; i < cols.length; i++) { 
    context.write(new Text(String.valueOf(i + 1)),new DoubleWritable(Double.parseDouble(cols[i])));
   }

  }
 }

 public static class ColReducer extends
   Reducer<Text, DoubleWritable, Text, DoubleWritable> {
  /*
   * Reducer finds min and max of each column
   */

  public void reduce(Text key, Iterable<DoubleWritable> values,
    Context context) throws IOException, InterruptedException {
   double min = Integer.MAX_VALUE, max = 0;
   Iterator<DoubleWritable> iterator = values.iterator(); //Iterating 
   while (iterator.hasNext()) {
    double value = iterator.next().get();
    if (value < min) { //Finding min value
     min = value;
    }
    if (value > max) { //Finding max value
     max = value;
    }
   }
   context.write(new Text(key), new DoubleWritable(min));
   context.write(new Text(key), new DoubleWritable(max));
  }
 }
 public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();

  Job job = new Job(conf, "Min and Max");
  job.setJarByClass(ColumnAggregator.class);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(args[1]))) {
   fs.delete(new Path(args[1]), true);
  }
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);

  job.setMapperClass(ColMapper.class);
  job.setReducerClass(ColReducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Explanation


For any MapReduce program there are 3 classes

1.Driver Class for Configuration

2.Mapper

3.Reducer


Mapper:


Map receives Offset of the file and each line as key value pair.Map generates an id for each column and emit the id and entire column to Reducer.


Reducer:


Reducer recieves each column Id and List of values as key value pair and finds min and max for each key and emit column id as key and min and max as values

Here ,If only 1 reducer is used ,then we will be stressing the Reducer for finding min and max.There is a better idea that can be done in Map()
We have setup() and cleanup() functions.

 

setup() executes before all map() and 

cleanup() executes after all map().



It is better to add min and max finding code in cleanup()

Map()
{
         /*No emit*/  
}
cleanup()
{
         emit(colId(,min,max))
}

Again in reducer we need to find Min and Max
reducer()
{
        emit(colId,(min,max))
}

Now the Reducer need to calculate only some combinations of min amd max.This way we can reduce the stress given to reducer.

Happy Hadooping.

7 comments:

  1. I Think the question should be modified.
    It should be solving a single word position presence in single line with Minimum Position[index] and Maximum Position[Index]. Here, Index of the word in a single line presence].

    At last the MR Job output the a unique word presence in every line comparison and emits the Index in Total.

    ReplyDelete
  2. This comment has been removed by the author.

    ReplyDelete
  3. Great Article… I love to read your articles because your writing style is too good, its is very very helpful for all of us and I never get bored while reading your article because, they are becomes a more and more interesting from the starting lines until the end. 
    microsoft azure training in bangalore
    rpa training in bangalore
    best rpa training in bangalore
    rpa online training

    ReplyDelete
  4. Good Post! Thank you so much for sharing this pretty post, it was so good to read and useful to improve my knowledge as updated one, keep blogging.
    Best Devops online Training
    Online DevOps Certification Course - Gangboard

    ReplyDelete
  5. A bewildering web journal I visit this blog, it's unfathomably heavenly. Oddly, in this present blog's substance made the purpose of actuality and reasonable. The substance of data is informative
    Oracle Fusion Financials Online Training
    Oracle Fusion HCM Online Training
    Oracle Fusion SCM Online Training

    ReplyDelete
  6. Too Good article,Thank you for sharing this awesome Blog.
    Keep Updating more....

    Big Data Online Course

    ReplyDelete