Showing posts with label MapReduce. Show all posts

Thursday, 10 December 2015

Faster way to count number of lines in a file/dir using Map Reduce Framework


This site shows one way to count the number of lines in a file.
That approach emits a count of one for each record in each map. So if one map task holds 10,000 lines, 10,000 values are passed to the reducer. With more than one mapper, every mapper performs that many intermediate reads and writes.
Let's reduce the intermediate writes.

Below is an optimized way to count the number of lines in a file/dir.
The changes are in:
1. Mapper
Instead of emitting 'one' for each record, we increment a line count in map() and emit it once in the cleanup() phase.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LineCntMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {

 Text keyEmit = new Text("Total Lines");
 IntWritable valEmit = new IntWritable();
 int partialSum = 0;

 @Override
 public void map(LongWritable key, Text value, Context context) {
  // Count the line locally; nothing is emitted per record
  partialSum++;
 }

 @Override
 public void cleanup(Context context) throws IOException, InterruptedException {
  // Emit the partial count once per map task.
  // Exceptions propagate so that a failed write fails the task
  valEmit.set(partialSum);
  context.write(keyEmit, valEmit);
 }
}
So if we have 5 map tasks, we emit only 5 intermediate key-value pairs.

2. Driver
In the Driver we also set a combiner:
job.setMapperClass(LineCntMapper.class);
job.setCombinerClass(LineCntReducer.class);
job.setReducerClass(LineCntReducer.class);
The combiner does nothing more than the reducer does, so we can use the reducer itself as the combiner.
The reducer doesn't need any change.

If you run this code, you will get the result faster than with the previously mentioned code on this site.
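To make the saving concrete, here is a minimal plain-Java sketch (no Hadoop involved) that compares how many intermediate records the two strategies hand to the shuffle. The class name, the method names and the workload of 5 map tasks with 10,000 lines each are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch, not Hadoop code. All names here are hypothetical.
class LineCountSketch {

    // Earlier approach: one record ("Total Lines", 1) per input line.
    static long emitsPerRecord(List<Integer> linesPerMapTask) {
        long emitted = 0;
        for (int lines : linesPerMapTask) {
            emitted += lines;
        }
        return emitted;
    }

    // Approach from this post: each map task counts locally and
    // emits a single record from cleanup().
    static long emitsPerTask(List<Integer> linesPerMapTask) {
        return linesPerMapTask.size();
    }

    // Either way, the reducer arrives at the same total.
    static long totalLines(List<Integer> linesPerMapTask) {
        long total = 0;
        for (int partialSum : linesPerMapTask) {
            total += partialSum;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed workload: 5 map tasks holding 10,000 lines each.
        List<Integer> tasks = Arrays.asList(10000, 10000, 10000, 10000, 10000);
        System.out.println("per-record emits: " + emitsPerRecord(tasks));
        System.out.println("per-task emits:   " + emitsPerTask(tasks));
        System.out.println("total lines:      " + totalLines(tasks));
    }
}
```

The total is identical in both cases; only the number of intermediate records changes.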

Working code is here

Happy Hadooping........

Sunday, 7 December 2014

Joining Two Files Using MultipleInput In Hadoop MapReduce - MapSide Join

There are cases where we need to take 2 files as input and join them based on an id or something similar.
Two large data sets can be joined in MapReduce programming as well. A join performed in the map phase is called a map-side join, while a join performed in the reduce phase is called a reduce-side join.
This can be achieved using MultipleInputs in Hadoop, which gives each file its own mapper; the records are then brought together by key.

Say I have 2 files: one file with EmployeeID,Name,Designation and another file with EmployeeID,Salary,Department.

File1.txt
1 Anne,Admin
2 Gokul,Admin
3 Janet,Sales
4 Hari,Admin

AND

File2.txt
1 50000,A
2 50000,B
3 60000,A
4 50000,C

We will try to join these files into one based on EmployeeID
The result we aim at is 

1 Anne,Admin,50000,A
2 Gokul,Admin,50000,B
3 Janet,Sales,60000,A
4 Hari,Admin,50000,C

In both files, File1.txt and File2.txt, the employeeId is common, so we join the records based on it.
We will write 2 mappers to process these files.

Processing File1.txt
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
{
 String line=value.toString();
 String[] words=line.split("\t");
 keyEmit.set(words[0]);
 valEmit.set(words[1]);
 context.write(keyEmit, valEmit);
}

The above mapper processes File1.txt.
String[] words=line.split("\t");
splits each line at the tab character, so words[0] is the employeeId, which we pass as the key, and the rest is the value.

eg: 1 Anne,Admin
words[0] = 1
words[1] = Anne,Admin

Alternatively, you can use KeyValueTextInputFormat.class as the InputFormat. This class gives the employeeId as key and the rest as value.
You don't need to split the line yourself.

Processing File2.txt
public void map(LongWritable k, Text v, Context context) throws IOException, InterruptedException
{
 String line=v.toString();
 String[] words=line.split(" ");
 keyEmit.set(words[0]);
 valEmit.set(words[1]);
 context.write(keyEmit, valEmit);
}

The above mapper processes File2.txt.

eg: 1 50000,A
words[0] = 1
words[1] = 50000,A

If the files use the same delimiter and the ID comes first, you can reuse the same mapper.

Let's write a common reducer to join the data using the key.
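public void reduce(Text key, Iterable<Text> values, Context context)
  throws IOException, InterruptedException
{
 // Join all values that share this employeeId with commas.
 // A local builder avoids carrying state over between keys
 StringBuilder merge = new StringBuilder();
 for(Text value:values)
 {
  if(merge.length() > 0){
   merge.append(",");
  }
  merge.append(value.toString());
 }
 valEmit.set(merge.toString());
 context.write(key, valEmit);
}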

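Here the reducer appends the value coming from each mapper to merge, with a comma in between.
It then emits the employeeId as key and merge as value. Note that Hadoop does not guarantee the order in which the values for a key arrive at the reducer.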
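The flow of the join can be tried out without a cluster. The following is a plain-Java sketch under stated assumptions: a TreeMap stands in for the shuffle, both files are assumed to be tab-delimited, and the class and method names are hypothetical. It is not the Hadoop job itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch, not Hadoop code. All names here are hypothetical.
class JoinSketch {

    // Mimics a mapper: split each line at the first delimiter into
    // key (employeeId) and value (the rest), then hand it to the "shuffle".
    static void map(List<String> lines, String delimiter,
                    Map<String, List<String>> shuffle) {
        for (String line : lines) {
            String[] words = line.split(delimiter, 2);
            shuffle.computeIfAbsent(words[0], k -> new ArrayList<>()).add(words[1]);
        }
    }

    // Mimics the reducer: comma-join every value that shares a key.
    static List<String> join(List<String> file1, List<String> file2) {
        Map<String, List<String>> shuffle = new TreeMap<>();
        map(file1, "\t", shuffle);
        map(file2, "\t", shuffle);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : shuffle.entrySet()) {
            out.add(e.getKey() + "\t" + String.join(",", e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> file1 = Arrays.asList("1\tAnne,Admin", "2\tGokul,Admin");
        List<String> file2 = Arrays.asList("1\t50000,A", "2\t50000,B");
        for (String row : join(file1, file2)) {
            System.out.println(row);
        }
    }
}
```

In this sketch the values of File1 always come first because file1 is processed first. A real job gives no such guarantee.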

Now we need to furnish our Driver class to take 2 inputs, using MultipleInputs to assign an InputFormat and a mapper to each input path.


public int run(String[] args) throws Exception {
 Configuration c=new Configuration();
 String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
 Path p1=new Path(files[0]);
 Path p2=new Path(files[1]);
 Path p3=new Path(files[2]);
 FileSystem fs = FileSystem.get(c);
 if(fs.exists(p3)){
  fs.delete(p3, true);
  }
 Job job = new Job(c,"Multiple Job");
 job.setJarByClass(MultipleFiles.class);
 MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
 MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class);
 job.setReducerClass(MultipleReducer.class);
 .
 .
}

MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class);
p1 and p2 are the Path variables holding the 2 input files.
You can find the code on GitHub.

There is one more case, where we want the fields of the output in a fixed sequence.
Say we need to get the output as below:
1 Anne,Admin,50000,A
2 Gokul,Admin,50000,B
3 Janet,Sales,60000,A
4 Hari,Admin,50000,C
In order to achieve this, we can make use of the TextPair Writable concept in Hadoop.
You can find the working code on GitHub. Thanks to one of my blog readers, Ravi Kumar, who sorted out the sequence order.

Sunday, 16 November 2014

Update Statement In Hive For Small Tables


Let's see how to update small Hive tables.


1. Create a  table and load data (Assuming the data is placed in HDFS)

You can also refer to the previous post for creating Hive tables.


CREATE EXTERNAL TABLE Non_Parti(EmployeeID Int,FirstName String,Designation String,Salary Int,Department String) ROW FORMAT DELIMITED FIELDS TERMINATED BY  "," LOCATION '/user/hdfs/Hive'; 


hive> select * from Non_Parti;
OK
1 Anne Admin 50000 A
2 Gokul Admin 50000 B
3 Janet Sales 60000 A
4 Hari Admin 50000 C
5 Sanker Admin 50000 C
6 Margaret Tech 12000 A
7 Nirmal Tech 12000 B
8 jinju Engineer 45000 B
9 Nancy Admin 50000 A
10 Andrew Manager 40000 A
11 Arun Manager 40000 B
12 Harish Sales 60000 B
13 Robert Manager 40000 A
14 Laura Engineer 45000 A
15 Anju Ceo 100000 B
16 Aarathi Manager 40000 B
17 Parvathy Engineer 45000 B
18 Gopika Admin 50000 B
19 Steven Engineer 45000 A
20 Michael Ceo 100000 A
Time taken: 0.233 seconds, Fetched: 20 row(s)


2. Update the Department of employeeid 19 to C.


INSERT OVERWRITE TABLE Non_Parti SELECT employeeid,firstname,designation,salary, CASE WHEN employeeid=19 THEN 'C' ELSE department END AS department FROM Non_Parti;


hive> select * from Non_Parti;
OK
1 Anne Admin 50000 A
2 Gokul Admin 50000 B
3 Janet Sales 60000 A
4 Hari Admin 50000 C
5 Sanker Admin 50000 C
6 Margaret Tech 12000 A
7 Nirmal Tech 12000 B
8 jinju Engineer 45000 B
9 Nancy Admin 50000 A
10 Andrew Manager 40000 A
11 Arun Manager 40000 B
12 Harish Sales 60000 B
13 Robert Manager 40000 A
14 Laura Engineer 45000 A
15 Anju Ceo 100000 B
16 Aarathi Manager 40000 B
17 Parvathy Engineer 45000 B
18 Gopika Admin 50000 B
19 Steven Engineer 45000 C
20 Michael Ceo 100000 A
Time taken: 0.184 seconds, Fetched: 20 row(s)

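Your Hive table is now updated. Because INSERT OVERWRITE rewrites the whole table, this is practical for small tables only. If you need to update large tables, partition the Hive tables so that only the affected partition has to be rewritten.

*Newer versions of Hive include an UPDATE statement (Hive 0.14 and later, on transactional tables).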

Monday, 3 November 2014

K-Means Clustering in Mahout


This example uses Cloudera's Mahout (Hadoop 2.0.0-cdh4.5.0 with mahout-0.7).


1. Download the input data set


unmesha@client:~$ wget http://archive.ics.uci.edu/ml/databases/synthetic_control/synthetic_control.data

2. Place the data into HDFS under "testdata"
unmesha@client:~$ hadoop fs -mkdir testdata
unmesha@client:~$ echo $MAHOUT_HOME
/usr/lib/mahout/bin
unmesha@client:~$ $HADOOP_HOME/bin/hadoop fs -put /PATH/TO/synthetic_control.data testdata

*HDFS input directory name should be “testdata”



Run Kmeans Clustering

unmesha@client:~$ $MAHOUT_HOME/mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job

The result is stored in HDFS in a folder named "output".

unmesha@client:~$ hadoop fs -ls output
Found 14 items
-rwxr-xr-x   1 unmesha unmesha        194 2014-11-04 09:06 output/_policy
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusteredPoints
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-0
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-1
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-10-final
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-2
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-3
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-4
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-5
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-6
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-7
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-8
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/clusters-9
drwxrwxr-x   - unmesha unmesha       4096 2014-11-04 09:06 output/data

The clustering output is in SequenceFile format, which is not human readable. Mahout has a utility known as clusterdump that converts it into a human-readable format.


Copy the cluster output from HDFS onto your local file system


unmesha@client:~$ hadoop fs -mkdir kmeansoutput
unmesha@client:~$ hadoop fs -get output kmeansoutput

unmesha@client:~$ mahout clusterdump --input output/clusters-10-final --pointsDir output/clusteredPoints --output kmeansoutput/clusteranalyze.txt

You can view the results now in kmeansoutput/clusteranalyze.txt



Sunday, 2 November 2014

How To Install Apache Mahout on Ubuntu


Prerequisites:

  1.  Hadoop Cluster
  2.  Maven


STEP 1: Download the latest Mahout source code from

http://www.apache.org/dyn/closer.cgi/lucene/mahout/

Make sure you download the .src zipped file.


STEP 2: Unzip the file to a folder named “mahout”

unzip -a mahout-distribution-x.x-src.zip

STEP 3: Move mahout to /usr/local

mv mahout /usr/local

STEP 4: Build Mahout

unmesha@client:~$ cd /usr/local/mahout/mahout-distribution-0.9
unmesha@client:/usr/local/mahout/mahout-distribution-0.9$ ls
bin         core          examples     LICENSE.txt  math-scala  pom.xml     src buildtools  distribution  integration  math         NOTICE.txt  README.txt  target
unmesha@client:/usr/local/mahout/mahout-distribution-0.9$ mvn install

Wait until Mahout is built. The build also runs some tests. It is recommended to let the tests complete the first time. Later you can skip them using

mvn install -Dmaven.test.skip=true

Once the tests are done and Mahout is built, we get a success message.


Congrats, Apache Mahout is installed...


If you are using the Cloudera (CDH) package, you can install Mahout in just 1 step.
apt-get install mahout

You can use the mahout command in /usr/bin, and if you want to run Mahout on a Hadoop cluster, go to /usr/lib and reference mahout-cdhx-core-job.jar and the full class path.



Friday, 24 October 2014

How to load a file in DistributedCache in Hadoop MapReduce



We can load an extra file using the Distributed Cache. To do that, we need to configure the Distributed Cache with the needed file in the Driver class.


Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path cachefile = new Path("path/to/file");
FileStatus[] list = fs.globStatus(cachefile);
for (FileStatus status : list) {
 DistributedCache.addCacheFile(status.getPath().toUri(), conf);
}
Then, in the Reducer's setup() or the Mapper's setup(), we will be able to read this file.
public void setup(Context context) throws IOException{
 Configuration conf = context.getConfiguration();
 FileSystem fs = FileSystem.get(conf);
 URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
 // First cache file registered in the driver
 Path getPath = new Path(cacheFiles[0].getPath());  
 // try-with-resources closes the reader once the file has been read
 try (BufferedReader bf = new BufferedReader(new InputStreamReader(fs.open(getPath)))) {
  String setupData = null;
  while ((setupData = bf.readLine()) != null) {
    System.out.println("Setup Line in reducer "+setupData);
  }
 }
}
Use index 0, 1, ... if you supplied more than one cache file:
Path getPath = new Path(cacheFiles[1].getPath());  

Happy Hadooping ....

Monday, 29 September 2014

Comments On CCD-410 Sample Dumps


What do you think of these three questions from the site CCD-410 Practice Exam Questions Demo 100% Pass-Guaranteed or Your Money Back!!!


QUESTION: 3

What happens in a MapReduce job when you set the number of reducers to one?

A. A single reducer gathers and processes all the output from all the mappers. The output is written in as many separate files as there are mappers.
B. A single reducer gathers and processes all the output from all the mappers. The output is written to a single file in HDFS.
C. Setting the number of reducers to one creates a processing bottleneck, and since the number of reducers as specified by the programmer is used as a reference value only, the MapReduce runtime provides a default setting for the number of reducers.
D. Setting the number of reducers to one is invalid, and an exception is thrown.

Answer:A

QUESTION: 4

In the standard word count MapReduce algorithm, why might using a combiner reduce the overall Job running time?

A. Because combiners perform local aggregation of word counts, thereby allowing the mappers to process input data faster.
B. Because combiners perform local aggregation of word counts, thereby reducing the number of mappers that need to run.
C. Because combiners perform local aggregation of word counts, and then transfer that data to reducers without writing the intermediate data to disk.
D. Because combiners perform local aggregation of word counts, thereby reducing the number of key-value pairs that need to be shuffled across the network to the reducers.

Answer:A

QUESTION: 5

You have user profile records in your OLTP database, that you want to join with weblogs you have already ingested into HDFS. How will you obtain these user records?

A. HDFS commands
B. Pig load
C. Sqoop import
D. Hive

Answer :B


Correct Answers

QUESTION 3: Answer B
QUESTION 4: Answer D
QUESTION 5: Answer C

See reviews on correct answer

Monday, 8 September 2014

How To Set Counters In Hadoop MapReduce

Counters are a useful channel for gathering statistics about the job: for quality control or for application-level statistics. Let's see an example where a counter counts the number of keys processed in the reducer.


3 key points to set

1. Define counter in Driver class

public class CounterDriver extends Configured implements Tool{
 long c = 0;
 static enum UpdateCount{
  CNT
 }
 public static void main(String[] args) throws Exception{
     Configuration conf = new Configuration();
     int res = ToolRunner.run(conf, new CounterDriver(), args);
     System.exit(res);
  }
 public int run(String[] args) throws Exception {

2. Increment or set counter in Reducer

public class CntReducer extends Reducer<IntWritable, Text, IntWritable, Text>{
 public void reduce(IntWritable key,Iterable<Text> values,Context context)  {
      //do something
      // The enum is nested in the driver class, so qualify it with CounterDriver
      context.getCounter(CounterDriver.UpdateCount.CNT).increment(1);
 }
}

3. Get counter in Driver class 

public class CounterDriver extends Configured implements Tool{
 long c = 0;
 static enum UpdateCount{
  CNT
 }
 public static void main(String[] args) throws Exception{
     Configuration conf = new Configuration();
     int res = ToolRunner.run(conf, new CounterDriver(), args);
     System.exit(res);
  }
 public int run(String[] args) throws Exception {
 .
 .
 .
 job.setInputFormatClass(TextInputFormat.class);
 job.setOutputFormatClass(TextOutputFormat.class);
 FileInputFormat.setInputPaths(job,in );
 FileOutputFormat.setOutputPath(job, out);
 boolean success = job.waitForCompletion(true);
 // Read the counter value once the job has finished
 c = job.getCounters().findCounter(UpdateCount.CNT).getValue();
 return success ? 0 : 1;
 }
}

Full code :  GitHub

You will also be able to see the counters in the console.
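The three steps can be mimicked in plain Java to see how they fit together. This is only a sketch under stated assumptions: an EnumMap stands in for the job's counter store, the grouped input is made up, and nothing here is Hadoop's own Counters API.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch, not Hadoop code. All names here are hypothetical.
class CounterSketch {

    // Step 1: the enum names the counter.
    enum UpdateCount { CNT }

    // Step 2: the "reducer" is called once per key and bumps the counter by 1.
    static void reduce(int key, List<String> values,
                       Map<UpdateCount, Long> counters) {
        counters.merge(UpdateCount.CNT, 1L, Long::sum);
    }

    // Step 3: the "driver" runs all reduce calls, then reads the counter.
    static long run(Map<Integer, List<String>> grouped) {
        Map<UpdateCount, Long> counters = new EnumMap<>(UpdateCount.class);
        for (Map.Entry<Integer, List<String>> e : grouped.entrySet()) {
            reduce(e.getKey(), e.getValue(), counters);
        }
        return counters.getOrDefault(UpdateCount.CNT, 0L);
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> grouped = new HashMap<>();
        grouped.put(1, Arrays.asList("a", "b"));
        grouped.put(2, Arrays.asList("c"));
        System.out.println("keys processed: " + run(grouped));
    }
}
```

The counter ends up equal to the number of distinct keys, regardless of how many values each key carries.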






Saturday, 23 August 2014

Calculating Mean in Hadoop MapReduce


Given a CSV file, we will find the mean of each column (optimized approach).


Mapper

 Takes each input line, adds its values to a running sum, and keeps track of the number of lines it has summed. The sums are stored in a hash map with the column id as key. cleanup() emits the sum and the total line count so that the overall mean can be taken. Each map only gets a block of the input data, so while summing up we need to know how many elements were summed.


//Calculating sum
if (sumVal.isEmpty()) {
 //if sumval is empty add elements to sumval
 sumVal.putAll(mapLine);
 } else {
//calculating sum
 double sum = 0;
 for (Integer colId : mapLine.keySet()) {
  double val1 = mapLine.get(colId);
  double val2 = sumVal.get(colId);
 /*
  * calculating sum
 */
 sum = val1 + val2;
 sumVal.put(colId, sum);
 }
}


Reducer

 Sums the values for each key.

The reducer calculates 2 sums:
  1. the sum of the values for each key, and
  2. the sum of the line counts


for (TwovalueWritable value : values) {
 //Taking sum of values and total number of lines 
 sum += value.getSum();
 total += value.getTotalCnt();
 }
 //sum contains total sum of all elements in each column
 //total contains total no of elements in each column
 mean = sum / total;
 valEmit.set(mean);
 context.write(key, valEmit);


This approach avoids a large amount of communication with the reducer. The reducer only needs to sum up a few values from each mapper.
Say we have only 3 mappers and 4 columns in the input set. The reducer then only has to wait for 4 values from each mapper (one per column).
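Why each mapper has to send the line count along with the sum is easiest to see with numbers. Below is a plain-Java sketch for a single column, with made-up values and hypothetical names: mapper 1 sees the values 1, 2, 3, 4 and mapper 2 sees only the value 10. Averaging the two per-mapper means would give (2.5 + 10) / 2 = 6.25, whereas adding the (sum, count) pairs first gives the true mean 20 / 5 = 4.

```java
// Plain-Java sketch, not Hadoop code. All names here are hypothetical.
class MeanSketch {

    // What one map task would emit from cleanup() for a single column:
    // element 0 is the sum, element 1 is the number of values summed.
    static double[] partial(double[] columnValues) {
        double sum = 0;
        for (double v : columnValues) {
            sum += v;
        }
        return new double[] { sum, columnValues.length };
    }

    // What the reducer does: total sum divided by total count.
    static double mean(double[][] partials) {
        double sum = 0;
        double total = 0;
        for (double[] p : partials) {
            sum += p[0];
            total += p[1];
        }
        return sum / total;
    }

    public static void main(String[] args) {
        double[][] partials = {
            partial(new double[] { 1, 2, 3, 4 }), // block seen by mapper 1
            partial(new double[] { 10 })          // block seen by mapper 2
        };
        System.out.println(mean(partials)); // prints 4.0
    }
}
```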


Complete code : GitHub Link

Sunday, 4 May 2014

Map-Only Jobs In Hadoop


There may be cases where a map-only job is needed, where there is no Reducer to execute. Here the map does all the work on its InputSplit and there is no work for a Reducer. This can be achieved by calling job.setNumReduceTasks(0) in the driver.

Job job = new Job(getConf(), "Map-Only Job");
job.setJarByClass(MaponlyDriver.class);

job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
/*
 * Set no of reducers to 0
 */
job.setNumReduceTasks(0);

job.setMapperClass(Mapper.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean success = job.waitForCompletion(true);
return(success ? 0 : 1);

This sets the number of reduce tasks to 0 and turns off the Reducer.

job.setNumReduceTasks(0);

So the number of output files will be equal to the number of mappers, and the output files will be named part-m-00000, part-m-00001, and so on.

And once the number of reduce tasks is set to zero, the result will be unsorted.

If we do not set this property, an identity Reducer is executed, which simply emits each incoming key with its value unchanged, and the output file will be part-r-00000.



Happy Hadooping ...

Saturday, 3 May 2014

Hadoop Installation Using Cloudera Package - Pseudo Distributed Mode (Single Node)

[Previous Post]

Hadoop can also be installed using Cloudera, in fewer steps and in an easy way. The difference is that Cloudera has packed Apache Hadoop and some ecosystem projects into one package. The configuration is already set to localhost, so we do not need to edit the configuration files.

Installation using Cloudera Package.

Prerequisite

1.Java


Installation Steps

Step 1: Set Java home in /etc/profile

unmesha@unmesha-hadoop-virtual-machine:~$ java -version
java version "1.7.0_55"
Java(TM) SE Runtime Environment (build 1.7.0_55-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode)

Check your current location of java 

unmesha@unmesha-hadoop-virtual-machine:~$ sudo update-alternatives --config java
[sudo] password for unmesha: 
There is only one alternative in link group java: /usr/lib/jvm/java-7-oracle/jre/bin/java
Nothing to configure.

Set JAVA_HOME

export JAVA_HOME=/usr/lib/jvm/java-7-oracle
unmesha@unmesha-hadoop-virtual-machine:~$ source ~/.bashrc 

Step 2: Download the package for your system under "On Ubuntu and other Debian systems, do the following:" heading from here.

Step 3: Extract the package

unmesha@unmesha-hadoop-virtual-machine:~$ sudo dpkg -i cdh4-repository_1.0_all.deb


Step 4: Install Hadoop

unmesha@unmesha-hadoop-virtual-machine:~$ sudo apt-get update
unmesha@unmesha-hadoop-virtual-machine:~$ sudo apt-get install hadoop-0.20-conf-pseudo


Step 5: Format Namenode

unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hdfs namenode -format


Step 6: Start HDFS

unmesha@unmesha-hadoop-virtual-machine:~$ for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x start ; done


Step 7: Create the /tmp Directory

unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -mkdir /tmp
unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp


Step 8: Create the MapReduce system directories

unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -mkdir -p /var/lib/hadoop-hdfs/cache/mapred/mapred/staging

unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -chmod 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging

unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred


Step 9: Verify the HDFS File Structure

unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -ls -R /


Step 10: Start MapReduce

unmesha@unmesha-hadoop-virtual-machine:~$ for x in `cd /etc/init.d ; ls hadoop-0.20-mapreduce-*` ; do sudo service $x start ; done


Step 11: Set up user directory

unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -mkdir /user/<your username>
unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -chown <user> /user/<your username>
unmesha@unmesha-hadoop-virtual-machine:~$ sudo -u hdfs hadoop fs -mkdir /user/unmesha/new


Step 12: Run grep example, you can also try out wordcount example

unmesha@unmesha-hadoop-virtual-machine:~$ /usr/bin/hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar grep input output 'dfs[a-z.]+'

Step 13: You can also stop the services

unmesha@unmesha-hadoop-virtual-machine:~$ for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x stop ; done

unmesha@unmesha-hadoop-virtual-machine:~$ for x in `cd /etc/init.d ; ls hadoop-0.20-mapreduce-*` ; do sudo service $x stop ; done


Happy Hadooping ...


Wednesday, 30 April 2014

How To Create Tables In HIVE


Hive provides us data warehousing facilities on top of an existing Hadoop cluster. Along with that it provides an SQL like interface.

You can create a table in two different ways.

1. Create External table 

CREATE EXTERNAL TABLE students
(id INT, name STRING, batch STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' -- supply delimiter
LOCATION '/user/hdfs/students'; 
For external tables, Hive does not move the data into its warehouse directory. If the external table is dropped, then the table metadata is deleted but not the data.

2. Create Normal Table 
CREATE TABLE students
(id INT, name STRING, batch STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' -- supply delimiter
LOCATION '/user/hddfs/students';
For normal tables, Hive moves the data into its warehouse directory. If the table is dropped, then both the table metadata and the data are deleted.

Sunday, 27 April 2014

Can we change the default key-value output separator in Hadoop MapReduce


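Yes, we can change it using the "mapred.textoutputformat.separator" property in the Driver class, if we are using TextOutputFormat as the output format. The default separator is "\t". In Hadoop 2 the property is named "mapreduce.output.textoutputformat.separator"; the older name is kept as a deprecated alias.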


Change to ","
Configuration conf = getConf();
conf.set("mapred.textoutputformat.separator", ","); 

Change to ";"
Configuration conf = getConf();
conf.set("mapred.textoutputformat.separator", ";"); 

Change to ":"
Configuration conf = getConf();
conf.set("mapred.textoutputformat.separator", ":"); 

Happy Hadooping ...

Wednesday, 23 April 2014

Hadoop WordCount Example In Detail


Every programming language has a "Hello World" program. Likewise, Hadoop also has a "Hello World" program: the WordCount example.


/*
 * import Statements
 */
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 *
 */
public class WordCount {
 /*
  * Map class extends Mapper Base Class 
  * Four arguments 
  * key/Value input and
  * key Value Output Types 
  * Key Input: LongWritable (Line offset of input file) 
  * Value Input: Text (Each line in a file)
  * 
  * Key Output : Text (Each word in a file) 
  * Value Output : IntWritable (1)
  * 
  * Input Line: qwerty the rose the 
  * Input Key/Value : 234 qwerty the rose the
  * Output key/Value : qwerty 1 the 1 rose 1 the 1
  */
 public static class Map extends
   Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   /*
    * Getting each value(each line of a file) in line variable. Using
    * stringTokenizer splits each word in a line and emit each word as
    * key and 1 as value
    */
   String line = value.toString();
   // line = "qwerty the rose the"
   StringTokenizer tokenizer = new StringTokenizer(line);
   while (tokenizer.hasMoreTokens()) {
    word.set(tokenizer.nextToken());
    context.write(word, one);
    /*
     * qwerty 1 
     * the 1 
     * rose 1 
     * the 1
     */
   }
  }
 }

 /*
  * In between, shuffle and sort takes place. The output of the map tasks
  * goes through a shuffle and sort phase. Shuffle groups the values of
  * each unique key into a single list 
  * eg: if one map() emits 
  * qwerty 1
  * the 1 
  * rose 1 
  * the 1 
  * 
  * Then after shuffle output will be 
  * qwerty,[1] 
  * the,[1,1]
  * rose,[1]
  * 
  * and the keys are sorted before they reach the Reducer, so the input to
  * Reducer will be unique key with list of values 
  * qwerty,[1] 
  * rose,[1]
  * the,[1,1]
  */
 public static class Reduce extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  /*
   * Reducer need to extend the Reducer Base class 
   * Four arguments
   * key/Value input and key Value Output Types 
   * Key Input: Text (unique key from mapper)
   * Value Input: IntWritable (List of values)
   *  
   * Key Output: Text (each unique word) 
   * Value Output: IntWritable (count of each word)
   * 
   * Input key/Value : 
   * qwerty,[1] 
   * rose,[1] 
   * the,[1,1] 
   * 
   * Output Key/value :
   * qwerty,1 
   * rose,1 
   * the,2
   */
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   /*
    * Text key :unique word and Iterable<IntWritable> values will be
    * list of values the,[1,1] key the Iterable Value [1,1]
    */
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   context.write(key, new IntWritable(sum));
   /*
    * qwerty,1 
    * rose,1 
    * the,2
    */
  }
 }

 /*
  * main or driver class which contains all the configuration to set up a
  * mapreduce job
  */
 public static void main(String[] args) throws Exception {

  /*
   * creating a configuration object
   */
  Configuration conf = new Configuration();
  Job job = new Job(conf, "wordcount");
  job.setJarByClass(WordCount.class);

  /*
   * what are the values of key/value output type from mapper
   */
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  /*
   * what are the values of key/value output type from Reducer
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  /*
   * specify Mapper class and Reducer class
   */
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  /*
   * Setting input format default is TextInputFormat each line terminated
   * with '\n'
   */
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  /*
   * Setting Input Directory and output Directory Output directory should
   * be a non existing one
   */
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  /*
   * waits for the completion of the job and exits with its status
   */
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

}
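The data flow described in the comments can also be traced without a cluster. The following plain-Java sketch is an illustration only: a TreeMap stands in for the shuffle and sort phase, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java sketch, not Hadoop code. All names here are hypothetical.
class WordCountSketch {

    static Map<String, Integer> count(String... lines) {
        // TreeMap keeps the keys sorted, standing in for shuffle and sort.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                // "map" emits (word, 1); merge plays the role of
                // "reduce" by summing the 1s per word.
                counts.merge(tokenizer.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {qwerty=1, rose=1, the=2}
        System.out.println(count("qwerty the rose the"));
    }
}
```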

Happy Hadooping . . .

Tuesday, 22 April 2014

Chaining Jobs in Hadoop MapReduce


There are cases where we need to write more than one MapReduce job.
Map1--Reduce1--Map2--Reduce2
How do you manage the jobs so they are executed in order? There are several approaches. Here is an approach to easily chain jobs together by configuring each job in turn inside the driver's run() method:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Unmesha SreeVeni U.B
 * 
 */
public class ChainJobs extends Configured implements Tool {

 private static final String OUTPUT_PATH = "intermediate_output";

 @Override
 public int run(String[] args) throws Exception {
  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  // Stop the chain if the first job fails
  if (!job.waitForCompletion(true)) {
   return 1;
  }

  /*
   * Job 2
   */
  
  Job job2 = new Job(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
 }

 /**
  * Method Name: main Return type: none Purpose:Read the arguments from
  * command line and run the Job till completion
  * 
  */
 public static void main(String[] args) throws Exception {
  // TODO Auto-generated method stub
  if (args.length != 2) {
   System.err.println("Enter valid number of arguments <Inputdirectory>  <Outputlocation>");
   // Non-zero exit status signals the usage error
   System.exit(1);
  }
  System.exit(ToolRunner.run(new Configuration(), new ChainJobs(), args));
 }
}

The above code has 2 jobs, held in the variables job and job2.
private static final String OUTPUT_PATH = "intermediate_output";
The constant OUTPUT_PATH names the directory where the first job writes its output.
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
So in the first job the input is args[0] and the output goes to new Path(OUTPUT_PATH).

First Job Configuration


  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  // Stop here if Job 1 failed, so Job 2 never reads incomplete output
  if (!job.waitForCompletion(true)) {
   return 1;
  }

Once the first job has executed successfully, OUTPUT_PATH serves as the input to the second job, and the output of job2 is written to args[1].
TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
TextOutputFormat.setOutputPath(job2, new Path(args[1]));

Second Job Configuration

  /*
   * Job 2
   */
 
  Job job2 = new Job(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
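
The control flow of the chain can be tried out without a cluster. Below is a minimal plain-Java sketch, not part of the original program: JobChain and its run() method are hypothetical names, and each stage is a BooleanSupplier that stands in for a call like job.waitForCompletion(true), which also returns true on success. Unlike the real call, the stand-in throws no checked exceptions. The chain stops at the first failing stage and reports it through the return code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical helper, used only to illustrate the chaining logic.
class JobChain {

    /**
     * Runs the stages in order and stops at the first one that fails.
     * Returns 0 when every stage succeeded and 1 otherwise.
     */
    static int run(List<BooleanSupplier> stages) {
        for (BooleanSupplier stage : stages) {
            if (!stage.getAsBoolean()) {
                return 1; // later stages never see incomplete input
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Each lambda is a placeholder for configuring and running a real Job.
        List<BooleanSupplier> stages = Arrays.asList(
            () -> { System.out.println("Job 1: args[0] -> intermediate_output"); return true; },
            () -> { System.out.println("Job 2: intermediate_output -> args[1]"); return true; });
        System.exit(run(stages));
    }
}
```

With real jobs, each stage would build its Job object and return the result of waitForCompletion(true).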

Happy Hadooping . . .

Aggregations In Hadoop MapReduce


Aggregation functions include sum, min, max, count, etc. These aggregations are very useful in statistics, and when they have to be computed over a large data set they can be done in Hadoop MapReduce.
Below is the code for finding Min() and Max() for each column of a CSV file in MapReduce.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 */
public class ColumnAggregator {

 public static class ColMapper extends
   Mapper<Object, Text, Text, DoubleWritable> {
  /*
   * Emits column Id as key and entire column elements as Values
   */
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String[] cols = value.toString().split(",");
   for (int i = 0; i < cols.length; i++) { 
    context.write(new Text(String.valueOf(i + 1)),new DoubleWritable(Double.parseDouble(cols[i])));
   }

  }
 }

 public static class ColReducer extends
   Reducer<Text, DoubleWritable, Text, DoubleWritable> {
  /*
   * Reducer finds min and max of each column
   */

  public void reduce(Text key, Iterable<DoubleWritable> values,
    Context context) throws IOException, InterruptedException {
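   // Start from the widest possible bounds so negative and very large values are handled
   double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;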
   Iterator<DoubleWritable> iterator = values.iterator(); //Iterating 
   while (iterator.hasNext()) {
    double value = iterator.next().get();
    if (value < min) { //Finding min value
     min = value;
    }
    if (value > max) { //Finding max value
     max = value;
    }
   }
   context.write(new Text(key), new DoubleWritable(min));
   context.write(new Text(key), new DoubleWritable(max));
  }
 }
 public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();

  Job job = new Job(conf, "Min and Max");
  job.setJarByClass(ColumnAggregator.class);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(args[1]))) {
   fs.delete(new Path(args[1]), true);
  }
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);

  job.setMapperClass(ColMapper.class);
  job.setReducerClass(ColReducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Explanation


A MapReduce program typically has 3 classes:

1. Driver class for configuration

2. Mapper

3. Reducer


Mapper:


Map receives the byte offset of each line as the key and the line itself as the value. It generates an id for each column and emits the id together with that column's value to the Reducer, once for every field in the line.


Reducer:


Reducer receives each column id and the list of that column's values as a key-value pair, finds the min and max for each key, and emits the column id as the key with the min and max as values.
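
To make the data flow concrete, here is a small plain-Java sketch that imitates the two steps on two CSV lines, with no Hadoop involved. MinMaxFlow, its map() and reduce() methods, and the TreeMap that plays the role of the shuffle are all assumptions made for illustration; they only mirror what ColMapper and ColReducer do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the mapper/reducer pair, for illustration only.
class MinMaxFlow {

    /** Map step: one (columnId, value) pair per field, column ids starting at 1. */
    static void map(String line, Map<Integer, List<Double>> shuffle) {
        String[] cols = line.split(",");
        for (int i = 0; i < cols.length; i++) {
            shuffle.computeIfAbsent(i + 1, k -> new ArrayList<>())
                   .add(Double.parseDouble(cols[i]));
        }
    }

    /** Reduce step: returns {min, max} of all values grouped under one column id. */
    static double[] reduce(List<Double> values) {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        for (double v : values) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        return new double[] {min, max};
    }

    public static void main(String[] args) {
        // The map groups values by column id, as the framework does between map and reduce.
        Map<Integer, List<Double>> shuffle = new TreeMap<>();
        map("3,10", shuffle);
        map("7,-2", shuffle);
        for (Map.Entry<Integer, List<Double>> e : shuffle.entrySet()) {
            double[] mm = reduce(e.getValue());
            System.out.println(e.getKey() + "\t" + mm[0] + "\t" + mm[1]);
        }
    }
}
```

Column 1 collects the values 3 and 7, and column 2 collects 10 and -2, so the reduce step sees one list per column.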

Here, if only 1 reducer is used, every value of every column passes through it, so all the work of finding min and max falls on that single Reducer. A better approach is to do part of the work on the map side.
We have setup() and cleanup() functions.

 

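setup() executes once per map task, before the first map() call, and

cleanup() executes once per map task, after the last map() call.



It is better to keep a running min and max for each column while map() runs and to emit them only once, in cleanup()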

Map()
{
         /*Update running min and max, no emit*/  
}
cleanup()
{
         emit(colId,(min,max))
}

In the reducer we again need to find Min and Max, this time over the partial results of each map task
reducer()
{
        emit(colId,(min,max))
}

Now the Reducer needs to process only one (min, max) pair per column from each map task instead of every value. This way we reduce the load on the reducer.
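
The map-side part of this idea can be sketched in plain Java. ColumnMinMax is a hypothetical helper, not code from the program above: a mapper would call accept() from map() for every line and read the results back in cleanup(). How the (min, max) pair is written out is left open here. One option, assumed rather than taken from the original, is to emit min and max as two DoubleWritable values under the same column id, in which case ColReducer would still produce the right answer unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that tracks the running min and max of every CSV column.
class ColumnMinMax {

    // stats.get(i) holds {min, max} for column i (0-based)
    private final List<double[]> stats = new ArrayList<>();

    /** Call once per input line, i.e. from map(). Nothing is emitted here. */
    void accept(String csvLine) {
        String[] cols = csvLine.split(",");
        for (int i = 0; i < cols.length; i++) {
            double v = Double.parseDouble(cols[i].trim());
            if (i == stats.size()) {
                stats.add(new double[] {v, v}); // first value seen for this column
            } else {
                double[] mm = stats.get(i);
                mm[0] = Math.min(mm[0], v);
                mm[1] = Math.max(mm[1], v);
            }
        }
    }

    int columns() { return stats.size(); }

    double min(int col) { return stats.get(col)[0]; }

    double max(int col) { return stats.get(col)[1]; }

    public static void main(String[] args) {
        ColumnMinMax partial = new ColumnMinMax();
        for (String line : new String[] {"4,-1.5", "2,8", "9,0"}) {
            partial.accept(line);
        }
        // What cleanup() would emit: one (colId, (min, max)) pair per column.
        for (int c = 0; c < partial.columns(); c++) {
            System.out.println((c + 1) + "\t" + partial.min(c) + "\t" + partial.max(c));
        }
    }
}
```

The memory needed is two doubles per column, no matter how many lines the map task reads.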

Happy Hadooping.