Monday, 8 September 2014

How To Set Counters In Hadoop MapReduce

Counters are a useful channel for gathering statistics about a job, whether for quality control or for application-level statistics. Let's see an example where a counter counts the number of keys processed in the reducer.


Three key steps:

1. Define the counter in the Driver class

public class CounterDriver extends Configured implements Tool{
 long c = 0;
 // Each enum constant is one counter; the enum name becomes the counter group
 static enum UpdateCount{
  CNT
 }
 public static void main(String[] args) throws Exception{
     Configuration conf = new Configuration();
     int res = ToolRunner.run(conf, new CounterDriver(), args);
     System.exit(res);
  }
 public int run(String[] args) throws Exception {
 .
 .
 .
 }
}

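2. Increment or set the counter in the Reducer

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CntReducer extends Reducer<IntWritable, Text, IntWritable, Text>{
 @Override
 public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
      //do something
      context.getCounter(CounterDriver.UpdateCount.CNT).increment(1); // reduce() runs once per key
 }
}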
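
To see why incrementing once per reduce() call yields the number of keys, here is a minimal plain-Java sketch that mimics the grouping step without any Hadoop classes. The names KeyCounterSketch, Counters, kv and countKeys are hypothetical stand-ins made up for illustration; only the UpdateCount enum mirrors the driver above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the reducer side of the job; no Hadoop classes are used.
final class KeyCounterSketch {

    // Mirrors the UpdateCount enum declared in the driver.
    enum UpdateCount { CNT }

    // Hypothetical stand-in for the job's counter store.
    static final class Counters {
        private final Map<UpdateCount, Long> values = new EnumMap<>(UpdateCount.class);

        void increment(UpdateCount name, long amount) {
            values.merge(name, amount, Long::sum);
        }

        long getValue(UpdateCount name) {
            return values.getOrDefault(name, 0L);
        }
    }

    // Small helper to build one (key, value) record.
    static Map.Entry<Integer, String> kv(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Groups the records by key (as the shuffle would), then performs one
    // "reduce" call per distinct key and bumps CNT by one on each call.
    static long countKeys(List<Map.Entry<Integer, String>> records) {
        Map<Integer, List<String>> grouped = new TreeMap<>();
        for (Map.Entry<Integer, String> record : records) {
            grouped.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                   .add(record.getValue());
        }
        Counters counters = new Counters();
        for (List<String> values : grouped.values()) {
            // do something with the values of this key
            counters.increment(UpdateCount.CNT, 1);
        }
        return counters.getValue(UpdateCount.CNT);
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> records =
                Arrays.asList(kv(1, "a"), kv(2, "b"), kv(1, "c"));
        System.out.println("CNT = " + countKeys(records)); // prints "CNT = 2"
    }
}
```

Three records with two distinct keys give a count of 2, because the increment happens per key group and not per value.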

3. Get the counter in the Driver class

public class CounterDriver extends Configured implements Tool{
 long c = 0;
 static enum UpdateCount{
  CNT
 }
 public static void main(String[] args) throws Exception{
     Configuration conf = new Configuration();
     int res = ToolRunner.run(conf, new CounterDriver(), args);
     System.exit(res);
  }
 public int run(String[] args) throws Exception {
 .
 .
 .
 job.setInputFormatClass(TextInputFormat.class);
 job.setOutputFormatClass(TextOutputFormat.class);
 FileInputFormat.setInputPaths(job,in );
 FileOutputFormat.setOutputPath(job, out);
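 // run() must return an exit code; the counter value is final once the job has finished
 boolean success = job.waitForCompletion(true);
 c = job.getCounters().findCounter(UpdateCount.CNT).getValue();
 return success ? 0 : 1;
 }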
}

Full code: GitHub

The counter values are also printed to the console when the job finishes.






6 comments:

  1. Hi Unmesha,
    Abhinay here again.

    We count the number of bad records received by our map() and write them to a file.

    We do this in 10 MR jobs.

    So every job writes one bad-records file into HDFS, and every job also has its own counter.

    Finally, all these counters and the bad records of the 10 jobs have to be written into one XML file.

    For that I have created a final class with a static method and call it in every MR job (10 jobs in total) like
    LoggingCounter.LogMessage(arg1,arg2)

    At the end of each job I call the function in that class, so this function is called 10 times.

    Below is the code:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.StringWriter;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.parsers.ParserConfigurationException;
    import javax.xml.transform.OutputKeys;
    import javax.xml.transform.Transformer;
    import javax.xml.transform.TransformerException;
    import javax.xml.transform.TransformerFactory;
    import javax.xml.transform.dom.DOMSource;
    import javax.xml.transform.stream.StreamResult;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;

    public final class LoggingCounter {

        public static int LogMessage(String Record, String Component)
                throws IOException, ParserConfigurationException, TransformerException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            Path inPath = new Path("hdfs://nameservice1/user/abhime01/haadoop/Rules/AccumulationRule/op/BadMapper-m-00000");
            Path outPath = new Path("hdfs://nameservice1/user/abhime01/logging.xml");

            if (!fs.exists(inPath)) {
                System.err.println("Input Path " + inPath.toString() + " does not exist.");
                return 1;
            }

            // Every call builds a new, complete document with its own root element
            DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder();
            Document document = documentBuilder.newDocument();

            final String root = "TransactionLog";
            final String attribute = "Attributes";
            final String elementTS = "TS";
            final String elementTSD = "TSD";

            Element rootElement = document.createElement(root);
            document.appendChild(rootElement);

            Element subrootElement = document.createElement(attribute);
            rootElement.appendChild(subrootElement);

            Element ts = document.createElement(elementTS);
            ts.appendChild(document.createTextNode(Component));
            subrootElement.appendChild(ts);

            // Copy every bad record into <TSD>, replacing ';' with '|'
            Element tsd = document.createElement(elementTSD);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(inPath)));
            try {
                String badRcrdInputline = br.readLine();
                while (badRcrdInputline != null) {
                    tsd.appendChild(document.createTextNode(badRcrdInputline.replace(';', '|')));
                    badRcrdInputline = br.readLine(); // Read the next line to avoid an infinite loop
                }
            } finally {
                // Read errors are no longer swallowed; they propagate to the caller
                br.close();
            }
            subrootElement.appendChild(tsd);

            // Serialize the generated XML to a string
            TransformerFactory transformerFactory = TransformerFactory.newInstance();
            Transformer transformer = transformerFactory.newTransformer();
            DOMSource source = new DOMSource(document);
            StreamResult result = new StreamResult(new StringWriter());
            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "5");
            transformer.transform(source, result);

            // Create the file on the first call, append the whole document on later calls
            FSDataOutputStream fos = fs.exists(outPath) ? fs.append(outPath) : fs.create(outPath);
            try {
                String xmlString = result.getWriter().toString();
                fos.writeBytes(xmlString + "\n");
            } finally {
                fos.close();
            }
            return 0;
        }
    }

  4. But the problem is that a whole individual XML document gets appended every time. Instead, I need only the nodes to be appended.

    Problematic output:
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <TransactionLog>
    <Attributes>
    <TS>AccumulationRule</TS>
    <TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
    </Attributes>
    </TransactionLog>

    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <TransactionLog>
    <Attributes>
    <TS>ApplyMathRule</TS>
    <TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
    </Attributes>
    </TransactionLog>



    Desired output is :
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <TransactionLog>
    <Attributes>
    <TS>AccumulationRule</TS>
    <TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
    </Attributes>
    <Attributes>
    <TS>ApplyMathRule</TS>
    <TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
    </Attributes>
    </TransactionLog>
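
One possible direction for the desired output above, offered as a sketch and not as a tested fix for these jobs: a well-formed XML file has exactly one root element, so appending a freshly serialized document on every call cannot produce it. An alternative is to keep a single Document, add one Attributes element per job under the shared TransactionLog root, and serialize once at the end. The class and method names below (TransactionLogSketch, newLog, addEntry, serialize) are hypothetical, the record strings are shortened samples, and writing the result to HDFS is left out.

```java
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

// Hypothetical helper: one in-memory document shared by all jobs.
final class TransactionLogSketch {

    // Creates an empty document whose only content is the <TransactionLog> root.
    static Document newLog() {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder().newDocument();
            doc.appendChild(doc.createElement("TransactionLog"));
            return doc;
        } catch (ParserConfigurationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Appends one <Attributes> entry (with <TS> and <TSD>) under the existing root.
    static void addEntry(Document doc, String component, String badRecords) {
        Element attributes = doc.createElement("Attributes");
        Element ts = doc.createElement("TS");
        ts.appendChild(doc.createTextNode(component));
        Element tsd = doc.createElement("TSD");
        tsd.appendChild(doc.createTextNode(badRecords));
        attributes.appendChild(ts);
        attributes.appendChild(tsd);
        doc.getDocumentElement().appendChild(attributes);
    }

    // Serializes the whole document once, so there is a single XML declaration and root.
    static String serialize(Document doc) {
        try {
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Document doc = newLog();
        addEntry(doc, "AccumulationRule", "113|3600024151|3|30|Watermelon|200|20151112|");
        addEntry(doc, "ApplyMathRule", "113|3600024151|23|100|Jujubi|201|20151113|");
        System.out.println(serialize(doc));
    }
}
```

If the entries must survive between separate program runs, the same idea would mean parsing the existing file into a Document, calling addEntry, and rewriting the file, instead of appending text to it.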
