There may be cases where we need to partition our data based on a certain condition.
For example, consider this Employee data:
EmpId,EmpName,Age,Gender,Salary
1201,gopal,45,Male,50000
1202,manisha,40,Female,51000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1213,lavanya,18,Female,8000
Let's take one such condition: we need to separate the above data based on Gender (there can be more scenarios).
The expected outcome will look like this:
Female
1213,lavanya,18,Female,8000
1202,manisha,40,Female,51000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
Male
1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1201,gopal,45,Male,50000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
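Conceptually this is just a group-by on the Gender field. Before looking at the MapReduce version, here is a minimal plain-Java sketch of the same partitioning (the records array is a small subset of the data above, and the class and method names are just for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GenderPartition {

    // Group records by the Gender field (4th column of the CSV)
    static Map<String, List<String>> partition(String[] records) {
        Map<String, List<String>> byGender = new LinkedHashMap<>();
        for (String record : records) {
            String gender = record.split(",")[3];
            byGender.computeIfAbsent(gender, g -> new ArrayList<>()).add(record);
        }
        return byGender;
    }

    public static void main(String[] args) {
        String[] records = {
            "1201,gopal,45,Male,50000",
            "1202,manisha,40,Female,51000",
            "1213,lavanya,18,Female,8000"
        };
        partition(records).forEach((gender, rows) ->
            System.out.println(gender + ": " + rows.size() + " record(s)"));
    }
}
```

MapReduce does the same grouping for us, at scale: the mapper emits the gender as the key, and the framework delivers all records for one gender to the same reduce call.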
This can be achieved using MultipleOutputs in Hadoop.
The name itself gives an idea of what MultipleOutputs does: it writes output data to multiple output files.
Let's see how to implement this.
Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionerDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new PartitionerDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        System.out.println("Partitioning file based on Gender...");
        if (args.length != 3) {
            System.err.println("Usage: File Partitioner <input> <output> <delimiter>");
            return 2;
        }
        String source = args[0];
        String dest = args[1];
        String delimiter = args[2];

        // Pass the delimiter to the mapper through the configuration.
        // This must happen before the Job is created, since Job copies the conf.
        Configuration conf = getConf();
        conf.set("delimiter", delimiter);

        Path in = new Path(source);
        Path out = new Path(dest);

        Job job = Job.getInstance(conf, "Partition Records");
        job.setJarByClass(PartitionerDriver.class);
        job.setMapperClass(PartitionMapper.class);
        job.setReducerClass(PartitionReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // The reducer emits NullWritable keys
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        TextInputFormat.addInputPath(job, in);

        // Delete the output dir if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        TextOutputFormat.setOutputPath(job, out);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Class
The Mapper gets each record and splits it using the delimiter.
The key will be the Gender field and the value will be the whole record.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, Text> {

    private final Text keyEmit = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String delim = conf.get("delimiter");
        String line = value.toString();
        String[] record = line.split(delim);
        // Skip the header row, if present
        if (record[3].equalsIgnoreCase("Gender")) {
            return;
        }
        keyEmit.set(record[3]); // Gender is the 4th field
        context.write(keyEmit, value);
    }
}
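The key extraction in map() boils down to a split on the delimiter followed by picking field index 3. As a standalone sketch (the class and method names are only for illustration):

```java
public class KeyExtraction {

    // Mirrors the mapper's key extraction: split on the delimiter
    // and take field index 3 (Gender). Note that String.split()
    // treats the delimiter as a regex, so a delimiter like "|"
    // would need escaping before being passed in.
    static String extractKey(String line, String delim) {
        return line.split(delim)[3];
    }

    public static void main(String[] args) {
        System.out.println(extractKey("1201,gopal,45,Male,50000", ","));
    }
}
```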
Reducer Class
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionReducer extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> mos;
    private final NullWritable out = NullWritable.get();

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Third argument is the base name of the output file
            mos.write(out, value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        mos.close();
    }
}
Here we define the MultipleOutputs as
MultipleOutputs<NullWritable, Text> mos;
Our output does not need a key; we are only interested in the data. So the key will be NullWritable and the value will be Text.
We have a setup() method where we initialize our MultipleOutputs:
mos = new MultipleOutputs<NullWritable, Text>(context);
Let's look at reduce().
As we know, the reducer aggregates values by key. The key from the mapper was Gender, and we have two genders, Male and Female, so we will receive two keys, each followed by its values.
for (Text value : values) {
mos.write(out, value, key.toString());
}
mos.write() takes 3 arguments:
1. key
2. value and
3. base file name.
Here our key is NullWritable, the value is the record for each key, and we name each output file after its key.
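The base name passed as the third argument becomes the start of the final file name: for reducer output, Hadoop appends "-r-" and a zero-padded partition number. A rough sketch of that naming (class and method names here are only for illustration):

```java
public class OutputName {

    // Hadoop appends "-r-" plus a 5-digit zero-padded partition
    // number to the base name for reducer output files,
    // e.g. base "Female", partition 0 -> "Female-r-00000".
    static String outputFileName(String baseName, int partition) {
        return String.format("%s-r-%05d", baseName, partition);
    }

    public static void main(String[] args) {
        System.out.println(outputFileName("Female", 0));
    }
}
```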
Once you run this code, you will see two output files:
1. Female-r-00000
2. Male-r-00000
Their contents match the expected outcome shown above.
You can find the code here.
Happy Hadooping!