Tuesday 22 April 2014

Chaining Jobs in Hadoop MapReduce


There are cases where we need to write more than one MapReduce job:
Map1--Reduce1--Map2--Reduce2
How do you manage the jobs so that they execute in order? There are several approaches. Here is one that chains jobs together simply, by configuring and submitting each job in turn from a single driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Unmesha SreeVeni U.B
 * 
 */
public class ChainJobs extends Configured implements Tool {

 private static final String OUTPUT_PATH = "intermediate_output";

 @Override
 public int run(String[] args) throws Exception {
  /*
   * Job 1
   */
  Configuration conf = getConf();
  Job job = Job.getInstance(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  // stop here if the first job fails; otherwise job2 would run on
  // missing or partial intermediate output
  if (!job.waitForCompletion(true)) {
   return 1;
  }

  /*
   * Job 2
   */
  
  Job job2 = Job.getInstance(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
 }

 /**
  * Reads the input and output locations from the command line and runs
  * the two jobs in sequence until completion.
  */
 public static void main(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.println("Usage: ChainJobs <input directory> <output directory>");
   System.exit(2);
  }
  int exitCode = ToolRunner.run(new Configuration(), new ChainJobs(), args);
  System.exit(exitCode);
 }
}

The code above defines two jobs, job and job2.
private static final String OUTPUT_PATH = "intermediate_output";
The constant OUTPUT_PATH names the directory that receives the output of the first job.
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
So the first job reads its input from args[0] and writes its output to OUTPUT_PATH.
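
For context, a typical invocation of the finished driver looks like this (the jar name and HDFS paths here are illustrative, not from the original post):

hadoop jar chainjobs.jar ChainJobs /user/hadoop/input /user/hadoop/final_output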

First Job Configuration


  /*
   * Job 1
   */
  Configuration conf = getConf();
  Job job = Job.getInstance(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  // stop here if the first job fails; otherwise job2 would run on
  // missing or partial intermediate output
  if (!job.waitForCompletion(true)) {
   return 1;
  }

Once the first job has completed successfully, "OUTPUT_PATH" serves as the input to the second job, and the output of job2 is written to args[1].
TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
TextOutputFormat.setOutputPath(job2, new Path(args[1]));
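
A practical note: like any FileOutputFormat job, job1 will fail with "output directory already exists" if a stale "intermediate_output" directory is left over from a previous run. A minimal guard you could add at the top of run(), sketched here as an addition rather than part of the original driver (it needs the org.apache.hadoop.fs.FileSystem import):

  // remove a stale intermediate directory so the chain can be re-run safely
  FileSystem fs = FileSystem.get(conf);
  Path intermediate = new Path(OUTPUT_PATH);
  if (fs.exists(intermediate)) {
   fs.delete(intermediate, true); // true = recursive delete
  }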

Second Job Configuration

  /*
   * Job 2
   */
 
  Job job2 = Job.getInstance(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
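
As noted at the start, this is only one of several approaches. Another is JobControl (org.apache.hadoop.mapreduce.lib.jobcontrol), which lets you declare the dependency between the jobs instead of waiting on each one by hand; one commenter below refers to this via addDependingJob. A minimal sketch, assuming job and job2 are the fully configured Job instances from the driver above:

  import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
  import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

  // wrap the configured jobs and declare the dependency
  ControlledJob cJob1 = new ControlledJob(job.getConfiguration());
  cJob1.setJob(job);
  ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
  cJob2.setJob(job2);
  cJob2.addDependingJob(cJob1); // job2 starts only after job1 succeeds

  JobControl control = new JobControl("chain");
  control.addJob(cJob1);
  control.addJob(cJob2);

  // JobControl implements Runnable; run it in a thread and poll
  Thread controlThread = new Thread(control);
  controlThread.start();
  while (!control.allFinished()) {
   Thread.sleep(1000);
  }
  control.stop();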

Happy Hadooping . . .

Comments:

  1. Where is the code for ChainJobs1.java and ChainJobs2.java?

     Reply: There is no ChainJobs2.java. Apologies for the confusion, and thanks for pointing it out. I have updated the post.
  2. Sorry, I am new to Hadoop. Could you please give some examples of how to read a file from the map/reduce functions? Do you just call fs.open(), or is there some built-in magic behind TextInputFormat.addInputPath()?
    Thanks!

     Reply: You can read files in a MapReduce job using TextInputFormat: supply your file as the job input and read the records in the map function. You can also read files from the distributed cache in the setup function; a sketch of that pattern follows below.
     Let me know if you have further doubts.

     Reply: Thank you very much!
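
     To make that concrete, here is a minimal sketch of the setup-function pattern. It assumes the driver registered a small side file with job.addCacheFile(new URI("/user/hadoop/lookup.txt")) (new API); the class, field, and file names are all illustrative:

     import java.io.BufferedReader;
     import java.io.File;
     import java.io.FileReader;
     import java.io.IOException;
     import java.net.URI;
     import java.util.HashSet;
     import java.util.Set;

     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.LongWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Mapper;

     // illustrative mapper: loads a lookup file from the distributed
     // cache in setup(), then filters records against it in map()
     public class LookupMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

      private final Set<String> lookup = new HashSet<String>();

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
       // files registered with job.addCacheFile(...) are localized on each
       // task node and, by default, symlinked into the task's working
       // directory under their file name
       URI[] cacheFiles = context.getCacheFiles();
       if (cacheFiles != null) {
        for (URI cacheFile : cacheFiles) {
         String localName = new File(cacheFile.getPath()).getName();
         BufferedReader reader = new BufferedReader(new FileReader(localName));
         try {
          String line;
          while ((line = reader.readLine()) != null) {
           lookup.add(line.trim());
          }
         } finally {
          reader.close();
         }
        }
       }
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
       if (lookup.contains(value.toString().trim())) {
        context.write(value, new IntWritable(1));
       }
      }
     }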
  3. Thank you very much for such a helpful post.
    Keep posting such stuff on Hadoop.
    Nishit
  4. The second job doesn't seem to run for me. The mapper's setup method runs, but not the map function of the second mapper. Is it a format issue? Otherwise there doesn't seem to be anything wrong in my program.

     Reply: Could you please share your code? You can also reach me at unmeshabiju@gmail.com.
  5. Hi,

    I am running chained Hadoop jobs. With small data sets (10-20 files) everything works perfectly, but with more than 30 files the second job fails with a connection-refused error after the first job finishes. This has already happened a couple of times. Can you please let me know why I am facing this issue? I have also tried addDependingJob, but with that the output path for job2 does not get validated.

    Thanks,
    Shuvankar
  6. Hi Unmesha Sreeveni, great post! You saved me! :D
    I found some errors, like a FileNotFoundException, which I solved by appending "/part-r-00000" (the name of the output file).

    In my application I am implementing the GIM-V algorithm, which basically multiplies a matrix by a vector, then multiplies again by the resulting vector, and so on.

    Finally I wrote a loop for the subsequent jobs, something like this:

    Configuration conf = getConf();
    Job job = new Job(conf, "matrix-multiply-vector");
    // See Amareshwari Sri Ramadasu's comment in this thread...
    // http://lucene.472066.n3.nabble.com/Distributed-Cache-with-New-API-td722187.html
    // you need to do job.getConfiguration() instead of conf.
    DistributedCache.addCacheFile(new Path(args[1]).toUri(),
        job.getConfiguration());
    job.setJarByClass(MatrixMultiplyVector.class);

    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(DoubleWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    // setOutputFormat...

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    boolean succ = job.waitForCompletion(true);
    int nroRepeticiones = Integer.parseInt(args[3]);
    String salida = args[2];
    String nuevaSalida = salida;
    for (int i = 1; i < nroRepeticiones; i++) {
        Configuration conf2 = new Configuration();
        Job job2 = new Job(conf2, "ENCADENADOJOB");
        // again, job.getConfiguration() instead of conf (see link above)
        DistributedCache.addCacheFile(new Path(nuevaSalida + "/part-r-00000").toUri(),
            job2.getConfiguration());
        job2.setJarByClass(MatrixMultiplyVector.class);

        job2.setMapperClass(Mapper1.class);
        job2.setReducerClass(Reducer1.class);

        job2.setMapOutputKeyClass(LongWritable.class);
        job2.setMapOutputValueClass(DoubleWritable.class);

        job2.setInputFormatClass(TextInputFormat.class);
        // setOutputFormat...
        nuevaSalida = salida + "-" + String.valueOf(i);

        FileInputFormat.addInputPath(job2, new Path(args[0]));
        FileOutputFormat.setOutputPath(job2, new Path(nuevaSalida));
        System.out.println("-----iteracion:" + i);
        succ = job2.waitForCompletion(true);
    }
    return 5;

    Thank you again :D

     Reply: Thanks. Yes, for the distributed cache you need to point at the part file as well, but when you feed an MR job you only need to specify the folder.
  7. Nice work, Unmesha. I will try out the code; meanwhile I have a few questions.
    1. As OUTPUT_PATH holds intermediate output, where is it stored: HDFS or the local disk (like the mappers' output)?
    2. Does it persist, or is it deleted after the job finishes? If it persists, can we see the file contents (will they be serialized)?

     Reply: The intermediate output is written to HDFS only; that is how you can use the output path of the first job as the input for the next.

     Reply: Following the above question, is it necessary to store the results in HDFS? Is there any way to redirect them to the next mapper without wasting resources on creating a new file?
  8. Nice example. But if I need to chain n jobs, where n is not predefined, what should be done? Say, for an iterative algorithm that terminates only when certain conditions are met.
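
    One common pattern for an open-ended chain is to drive the loop from the driver's run() method and stop when a job counter, set by the reducer, signals convergence. A minimal sketch under stated assumptions: IterativeDriver, IterMapper, IterReducer, and the "loop"/"CONVERGED" counter are all hypothetical names.

    // hypothetical iterative driver loop; the reducer increments the
    // "loop"/"CONVERGED" counter when its stopping criterion is met
    String input = args[0];
    boolean converged = false;
    int iteration = 0;
    while (!converged) {
     String output = args[1] + "/iter-" + iteration;
     Job job = Job.getInstance(getConf(), "iteration-" + iteration);
     job.setJarByClass(IterativeDriver.class);
     job.setMapperClass(IterMapper.class);
     job.setReducerClass(IterReducer.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
     TextInputFormat.addInputPath(job, new Path(input));
     TextOutputFormat.setOutputPath(job, new Path(output));
     if (!job.waitForCompletion(true)) {
      return 1; // abort the whole chain if any round fails
     }
     converged = job.getCounters()
       .findCounter("loop", "CONVERGED").getValue() > 0;
     input = output; // this round's output feeds the next round
     iteration++;
    }
    return 0;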
  9. I am using the same example, but when the second job executes it says the input file was not found. Also, the output file does not get created even after the first job executes successfully.

    Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://localhost:54310/user/output1232
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:321)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:385)
    at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:115)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:597)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:614)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:492)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1314)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.run(JoinClickImpressionDetailJob.java:418)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.main(JoinClickImpressionDetailJob.java:422)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

  10. Hi Unmesha Sreeveni,

    Thanks a lot for the detailed explanation; very helpful.
    I am a beginner in Hadoop and I don't know why I get these errors in the driver code.
    Could you please advise me? I have sent the driver code to unmeshabiju@gmail.com.

  11. Hello. I am trying to create a chained job in Hadoop. The algorithm I want to implement requires map2 to take as input the output of map1, and Job1 has both a map and a reduce phase. Is there any way to make something like this happen?
    Thanks in advance
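
    For the record, a map -> reduce -> map pipeline inside a single job is what the chain classes in org.apache.hadoop.mapreduce.lib.chain are for. A minimal sketch, assuming Map1, Reduce1, and Map2 are new-API classes with the key/value types shown (all class names here are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // one job running Map1 -> Reduce1 -> Map2: Map2 consumes Reduce1's
    // output records directly, with no intermediate HDFS directory
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "map-reduce-map");
    job.setJarByClass(ChainDriver.class); // hypothetical driver class

    ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class, new Configuration(false));
    ChainReducer.setReducer(job, Reduce1.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, new Configuration(false));
    // mappers added after setReducer run on the reducer's output
    ChainReducer.addMapper(job, Map2.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, new Configuration(false));

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

    Note that the chain classes support only one reduce phase per job (a [MAP+ / REDUCE MAP*] pipeline); for a second reduce you still need a second job, chained as shown in the post.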