Wednesday 9 December 2015

Partitioning Data Using Hadoop MultipleOutputs

There may be cases where we need to partition our data based on a certain condition.
For example, consider this Employee data:
EmpId,EmpName,Age,Gender,Salary
1201,gopal,45,Male,50000
1202,manisha,40,Female,51000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1213,lavanya,18,Female,8000
Let's assume one condition:
we need to separate the above data based on Gender (there can be other scenarios).
The expected outcome looks like this:
Female

1213,lavanya,18,Female,8000
1202,manisha,40,Female,51000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
Male

1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1201,gopal,45,Male,50000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
This can be achieved using MultipleOutputs in Hadoop.
As the name suggests, MultipleOutputs lets a job write its output data to multiple output files.
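Before bringing in Hadoop, the partitioning idea can be sketched in plain Java. This is only an in-memory illustration under our own assumptions: the class GenderPartitionSketch and the method partitionByColumn are hypothetical names, not part of Hadoop or of the job in this post, and nothing is written to files here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration; the real job uses MultipleOutputs instead.
class GenderPartitionSketch {

    // Groups comma-separated records by the value found in the given column (0-based).
    static Map<String, List<String>> partitionByColumn(List<String> records, int column) {
        Map<String, List<String>> partitions = new TreeMap<>();
        for (String record : records) {
            String[] fields = record.split(",");
            if (fields.length <= column) {
                continue; // skip lines that do not have the requested column
            }
            partitions.computeIfAbsent(fields[column], k -> new ArrayList<>()).add(record);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "1201,gopal,45,Male,50000",
                "1202,manisha,40,Female,51000",
                "1203,khaleel,34,Male,30000",
                "1206,laxmi,25,Female,35000");
        // Each map entry plays the role of one output file.
        partitionByColumn(records, 3).forEach((gender, rows) -> {
            System.out.println(gender);
            rows.forEach(System.out::println);
        });
    }
}
```

In the MapReduce version, the grouping step is done by the shuffle between mapper and reducer, and each group ends up in its own file.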

Let's see how to implement this.
Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionerDriver extends Configured implements Tool {

 public static void main(String[] args) throws Exception {
  // ToolRunner handles the generic Hadoop options before calling run()
  int res = ToolRunner.run(new Configuration(), new PartitionerDriver(), args);
  System.exit(res);
 }

 @Override
 public int run(String[] args) throws Exception {
  if (args.length != 3) {
   System.err.println("Usage: PartitionerDriver <input> <output> <delimiter>");
   return 2; // non-zero exit code signals wrong usage
  }
  System.out.println("Partitioning File Based on Gender...........");

  // Reuse the configuration prepared by ToolRunner
  Configuration conf = getConf();
  Path in = new Path(args[0]);
  Path out = new Path(args[1]);
  // Pass the delimiter to the mapper through the job configuration
  conf.set("delimiter", args[2]);

  Job job0 = Job.getInstance(conf, "Partition Records");
  job0.setJarByClass(PartitionerDriver.class);
  job0.setMapperClass(PartitionMapper.class);
  job0.setReducerClass(PartitionReducer.class);
  job0.setMapOutputKeyClass(Text.class);
  job0.setMapOutputValueClass(Text.class);
  // The reducer writes NullWritable keys and Text values
  job0.setOutputKeyClass(NullWritable.class);
  job0.setOutputValueClass(Text.class);
  TextInputFormat.addInputPath(job0, in);

  // Delete the output dir if it exists, otherwise the job refuses to start
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(out)) {
   fs.delete(out, true);
  }
  TextOutputFormat.setOutputPath(job0, out);

  // Report success only if the job actually succeeded
  if (!job0.waitForCompletion(true)) {
   System.err.println("Partitioning job failed.");
   return 1;
  }
  System.out.println("Successfully partitioned Data based on Gender!");
  return 0;
 }
}


Mapper Class
The mapper receives each record and splits it using the delimiter.
The key is the Gender and the value is the whole record.
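One detail worth keeping in mind: String.split treats its argument as a regular expression, so a delimiter such as | would not be read as a literal separator. The sketch below shows one way to extract the key safely, assuming the delimiter is meant literally. The class RecordKeySketch and the method genderOf are hypothetical helpers for illustration only.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the job code in this post.
class RecordKeySketch {

    // Returns the 4th field (Gender in the sample data), or null for short lines.
    static String genderOf(String line, String delimiter) {
        // Pattern.quote makes the delimiter literal; limit -1 keeps trailing empty fields
        String[] record = line.split(Pattern.quote(delimiter), -1);
        return record.length > 3 ? record[3] : null;
    }

    public static void main(String[] args) {
        System.out.println(genderOf("1201,gopal,45,Male,50000", ","));
        System.out.println(genderOf("1202|manisha|40|Female|51000", "|"));
    }
}
```

Returning null for short lines lets the caller skip malformed records instead of failing with an ArrayIndexOutOfBoundsException.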

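import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, Text> {

 // Reused for every record to avoid allocating a new Text each time
 private final Text keyEmit = new Text();

 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  // Delimiter set by the driver; quoted so that it is not read as a regex
  String delim = context.getConfiguration().get("delimiter");
  String[] record = value.toString().split(Pattern.quote(delim));
  if (record.length < 4) {
   return; // skip malformed lines that have no Gender column
  }
  // Gender is the 4th column: emit it as the key, the whole record as the value
  keyEmit.set(record[3]);
  context.write(keyEmit, value);
 }
}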

Reducer Class
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionReducer extends Reducer<Text, Text, NullWritable, Text> {

 private MultipleOutputs<NullWritable, Text> mos;
 private final NullWritable out = NullWritable.get();

 @Override
 protected void setup(Context context) {
  mos = new MultipleOutputs<>(context);
 }

 @Override
 protected void reduce(Text key, Iterable<Text> values, Context context)
   throws IOException, InterruptedException {
  // The key (Gender) is used as the base name of the output file
  for (Text value : values) {
   mos.write(out, value, key.toString());
  }
 }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  // Close MultipleOutputs so that all open writers are flushed
  mos.close();
 }
}

Here we declare MultipleOutputs as:
MultipleOutputs<NullWritable, Text> mos;
Our output file does not need a key; we are only interested in the data. So the key is NullWritable and the value is Text.
In the setup() method we initialize our MultipleOutputs:
mos = new MultipleOutputs<>(context);
Let's look at reduce().
As we know, the reducer groups values by key. The key from the mapper is Gender, and we have two genders, Male and Female, so the reducer receives two keys, each followed by its values.
for (Text value : values) {
 mos.write(out, value, key.toString());
}
The MultipleOutputs write() call used here takes 3 arguments:
1. key
2. value
3. base output path (the file name)
Here the key is NullWritable and the value is each record for that key. We name the file after the key.
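The third argument explains the names of the files the job produces. As a rough sketch of the naming pattern (a hypothetical helper written for this post, not Hadoop's own code), the base name is followed by r for a reduce task or m for a map task, and then the task's partition number padded to five digits:

```java
import java.util.Locale;

// Hypothetical helper that only mimics the file naming pattern.
class OutputNameSketch {

    // Builds <base>-<m|r>-<5-digit partition>, e.g. Female-r-00000
    static String outputFileName(String baseOutputPath, boolean reduceTask, int partition) {
        return String.format(Locale.ROOT, "%s-%s-%05d",
                baseOutputPath, reduceTask ? "r" : "m", partition);
    }

    public static void main(String[] args) {
        System.out.println(outputFileName("Female", true, 0));
        System.out.println(outputFileName("Male", true, 0));
    }
}
```

With more than one reducer, each reduce task writes its own file per key it receives, so the partition number in the name can differ from 00000.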

Once you run this code, you get two partitioned output files:
1. Female-r-00000
2. Male-r-00000
Each file contains only the records of that gender, matching the expected outcome shown earlier.

You can find the code here.

Happy Hadooping..............

16 comments:

  1. This comment has been removed by the author.

  2. Hi Unmesha,

    Is it possible to print both the mapper and reducer output in a single MR job?
    My requirement is that I need to create two files; the 2nd file is just a shrunk version of the 1st file.
    For Eg:
    Output of File1 is:
    A B C D E
    A B C D E
    A B C D E
    F G H I J
    F G H I J
    F G H I J

    Output of File2 is:
    A C E
    F H J

    Currently I am doing Job Chaining. Output of job1 will be input of Job2

    Job1 contains only Mapper Phase, Job2 Contains both MR Phases. Reducer in Job2 is just to remove the duplicate rows

    Thanks
    Abhinay

    Replies
    1. Do you need both Job1's output and Job2's output? Is that it, or have I misunderstood what you mentioned? Let me know if I am wrong.

    2. I will explain my problem clearly

      Currently What I am doing :
      In my program I have two jobs. Job1(only mapper) Job2(mapper n reducer).

      Input of Job1 is "sortcolumn/input.txt"
      Output of Job1 is "sortcolumn/output/job1out/part-m-00000"
      Input of Job2 is "sortcolumn/output/job1out/part-m-00000"
      Output of Job2 is "sortcolumn/output/job2out/part-r-00000"

      So the output of Job1 is input of Job2.

      eg of input.txt:
      d,c,b,e,a
      d,c,b,e,a
      g,f,h,j,i
      h,j,f,i,g

      Final output of part-r-00000
      a,c,e
      f,h,j

      In job1 mapper in map( ) function
      {
      in[] = input line; //d,c,b,e,a
      sort it and produce out[] // a,b,c,d,e
      context.write(null,out[])
      }

      In job2 mapper in map()
      {
      in[]= input line; //a,b,c,d,e
      context.write(in[0]+in[2]+in[4],1); //a,c,e
      }

      In job2 reducer in reduce()
      {
      context.write(key,null); //removes duplicate rows
      }

      As you can see, the Job2 mapper takes its input from the Job1 mapper, shrinks the columns and sends them to the reducer to remove duplicate rows.

      What I need:
      Now my question is: can I use only one job (with mapper and reducer) like below?

      map()
      {
      in[] = input line;
      sort in[] and produce out[];
      write_into_hdfs(out[]);
      send_to_reducer(out[0]+out[2]+out[4]);
      }

      reducer will be same as above.

      To make my question simple: can I write the mapper data to HDFS and also pass it to the reducer?

      Sorry for the lengthy comment.

    3. Yes, you can do that in just one MR job. And if you have a reducer, the intermediate data (mapper output) is cleared; it is not written to HDFS.

    4. github link for the solution: [Link](https://github.com/studhadoop/Query-1)

      You can do something like this

      Mapper
      ------

      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.Collections;
      import java.util.List;

      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        private final NullWritable out = NullWritable.get();
        private final Text valEmit = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          // Split the line and sort its columns, e.g. d,c,b,e,a -> a,b,c,d,e
          List<String> arrayList = new ArrayList<>(Arrays.asList(value.toString().split(",")));
          Collections.sort(arrayList);
          // Keep every other column (indexes 0, 2, 4, ...), e.g. a,c,e
          StringBuilder emitdata = new StringBuilder();
          for (int i = 0; i < arrayList.size(); i += 2) {
            if (i > 0) {
              emitdata.append(",");
            }
            emitdata.append(arrayList.get(i));
          }
          valEmit.set(emitdata.toString());
          // Emit the shrunk row as the key so the reducer sees each distinct row once
          context.write(valEmit, out);
        }
      }

      Reducer
      -------
      import java.io.IOException;

      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      public class IdentityReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        private final NullWritable out = NullWritable.get();

        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
          // Writing each key once removes the duplicate rows
          context.write(key, out);
        }
      }

      Input
      -----
      d,c,b,e,a
      d,c,b,e,a
      g,f,h,j,i
      h,j,f,i,g

      Result
      ------
      a,c,e
      f,h,j

    5. Sorry again, I think I am confusing you.
      My problem is that I need 2 outputs for the given input:
      output1
      a,b,c,d,e
      a,b,c,d,e
      f,g,h,i,j
      f,g,h,i,j

      Output2
      a,c,e
      f,h,j

      So I need to store mapper output and also reducer output.

      Thanks,
      Abhinay

  3. Thanks a lot for your time... I am able to do it using MultipleOutputs: I wrote the mapper output using MultipleOutputs and the reducer output normally.

    Thanks,
    Abhinay.

    Replies
    1. But... if you write the mapper output using MultipleOutputs you will get the desired output. If you have more than one map task, what will you do with that?

    2. Each map task generates its output using MultipleOutputs, so if I have multiple map tasks I will get the output of all map tasks, which satisfies my requirement.

    3. OK, fine. So you need each map task's output. Fine :)
