Aggregation functions such as sum, min, max and count are widely used in statistics, and when the data is too large for a single machine they can be computed with Hadoop MapReduce.
Below is the code for finding Min() and Max() of each column of a CSV file in MapReduce.
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* @author Unmesha SreeVeni U.B
*/
public class ColumnAggregator {

    public static class ColMapper extends
            Mapper<Object, Text, Text, DoubleWritable> {
        /*
         * Emits the column id (1-based) as key and that line's value for the
         * column as value
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] cols = value.toString().split(",");
            for (int i = 0; i < cols.length; i++) {
                context.write(new Text(String.valueOf(i + 1)),
                        new DoubleWritable(Double.parseDouble(cols[i])));
            }
        }
    }
    public static class ColReducer extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        /*
         * Reducer finds min and max of each column
         */
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            // Start from the extremes so that negative values are handled correctly
            double min = Double.MAX_VALUE, max = -Double.MAX_VALUE;
            Iterator<DoubleWritable> iterator = values.iterator(); // Iterating
            while (iterator.hasNext()) {
                double value = iterator.next().get();
                if (value < min) { // Finding min value
                    min = value;
                }
                if (value > max) { // Finding max value
                    max = value;
                }
            }
            context.write(key, new DoubleWritable(min));
            context.write(key, new DoubleWritable(max));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Min and Max");
        job.setJarByClass(ColumnAggregator.class);
        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setMapperClass(ColMapper.class);
        job.setReducerClass(ColReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
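To run the job, package the class into a jar and launch it with hadoop jar, passing the input file and the output directory as the two arguments the driver expects (the jar name and HDFS paths below are only placeholders):

hadoop jar columnaggregator.jar ColumnAggregator /user/hadoop/input/data.csv /user/hadoop/output/minmax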
For any MapReduce program there are 3 classes:
1. Driver class for configuration
2. Mapper
3. Reducer
Mapper:
map() receives the byte offset of each line in the file as key and the line itself as value. It splits the line, generates an id for each column, and emits the column id together with that column's value to the Reducer.
Reducer:
The Reducer receives each column id with its list of values as a key-value pair, finds the min and max for that key, and emits the column id as key with the min and max as values.
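For example, with a small made-up CSV like the one below, each column id becomes a key, and the final output contains two lines per column, the first with the min and the second with the max (key and value separated by a tab, as written by TextOutputFormat):

Input
2.0,5.0,1.0
4.0,3.0,9.0
6.0,1.0,7.0

Output
1    2.0
1    6.0
2    1.0
2    5.0
3    1.0
3    9.0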
Here, if only 1 reducer is used, that single Reducer has to scan every value of every column to find the min and max. There is a better approach that can be done on the map side.
A Mapper also has setup() and cleanup() methods:
setup() executes once before all map() calls and
cleanup() executes once after all map() calls.
So each map() call can simply update a running min and max per column, and the emitting can be moved to cleanup():
map()
{
    /* update running min and max per column, no emit */
}
cleanup()
{
    emit(colId, (min, max))
}
The Reducer then only has to combine the per-mapper results to find the overall min and max:
reducer()
{
    emit(colId, (min, max))
}
Now the Reducer needs to compare only one min/max pair per mapper for each column instead of every single value, which greatly reduces the load on the Reducer.
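Below is a minimal sketch of that map-side approach. The class names MinMaxMapper/MinMaxReducer and the HashMap-based bookkeeping are just one possible way to write it, not the only implementation. Because each mapper now emits a single "min,max" Text value per column from cleanup(), the driver would also need job.setOutputValueClass(Text.class) and these classes set as mapper and reducer instead of the ones above:

public static class MinMaxMapper extends Mapper<Object, Text, Text, Text> {

    // Running min and max per column id, kept across all map() calls
    private java.util.Map<Integer, Double> mins;
    private java.util.Map<Integer, Double> maxs;

    @Override
    protected void setup(Context context) {
        mins = new java.util.HashMap<Integer, Double>();
        maxs = new java.util.HashMap<Integer, Double>();
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] cols = value.toString().split(",");
        for (int i = 0; i < cols.length; i++) {
            double v = Double.parseDouble(cols[i]);
            Double curMin = mins.get(i + 1);
            Double curMax = maxs.get(i + 1);
            if (curMin == null || v < curMin) {
                mins.put(i + 1, v);
            }
            if (curMax == null || v > curMax) {
                maxs.put(i + 1, v);
            }
            // No emit here; one record per column is written in cleanup()
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        for (Integer colId : mins.keySet()) {
            context.write(new Text(String.valueOf(colId)),
                    new Text(mins.get(colId) + "," + maxs.get(colId)));
        }
    }
}

public static class MinMaxReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double min = Double.MAX_VALUE, max = -Double.MAX_VALUE;
        // Each incoming value is one "min,max" pair from one mapper
        for (Text value : values) {
            String[] parts = value.toString().split(",");
            min = Math.min(min, Double.parseDouble(parts[0]));
            max = Math.max(max, Double.parseDouble(parts[1]));
        }
        context.write(key, new Text(min + "," + max));
    }
}

With this sketch each mapper sends at most one record per column to the Reducer, no matter how many rows it processed.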
Happy Hadooping.