There may be cases where we need to partition our data based on a certain condition.
For example, consider this employee data:
EmpId,EmpName,Age,Gender,Salary
1201,gopal,45,Male,50000
1202,manisha,40,Female,51000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1213,lavanya,18,Female,8000
Let's assume one condition: we need to separate the above data based on Gender (there can be more scenarios).
The expected outcome will look like this:
Female
1213,lavanya,18,Female,8000
1202,manisha,40,Female,51000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
Male
1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1201,gopal,45,Male,50000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
This can be achieved by using MultipleOutputs in Hadoop.
The name itself gives an idea of what MultipleOutputs does: it writes output data to multiple outputs.
Let's see how to implement this.
Driver Class
public class PartitionerDriver extends Configured implements Tool {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int res = ToolRunner.run(conf, new PartitionerDriver(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        System.out.println("Partitioning File Based on Gender...........");
        if (args.length != 3) {
            System.err.println("Usage: File Partitioner <input> <output> <delimiter>");
            System.exit(0);
        }
        Configuration conf = new Configuration();

        /*
         * Arguments
         */
        String source = args[0];
        String dest = args[1];
        String delimiter = args[2];

        // Pass the delimiter to the mapper through the configuration
        conf.set("delimiter", delimiter);

        FileSystem fs = FileSystem.get(conf);
        Path in = new Path(source);
        Path out = new Path(dest);

        Job job0 = new Job(conf, "Partition Records");
        job0.setJarByClass(PartitionerDriver.class);
        job0.setMapperClass(PartitionMapper.class);
        job0.setReducerClass(PartitionReducer.class);
        job0.setMapOutputKeyClass(Text.class);
        job0.setMapOutputValueClass(Text.class);
        job0.setOutputKeyClass(Text.class);
        job0.setOutputValueClass(Text.class);
        TextInputFormat.addInputPath(job0, in);

        /*
         * Delete the output dir if it already exists
         */
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        TextOutputFormat.setOutputPath(job0, out);

        job0.waitForCompletion(true);
        System.out.println("Successfully partitioned Data based on Gender!");
        return 0;
    }
}
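A small optional tweak (not part of the code above): because the reducer writes everything through MultipleOutputs, the default part-r-00000 file usually ends up empty. If you want to avoid creating it, LazyOutputFormat can be set in the driver; a minimal sketch:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

// In run(): create the default output file only if something is actually written to it
LazyOutputFormat.setOutputFormatClass(job0, TextOutputFormat.class);
TextOutputFormat.setOutputPath(job0, out);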
Mapper Class
The Mapper class gets each record and splits it using the delimiter.
The key will be the Gender field and the value will be the whole record.
public class PartitionMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text keyEmit = new Text();

    protected void map(LongWritable key, Text value, Context context) {
        Configuration conf = context.getConfiguration();
        String delim = conf.get("delimiter");
        String line = value.toString();
        String[] record = line.split(delim);
        // Gender is the fourth column; use it as the map output key
        keyEmit.set(record[3]);
        try {
            context.write(keyEmit, value);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Reducer Class
public class PartitionReducer extends Reducer<Text, Text, NullWritable, Text> {

    MultipleOutputs<NullWritable, Text> mos;
    NullWritable out = NullWritable.get();

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    public void reduce(Text key, Iterable<Text> values, Context context) {
        for (Text value : values) {
            try {
                // The third argument becomes the base name of the output file
                mos.write(out, value, key.toString());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    protected void cleanup(Context context) {
        try {
            mos.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Here we define the MultipleOutputs instance as
MultipleOutputs<NullWritable, Text> mos;
Our output file does not need a key; we are only interested in the data. So the key will be NullWritable and the value will be Text.
We have a setup() method where we initialize our MultipleOutputs instance.
mos = new MultipleOutputs<NullWritable, Text>(context);
Let's see reduce().
As we know, the reducer aggregates values based on the key. The key from the mapper was Gender, and we have two genders, Male and Female, so we will receive two keys, each followed by its values.
for (Text value : values) {
    mos.write(out, value, key.toString());
}
In this MultipleOutputs write() call we have three arguments:
1. the key,
2. the value, and
3. the file name (the base output path).
Here our key is NullWritable, the value is the record for each key, and we name the output file using the key.
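As an aside, MultipleOutputs also supports explicitly registered named outputs. That is not what the code above uses (it simply passes the key as the base output path), but a rough sketch of the named-output form, reusing the job0, mos, out, value and key variables from the code above, would be:

// Driver: register one named output per gender (named output names must be alphanumeric)
MultipleOutputs.addNamedOutput(job0, "Male", TextOutputFormat.class,
        NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job0, "Female", TextOutputFormat.class,
        NullWritable.class, Text.class);

// Reducer: route each record to the named output that matches its key
mos.write(key.toString(), out, value);

With this form the output files should still come out as Male-r-00000 and Female-r-00000, since the named output is used as the file's base name.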
Once you run this code, you can see output like the one below.
Two files:
1. Female-r-00000
2. Male-r-00000
Let's see the contents; they match the expected output shown above.
You can find the code here.
Happy Hadooping..............
Comments
Hi Unmesha,
Is it possible to print both the mapper and reducer output in a single MR job?
My requirement is that I need to create two files; the 2nd file is just a shrunk version of the 1st file.
For example:
Output of File1 is:
A B C D E
A B C D E
A B C D E
F G H I J
F G H I J
F G H I J
Output of File2 is:
A C E
F H J
Currently I am doing job chaining: the output of Job1 will be the input of Job2.
Job1 contains only a mapper phase; Job2 contains both map and reduce phases. The reducer in Job2 is just to remove the duplicate rows.
Thanks
Abhinay
Do you need to get both Job1's output and Job2's output? Is it like that, or have I misunderstood what you mentioned? Let me know if I am wrong.
I will explain my problem clearly.
Currently what I am doing:
In my program I have two jobs: Job1 (only mapper) and Job2 (mapper and reducer).
Input of Job1 is "sortcolumn/input.txt"
Output of Job1 is "sortcolumn/output/job1out/part-m-00000"
Input of Job2 is "sortcolumn/output/job1out/part-m-00000"
Output of Job2 is "sortcolumn/output/job2out/part-r-00000"
So the output of Job1 is input of Job2.
Example of input.txt:
d,c,b,e,a
d,c,b,e,a
g,f,h,j,i
h,j,f,i,g
Final output of part-r-00000
a,c,e
f,h,j
In the Job1 mapper's map() function:
{
in[] = input line; //d,c,b,e,a
sort it and produce out[] // a,b,c,d,e
context.write(null,out[])
}
In the Job2 mapper's map():
{
in[]= input line; //a,b,c,d,e
context.write(in[0]+in[2]+in[4],1); //a,c,e
}
In the Job2 reducer's reduce():
{
context.write(key,null); //removes duplicate rows
}
As you can see, the Job2 mapper takes input from the Job1 mapper, shrinks the columns, and sends it to the reducer to remove duplicate rows.
What I need:
Now my question is: can I use only one job (with mapper & reducer) like below?
map()
{
in[] = input line;
sort in[] and produce out[];
write_into_hdfs(out[]);
send_to_reducer(out[0]+out[2]+out[4]);
}
The reducer will be the same as above.
To make my question simple: can I write the mapper data to HDFS and also pass it to the reducer?
Sorry for the lengthy comment.
Yes, you can do that in just one MR job. And if you have a reducer, the intermediate data (the mapper output) will be cleared; it will not get written into HDFS.
GitHub link for the solution: https://github.com/studhadoop/Query-1
You can do something like this:
Mapper
------
public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    NullWritable out = NullWritable.get();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context) {
        /*
         * For example, let's use an ArrayList to store the fields
         */
        ArrayList<String> arrayList = new ArrayList<String>();
        String line = value.toString();
        String[] parts = line.split(",");
        for (int i = 0; i < parts.length; i++) {
            arrayList.add(parts[i]);
        }
        /*
         * Sort the fields
         */
        Collections.sort(arrayList);
        /*
         * Iterate through the sorted list, keeping every other column
         */
        String emitdata = "";
        for (int i = 0; i < arrayList.size(); i++) {
            if (i % 2 == 0) {
                if (i == 0) {
                    emitdata = arrayList.get(i);
                } else {
                    emitdata += "," + arrayList.get(i);
                }
            }
        }
        valEmit.set(emitdata);
        try {
            context.write(valEmit, out);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Reducer
-------
public class IdentityReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    NullWritable out = NullWritable.get();

    public void reduce(Text key, Iterable<NullWritable> values, Context context) {
        // Emitting only the key removes the duplicate rows
        try {
            context.write(key, out);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Input
-----
d,c,b,e,a
d,c,b,e,a
g,f,h,j,i
h,j,f,i,g
Result
------
a,c,e
f,h,j
Again sorry, I think I am confusing you.
My problem is that I need two outputs for a given input.
Output1
a,b,c,d,e
a,b,c,d,e
f,g,h,i,j
f,g,h,i,j
Output2
a,c,e
f,h,j
So I need to store mapper output and also reducer output.
Thanks,
Abhinay
Thanks a lot for your time... I am able to do it using MultipleOutputs: I wrote the mapper output using MultipleOutputs and the reducer output normally.
Thanks,
Abhinay.
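For anyone trying the same thing, here is a rough sketch of that idea (not the commenter's actual code): a mapper that writes its full sorted line to a side file through MultipleOutputs while still emitting the shrunk line to the reducer. The class name SortMapper and the side-output name "sortedlines" are made up for illustration, and the IdentityReducer shown above can stay as it is.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class SortMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    MultipleOutputs<Text, NullWritable> mos;
    NullWritable nothing = NullWritable.get();
    Text fullEmit = new Text();
    Text shrunkEmit = new Text();

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        Arrays.sort(parts);                          // d,c,b,e,a -> a,b,c,d,e

        // Rebuild the full sorted line and write it to a side file (output 1)
        StringBuilder sorted = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                sorted.append(",");
            }
            sorted.append(parts[i]);
        }
        fullEmit.set(sorted.toString());
        mos.write(fullEmit, nothing, "sortedlines"); // becomes sortedlines-m-xxxxx

        // Shrunk line (assumes 5 columns, as in the example) goes to the reducer (output 2)
        shrunkEmit.set(parts[0] + "," + parts[2] + "," + parts[4]);
        context.write(shrunkEmit, nothing);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

Note that each map task writes its own sortedlines-m-NNNNN file, so with several map tasks the side output will be spread across several files, which is what the next replies discuss.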
But... if you write the mapper output using MultipleOutputs you will get the desired output. If you have more than one map task, what will you do with that?
Each map task generates its output using MultipleOutputs, so if I have multiple map tasks I will get the output of all map tasks; that satisfies my requirement.
OK, fine. So you need each map task's output. Fine :)