Counters are a useful channel for gathering statistics about a job, whether for quality control or for application-level statistics. Let's walk through an example where a counter counts the number of keys processed in the reducer.
There are 3 key points to set up:
1. Define the counter in the Driver class
public class CounterDriver extends Configured implements Tool {
    long c = 0;

    // The counter is declared as an enum in the driver class
    static enum UpdateCount {
        CNT
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int res = ToolRunner.run(conf, new CounterDriver(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
2. Increment (or set) the counter in the Reducer
public class CntReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // do something
        // Increment the counter once per key; the enum lives in the driver,
        // so it must be qualified with the enclosing class name.
        context.getCounter(CounterDriver.UpdateCount.CNT).increment(1);
    }
}
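The same pattern works on the map side. Below is a minimal sketch that counts malformed input records in a mapper; the CntMapper class, the BadRecords enum, and the tab-separated input format are assumptions for illustration, not part of the example above:

public class CntMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    // Hypothetical counter enum, named for illustration only
    static enum BadRecords {
        MALFORMED
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) {
            // Count the bad record and skip it
            context.getCounter(BadRecords.MALFORMED).increment(1);
            return;
        }
        context.write(new IntWritable(Integer.parseInt(fields[0])), new Text(fields[1]));
    }
}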
3. Get the counter value in the Driver class
public class CounterDriver extends Configured implements Tool {
    long c = 0;

    static enum UpdateCount {
        CNT
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int res = ToolRunner.run(conf, new CounterDriver(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        .
        .
        .
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // Counter values are only final once the job has completed
        job.waitForCompletion(true);
        c = job.getCounters().findCounter(UpdateCount.CNT).getValue();
        return 0;
    }
}
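If you would rather not share an enum across classes, Hadoop also supports dynamic counters addressed by group and counter name strings. A minimal sketch (the "MyStats" and "REDUCER_KEYS" names are made up for illustration):

// In the reducer:
context.getCounter("MyStats", "REDUCER_KEYS").increment(1);

// In the driver, after job.waitForCompletion(true):
long cnt = job.getCounters().findCounter("MyStats", "REDUCER_KEYS").getValue();
System.out.println("Keys processed: " + cnt);

Enum counters are type-safe, while string counters avoid the cross-class dependency; in both cases the framework aggregates the values across all tasks.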
Hi Unmesha,
Abhinay again here.
We count the no. of bad records received by our map() and write them into a file. We do this in 10 MR jobs, so every job writes one bad-records file into HDFS, and every job also has its own counter. Finally, all these counters and the bad records of the 10 jobs have to be written into an XML file.
For that I have created a static final class and call it in every MR job (10 jobs in total), like
LoggingCounter.LogMessage(arg1, arg2)
At the end of each job I call the function in that class, so this function is called 10 times.
Below is the code:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringWriter;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

public final class LoggingCounter {

    public static int LogMessage(String Record, String Component)
            throws IOException, ParserConfigurationException, TransformerException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inPath = new Path("hdfs://nameservice1/user/abhime01/haadoop/Rules/AccumulationRule/op/BadMapper-m-00000");
        Path outPath = new Path("hdfs://nameservice1/user/abhime01/logging.xml");

        if (!fs.exists(inPath)) {
            System.err.println("Input Path " + inPath.toString() + " does not exist.");
            return 1;
        }

        DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
        DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder();
        Document document = documentBuilder.newDocument();

        // Create the log file on the first call, append on later calls
        FSDataOutputStream fos;
        if (!fs.exists(outPath)) {
            fos = fs.create(outPath);
        } else {
            fos = fs.append(outPath);
        }

        final String root = "TransactionLog";
        final String attribute = "Attributes";
        final String elementTS = "TS";
        final String elementTSD = "TSD";

        Element rootElement = document.createElement(root);          // <TransactionLog>
        document.appendChild(rootElement);
        Element subrootElement = document.createElement(attribute);  // <Attributes>
        rootElement.appendChild(subrootElement);
        Element ts = document.createElement(elementTS);              // <TS>
        ts.appendChild(document.createTextNode(Component));
        subrootElement.appendChild(ts);
        Element tsd = document.createElement(elementTSD);            // <TSD>

        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(inPath)));
        try {
            String writeBadRcrd;
            String badRcrdInputline = br.readLine();
            while (badRcrdInputline != null) {
                writeBadRcrd = badRcrdInputline.replaceAll(";", "|");
                tsd.appendChild(document.createTextNode(writeBadRcrd));
                badRcrdInputline = br.readLine(); // read the next line to avoid an infinite loop
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            br.close();
        }
        subrootElement.appendChild(tsd);

        TransformerFactory transformerFactory = TransformerFactory.newInstance();
        Transformer transformer = transformerFactory.newTransformer();
        DOMSource source = new DOMSource(document);
        StreamResult result = new StreamResult(new StringWriter());
        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "5");
        transformer.transform(source, result);

        try {
            // Read the generated XML from the StringWriter and write it into HDFS
            String xmlString = result.getWriter().toString();
            fos.writeBytes(xmlString + "\n");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            fos.close();
        }
        return 0;
    }
}
But the problem is that every time the whole individual XML file gets appended, when I need only the new nodes to be appended.
Problematic output:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<TransactionLog>
<Attributes>
<TS>AccumulationRule</TS>
<TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
</Attributes>
</TransactionLog>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<TransactionLog>
<Attributes>
<TS>ApplyMathRule</TS>
<TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
</Attributes>
</TransactionLog>
Desired output is:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<TransactionLog>
<Attributes>
<TS>AccumulationRule</TS>
<TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
</Attributes>
<Attributes>
<TS>ApplyMathRule</TS>
<TSD>113|3600024151|3|30|Watermelon|200|20151112|113|3600024151|23|100|Jujubi|201|20151113|113|3600024152|2|40|Blackberry|202|20151114|</TSD>
</Attributes>
</TransactionLog>
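One way to get that desired output is to stop appending a second complete document and instead re-parse the existing logging.xml, attach the new <Attributes> node to the existing root, and rewrite the whole file. A sketch of how the relevant parts of LogMessage could be reworked (error handling simplified; this is an illustration under those assumptions, not a tested drop-in fix):

// Build on the existing document instead of always creating a new one
Document document;
Element rootElement;
if (fs.exists(outPath)) {
    // Re-parse the current log so new nodes join the same <TransactionLog>
    document = documentBuilder.parse(fs.open(outPath));
    rootElement = document.getDocumentElement();
} else {
    document = documentBuilder.newDocument();
    rootElement = document.createElement("TransactionLog");
    document.appendChild(rootElement);
}

Element subrootElement = document.createElement("Attributes");
rootElement.appendChild(subrootElement);
// ... create and append the <TS> and <TSD> elements exactly as before ...

// Serialize the combined document and overwrite (not append) the file,
// so logging.xml always holds a single well-formed document
StreamResult result = new StreamResult(new StringWriter());
transformer.transform(new DOMSource(document), result);
FSDataOutputStream fos = fs.create(outPath, true); // true = overwrite
try {
    fos.writeBytes(result.getWriter().toString());
} finally {
    fos.close();
}

This also sidesteps HDFS append quirks, since each call rewrites one consistent file rather than concatenating documents.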