Wednesday, 30 April 2014

How To Create Tables In HIVE


Hive provides data warehousing facilities on top of an existing Hadoop cluster, along with an SQL-like query interface.

You can create tables in two different ways.

1. Create External table 

CREATE EXTERNAL TABLE students
(id INT, name STRING, batch STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' -- supply delimiter
LOCATION '/user/hdfs/students'; 
For external tables, Hive does not move the data into its warehouse directory. If the external table is dropped, only the table metadata is deleted; the data itself is left untouched.

2. Create Normal Table 
CREATE TABLE students
(id INT, name STRING, batch STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' -- supply delimiter
LOCATION '/user/hdfs/students';
For normal (managed) tables, Hive moves the data into its warehouse directory. If the table is dropped, both the table metadata and the data are deleted.
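
Once a table exists, data can be loaded into it. A minimal sketch with hypothetical file paths (LOAD DATA moves the file from HDFS into the table's location; the LOCAL variant copies it from the local file system):

LOAD DATA INPATH '/user/hdfs/raw/students.tsv' INTO TABLE students;
-- or, copying from the local file system:
LOAD DATA LOCAL INPATH '/home/unmesha/students.tsv' INTO TABLE students;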

Hadoop Installation For Beginners - Pseudo Distributed Mode (Single Node Cluster)


Hadoop is an open-source software framework capable of storing and processing very large amounts of data. The underlying technology was invented by Google in its early days. Hadoop began as part of the open-source Nutch search engine project and was later spun out of Nutch into its own project.

Hadoop consists of two core components:
1. HDFS for storage
2. MapReduce for processing data in HDFS

Hadoop can be installed in 3 different modes

1. Standalone Mode

Hadoop is configured to run in a non-distributed mode, as a single Java process. This is useful for debugging.
The following example copies the unpacked conf directory to use as input and then finds and displays every match of the given regular expression. Output is written to the given output directory.
   $ mkdir input
   $ cp conf/*.xml input
   $ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
   $ cat output/*

2. Pseudo Distributed Mode or Single Node Cluster

Hadoop can also be run on a single node in pseudo-distributed mode, where each Hadoop daemon runs in a separate Java process.

3. Multi Node Cluster

Fully distributed mode, ranging from a few nodes up to extremely large clusters with thousands of nodes.

The steps below explain how to install Hadoop in pseudo-distributed mode.


Prerequisite

1. Java (Latest Version)

> sudo add-apt-repository ppa:webupd8team/java
> sudo apt-get update
> sudo apt-get install oracle-java7-installer

2. SSH

> apt-get install ssh
> ssh localhost
[sudo]password:
Welcome to Ubuntu 12.04 LTS (GNU/Linux 3.2.0-23-generic x86_64)
 * Documentation:  https://help.ubuntu.com/
Last login: Tue Apr 29 17:48:55 2014 from amma-hp-probook-4520s.local


Configuring Passwordless SSH

In pseudo-distributed mode, we have to start daemons, and to do that, we need to have SSH installed. Hadoop doesn’t actually distinguish between pseudo-distributed and fully distributed modes: it merely starts daemons on the set of hosts in the cluster (defined by the slaves file) by SSH-ing to each host and starting a daemon process. Pseudo-distributed mode is just a special case of fully distributed mode in which the (single) host is localhost, so we need to make sure that we can SSH to localhost and log in without having to enter a password.

If you cannot ssh to localhost without a passphrase, execute the following commands:

unmesha@unmesha-hadoop-virtual-machine:~$ ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/home/unmesha/.ssh/id_rsa): [press enter]
Enter passphrase (empty for no passphrase): [press enter]
Enter same passphrase again: [press enter]
Your identification has been saved in /home/unmesha/.ssh/id_rsa.
Your public key has been saved in /home/unmesha/.ssh/id_rsa.pub.
The key fingerprint is:
61:c5:33:9f:53:1e:4a:5f:e9:4d:19:87:55:46:d3:6b unmesha@unmesha-virtual-machine
The key's randomart image is:
+--[ RSA 2048]----+
|         ..    *%|
|         .+ . ++*|
|        o  = *.+o|
|       . .  = oE.|
|        S    ..  |
|                 |
|                 |
|                 |
|                 |
+-----------------+

unmesha@unmesha-hadoop-virtual-machine:~$ ssh-copy-id localhost
unmesha@localhost's password: 
Now try logging into the machine, with "ssh 'localhost'", and check in:

  ~/.ssh/authorized_keys

to make sure we haven't added extra keys that you weren't expecting.

Now you will be able to SSH to localhost without a password:

unmesha@unmesha-hadoop-virtual-machine:~$ ssh localhost
Welcome to Ubuntu 12.04 LTS (GNU/Linux 3.2.0-23-generic x86_64)

 * Documentation:  https://help.ubuntu.com/

Last login: Tue Apr 29 17:48:55 2014 from amma-hp-probook-4520s.local
unmesha@unmesha-virtual-machine:~$ 


Setting JAVA_HOME


Before running Hadoop, we need to tell it where Java is located on the system. If the JAVA_HOME environment variable already points to a suitable Java installation, that will be used and nothing further needs to be configured. Otherwise, you can set the Java installation that Hadoop uses by editing a configuration file (hadoop-env.sh, see below) and specifying the JAVA_HOME variable.

unmesha@unmesha-hadoop-virtual-machine:~$ java -version
java version "1.7.0_55"
Java(TM) SE Runtime Environment (build 1.7.0_55-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode)
Check the current location of Java:
unmesha@unmesha-hadoop-virtual-machine:~$ sudo update-alternatives --config java
[sudo] password for unmesha: 
There is only one alternative in link group java: /usr/lib/jvm/java-7-oracle/jre/bin/java
Nothing to configure.

If only one alternative is installed, the output looks like the above; otherwise the command lists all alternatives, with a * marking the currently selected installation.
Next, copy the path up to (but not including) /jre/bin and set it in ~/.bashrc:
unmesha@unmesha-hadoop-virtual-machine:~$ vi ~/.bashrc
Note: If vi is not available, install it first:
sudo apt-get install vim

Then add the following line at the end of ~/.bashrc to set JAVA_HOME:

export JAVA_HOME=/usr/lib/jvm/java-7-oracle

Open a new terminal or reload the profile:

unmesha@unmesha-hadoop-virtual-machine:~$ source ~/.bashrc 
Check that JAVA_HOME is set by echoing it:
unmesha@unmesha-hadoop-virtual-machine:~$ echo $JAVA_HOME
/usr/lib/jvm/java-7-oracle

Hadoop Installation


Download the latest stable version of Hadoop from the Apache Mirrors.

This walkthrough uses: hadoop-2.3.0.tar.gz


Untarring the file

unmesha@unmesha-hadoop-virtual-machine:~$ tar xvfz hadoop-2.3.0.tar.gz 
unmesha@unmesha-hadoop-virtual-machine:~$ cd hadoop-2.3.0/
unmesha@unmesha-hadoop-virtual-machine:~/hadoop-2.3.0$ ls
bin  include  libexec      NOTICE.txt  sbin
etc  lib      LICENSE.txt  README.txt  share

Move hadoop-2.3.0 to /usr/local/hadoop
unmesha@unmesha-hadoop-virtual-machine:~$ sudo mv hadoop-2.3.0 /usr/local/hadoop

Add the following lines to ~/.bashrc:
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
unmesha@unmesha-hadoop-virtual-machine:~$ source ~/.bashrc 

Configuration

We need to configure 5 files:

1. core-site.xml  
2. mapred-site.xml
3. hdfs-site.xml
4. hadoop-env.sh
5. yarn-site.xml

1. core-site.xml

unmesha@unmesha-hadoop-virtual-machine:~$ vi /usr/local/hadoop/etc/hadoop/core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
   <name>fs.default.name</name>
   <value>hdfs://localhost:9000</value>
</property>
</configuration>

2. mapred-site.xml

By default, the /usr/local/hadoop/etc/hadoop/ folder contains a /usr/local/hadoop/etc/hadoop/mapred-site.xml.template file, which has to be copied/renamed to mapred-site.xml. This file is used to specify which framework is used for MapReduce.

unmesha@unmesha-hadoop-virtual-machine:~/$ cp /usr/local/hadoop/etc/hadoop/mapred-site.xml.template /usr/local/hadoop/etc/hadoop/mapred-site.xml
unmesha@unmesha-hadoop-virtual-machine:~$ vi /usr/local/hadoop/etc/hadoop/mapred-site.xml
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
   <name>mapreduce.framework.name</name>
   <value>yarn</value>
</property>
 </configuration>

Create two folders for the namenode and datanode storage (make sure they end up owned by the user that runs Hadoop, not by root; if you have to create them with sudo, chown them back to your user afterwards):


mkdir -p /usr/local/hadoop_store/hdfs/namenode
mkdir -p /usr/local/hadoop_store/hdfs/datanode

3. hdfs-site.xml

unmesha@unmesha-hadoop-virtual-machine:~/$ vi /usr/local/hadoop/etc/hadoop/hdfs-site.xml
<?xml version="1.0"?>
<?xml-stylesheet href="configuration.xsl"?>
 <configuration>
 <property>
   <name>dfs.replication</name>
   <value>1</value>
 </property>
 <property>
   <name>dfs.namenode.name.dir</name>
   <value>file:/usr/local/hadoop_store/hdfs/namenode</value>
 </property>
 <property>
   <name>dfs.datanode.data.dir</name>
   <value>file:/usr/local/hadoop_store/hdfs/datanode</value>
 </property>
</configuration>

4. hadoop-env.sh

unmesha@unmesha-hadoop-virtual-machine:~$ vi /usr/local/hadoop/etc/hadoop/hadoop-env.sh

# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.
export JAVA_HOME=/usr/lib/jvm/java-7-oracle

5. yarn-site.xml


unmesha@unmesha-hadoop-virtual-machine:~/$ vi /usr/local/hadoop/etc/hadoop/yarn-site.xml
<?xml version="1.0"?>
<?xml-stylesheet href="configuration.xsl"?>
 <configuration>
 <property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
 </property>
 <property>
   <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
   <value>org.apache.hadoop.mapred.ShuffleHandler</value>
 </property>
</configuration>

Now format the namenode (this is done only once):

unmesha@unmesha-hadoop-virtual-machine:~/$hdfs namenode -format

You will see something like this:

......14/04/30 12:37:42 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
14/04/30 12:37:42 INFO util.ExitUtil: Exiting with status 0
14/04/30 12:37:42 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at unmesha-virtual-machine/127.0.1.1
************************************************************/

Now we will start all the daemons:

unmesha@unmesha-hadoop-virtual-machine:~/$start-dfs.sh
unmesha@unmesha-hadoop-virtual-machine:~/$start-yarn.sh
To check which daemons are running, type "jps":
unmesha@unmesha-hadoop-virtual-machine:~/$jps
2243 NodeManager
2314 ResourceManager
1923 DataNode
2895 SecondaryNameNode
1234 Jps
1788 NameNode

In HDFS there are two locations you will typically work with.

1. The user's HDFS home directory (/user/<your username>)

 (Optional)

 Create the user's home directory in HDFS (try the sudo -u hdfs form, or the plain hadoop fs form):

sudo -u hdfs hadoop fs -mkdir /user/<your username>
sudo -u hdfs hadoop fs -chown <your username> /user/<your username>
  OR
hadoop fs -mkdir /user/<your username>
hadoop fs -chown <your username> /user/<your username>

2. The HDFS root directory (/)

hadoop fs -ls /

You can put your files in either location.

To put files into your HDFS home directory, simply omit the destination path (it automatically points to the user's HDFS home directory):



unmesha@unmesha-hadoop-virtual-machine:~/$hadoop fs -put mydata 
unmesha@unmesha-hadoop-virtual-machine:~/$hadoop fs -ls 

Let's run an example

Every programming language has a "Hello World" program.

Similarly, Hadoop has its own "Hello World" program, known as WordCount. Hadoop jobs basically run with two paths:


1. one directory or file as the input

2. another, non-existing directory as the output path. Hadoop creates the output directory automatically.


So for the WordCount program the input is a text file, and the output folder will contain the word count for that file. Copy some paragraphs of text from anywhere, save them into a file, and place that file in a folder.

unmesha@unmesha-hadoop-virtual-machine:~/$cd
unmesha@unmesha-hadoop-virtual-machine:~/$mkdir mydata
unmesha@unmesha-hadoop-virtual-machine:~/$cd mydata
unmesha@unmesha-hadoop-virtual-machine:~/mydata$vi input
# Paste into this input file

Now your input folder is ready. To run any Hadoop job we first have to place the input in HDFS, because MapReduce programs read their input from HDFS.
So we need to put mydata into HDFS.

unmesha@unmesha-hadoop-virtual-machine:~/$cd
unmesha@unmesha-hadoop-virtual-machine:~/$hadoop fs -put mydata /

Hadoop shell commands are executed using the "hadoop" command.

The command above copies the local mydata directory into HDFS:
hadoop fs -put local/path hdfs/path

Note: If copying the directory to HDFS fails with a permission error, copy or move the mydata directory to /tmp first:

unmesha@unmesha-hadoop-virtual-machine:~/$mv mydata /tmp

Then try the copy again using the new location as the source.

Now we will run the wordcount program from hadoop-mapreduce-examples-2.3.0.jar, which contains several example programs.

Any MapReduce program we write is packed into a jar, and the jar is then submitted to the cluster as a job.

Basic command to run MapReduce Jobs

hadoop jar jarname.jar MainClass indir outdir

Run wordcount example

unmesha@unmesha-hadoop-virtual-machine:~/$cd
unmesha@unmesha-hadoop-virtual-machine:~/$hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.3.0.jar wordcount /mydata /output1

After the job finishes, look inside /output1 to view the result:

unmesha@unmesha-hadoop-virtual-machine:~/$hadoop fs -ls -R /output1  
unmesha@unmesha-hadoop-virtual-machine:~/$hadoop fs -cat /output1/part-r-00000    # This shows the wordcount result

For any job, the result is stored in part files inside the output directory.
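
If you want the result as a single local file, the part files can be merged while copying them out of HDFS (the local file name here is arbitrary):

unmesha@unmesha-hadoop-virtual-machine:~/$hadoop fs -getmerge /output1 wordcount_result.txt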

Hadoop Web Interfaces

Hadoop comes with several web interfaces which are available by default.
http://localhost:50070/ – web UI of the NameNode daemon
http://localhost:8088/ – web UI of the YARN ResourceManager (running and finished applications)
http://localhost:8042/ – web UI of the YARN NodeManager

You can also track a running job using the URL printed to the console when the job starts.


14/04/30 12:57:11 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1398885280814_0002
14/04/30 12:57:19 INFO impl.YarnClientImpl: Submitted application application_1398885280814_0002
14/04/30 12:57:21 INFO mapreduce.Job: The url to track the job: http://ubuntu:8088/proxy/application_1398885280814_0002/
14/04/30 12:57:21 INFO mapreduce.Job: Running job: job_1398885280814_0002

Killing a Job

unmesha@unmesha-hadoop-virtual-machine:~/$cd
unmesha@unmesha-hadoop-virtual-machine:~/$hadoop job -list
job_1398885280814_0002
unmesha@unmesha-hadoop-virtual-machine:~/$hadoop job -kill job_1398885280814_0002
14/04/30 14:02:54 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/04/30 14:03:06 INFO impl.YarnClientImpl: Killed application application_1398885280814_0002
Killed job job_1398885280814_0002

To stop the single node cluster (stop-all.sh is deprecated in Hadoop 2 but still works; you can also run stop-dfs.sh and stop-yarn.sh separately):


unmesha@unmesha-hadoop-virtual-machine:~/$stop-all.sh

Hadoop can also be installed using Cloudera's distribution, with fewer steps and in an easier way. The difference is that Cloudera packages Apache Hadoop and several ecosystem projects into one bundle, with the configuration already set to localhost, so we do not need to edit the configuration files ourselves.

Installation using Cloudera Package.


Happy Hadooping ...


Sunday, 27 April 2014

Can we change the default key-value output separator in Hadoop MapReduce


Yes. We can change it by setting the "mapred.textoutputformat.separator" property in the driver class, provided TextOutputFormat is used as the output format. The default separator is "\t" (tab). A complete driver sketch follows the examples below.


Change to ","
Configuration conf = getConf();
conf.set("mapred.textoutputformat.separator", ","); 

Change to ";"
Configuration conf = getConf();
conf.set("mapred.textoutputformat.separator", ";"); 

Change to ":"
Configuration conf = getConf();
conf.set("mapred.textoutputformat.separator", ":"); 

Happy Hadooping ...

Wednesday, 23 April 2014

Hadoop WordCount Example In Detail


Every programming language has a "Hello World" program. Likewise, Hadoop also has a "Hello World" program - the WordCount example.


/*
 * import Statements
 */
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 *
 */
public class WordCount {
 /*
  * Map class extends Mapper Base Class 
  * Four arguments 
  * key/Value input and
  * key Value Output Types 
  * Key Input: LongWritable (Line offset of input file) 
  * Value Input: Text (Each line in a file)
  * 
  * Key Output : Text (Each word in a file) 
  * Value Output : IntWritable (1)
  * 
  * Input Line: qwerty the rose the 
  * Input Key/Value : 234 qwerty the rose the
  * Output key/Value : qwerty 1 the 1 rose 1 the 1
  */
 public static class Map extends
   Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   /*
    * Getting each value(each line of a file) in line variable. Using
    * stringTokenizer splits each word in a line and emit each word as
    * key and 1 as value
    */
   String line = value.toString();
   // line = "qwerty the rose the"
   StringTokenizer tokenizer = new StringTokenizer(line);
   while (tokenizer.hasMoreTokens()) {
    word.set(tokenizer.nextToken());
    context.write(word, one);
    /*
     * qwerty 1 
     * the 1 
     * rose 1 
     * the 1
     */
   }
  }
 }

 /*
  * In between Shuffle and sort takes place. After each map() there will be a
  * shuffle and sort phase. Shuffle aggregates all the unique keys and
  * convert those values into a single list 
  * eg: if one map() emits 
  * qwerty 1
  * the 1 
  * rose 1 
  * the 1 
  * 
  * Then after shuffle output will be 
  * qwerty,[1] 
  * the,[1,1]
  * rose,[1]
  * 
  * and sorting is done after the completion of each Map() So the input to
  * Reducer will be unique key with list of values 
  * qwerty,[1] 
  * rose,[1]
  * the,[1,1]
  */
 public static class Reduce extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  /*
   * Reducer need to extend the Reducer Base class 
   * Four arguments
   * key/Value input and key Value Output Types 
   * Key Input: Text (unique key from mapper)
   * Value Input: IntWritable (List of values)
   *  
   * Key Output: Text (each unique word) 
   * Value Output: IntWritable (count of each word)
   * 
   * Input key/Value : 
   * qwerty,[1] 
   * rose,[1] 
   * the,[1,1] 
   * 
   * Output Key/value :
   * qwerty,1 
   * rose,1 
   * the,2
   */
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   /*
    * Text key :unique word and Iterable<IntWritable> values will be
    * list of values the,[1,1] key the Iterable Value [1,1]
    */
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   context.write(key, new IntWritable(sum));
   /*
    * qwerty,1 
    * rose,1 
    * the,2
    */
  }
 }

 /*
  * main or driver class which contains all the configuration to set up a
  * mapreduce job
  */
 public static void main(String[] args) throws Exception {

  /*
   * creating a configuration object
   */
  Configuration conf = new Configuration();
  Job job = new Job(conf, "wordcount");
  job.setJarByClass(WordCount.class);

  /*
   * what are the values of key/value output type from mapper
   */
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  /*
   * what are the values of key/value output type from Reducer
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  /*
   * specify Mapper class and Reducer class
   */
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  /*
   * Setting input format default is TextInputFormat each line terminated
   * with '\n'
   */
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  /*
   * Setting Input Directory and output Directory Output directory should
   * be a non existing one
   */
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  /*
   * waits for the completion of the job
   */
  job.waitForCompletion(true);
 }

}
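
To compile and run it yourself, one possible sequence (a sketch; the jar name and the output directory /wc_out are arbitrary, and /mydata is the input directory put into HDFS in the installation post) is:

javac -classpath "$(hadoop classpath)" WordCount.java
jar cf wordcount.jar WordCount*.class
hadoop jar wordcount.jar WordCount /mydata /wc_out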

Happy Hadooping . . .

Tuesday, 22 April 2014

Chaining Jobs in Hadoop MapReduce


There are cases where we need to write more than one MapReduce Job.
Map1--Reduce1--Map2--Reduce2
How do we manage the jobs so that they are executed in order? There are several approaches; here is an approach that chains the jobs easily by configuring them one after the other in a single driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Unmesha SreeVeni U.B
 * 
 */
public class ChainJobs extends Configured implements Tool {

 private static final String OUTPUT_PATH = "intermediate_output";

 @Override
 public int run(String[] args) throws Exception {
  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  job.waitForCompletion(true);

  /*
   * Job 2
   */
  
  Job job2 = new Job(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
 }

 /**
  * Method Name: main Return type: none Purpose:Read the arguments from
  * command line and run the Job till completion
  * 
  */
 public static void main(String[] args) throws Exception {
  // TODO Auto-generated method stub
  if (args.length != 2) {
   System.err.println("Enter valid number of arguments <Inputdirectory>  <Outputlocation>");
   System.exit(0);
  }
  ToolRunner.run(new Configuration(), new ChainJobs(), args);
 }
}

The above code configures two jobs, job (Job1) and job2 (Job 2).
private static final String OUTPUT_PATH = "intermediate_output";
String "OUTPUT_PATH" is used to write the output for first job.
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
So for the first job the input is args[0] and the output is new Path(OUTPUT_PATH).

First Job Configuration


  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  job.waitForCompletion(true);

Once the first job has finished successfully, "OUTPUT_PATH" serves as the input to the second job, and the output of job2 is written to args[1]. (An optional cleanup of this intermediate directory is sketched after the second job's configuration below.)
TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
TextOutputFormat.setOutputPath(job2, new Path(args[1]));

Second Job Configuration

  /*
   * Job 2
   */
 
  Job job2 = new Job(conf, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
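
One optional refinement (a sketch, not part of the original code): the run() method already creates a FileSystem object fs that is otherwise unused, and it can be used to delete the intermediate directory once the second job finishes, so the chain can be re-run without failing because intermediate_output already exists. The end of run() would then look like this:

  boolean success = job2.waitForCompletion(true);
  // Clean up the intermediate directory produced by Job 1
  if (fs.exists(new Path(OUTPUT_PATH))) {
   fs.delete(new Path(OUTPUT_PATH), true);
  }
  return success ? 0 : 1;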

Happy Hadooping . . .

Aggregations In Hadoop MapReduce


Aggregation functions are sum, min, max, count, etc. These aggregations are very useful in statistics, and when they have to be computed over large data sets they can be implemented in Hadoop MapReduce as well.
Below is the code for finding the min and max of each column of a CSV file in MapReduce.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 */
public class ColumnAggregator {

 public static class ColMapper extends
   Mapper<Object, Text, Text, DoubleWritable> {
  /*
   * Emits column Id as key and entire column elements as Values
   */
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String[] cols = value.toString().split(",");
   for (int i = 0; i < cols.length; i++) { 
    context.write(new Text(String.valueOf(i + 1)),new DoubleWritable(Double.parseDouble(cols[i])));
   }

  }
 }

 public static class ColReducer extends
   Reducer<Text, DoubleWritable, Text, DoubleWritable> {
  /*
   * Reducer finds min and max of each column
   */

  public void reduce(Text key, Iterable<DoubleWritable> values,
    Context context) throws IOException, InterruptedException {
   double min = Double.MAX_VALUE, max = -Double.MAX_VALUE; // start from the extremes so negative values are handled too
   Iterator<DoubleWritable> iterator = values.iterator(); //Iterating 
   while (iterator.hasNext()) {
    double value = iterator.next().get();
    if (value < min) { //Finding min value
     min = value;
    }
    if (value > max) { //Finding max value
     max = value;
    }
   }
   context.write(new Text(key), new DoubleWritable(min));
   context.write(new Text(key), new DoubleWritable(max));
  }
 }
 public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();

  Job job = new Job(conf, "Min and Max");
  job.setJarByClass(ColumnAggregator.class);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(args[1]))) {
   fs.delete(new Path(args[1]), true);
  }
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);

  job.setMapperClass(ColMapper.class);
  job.setReducerClass(ColReducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Explanation


For any MapReduce program there are 3 classes

1.Driver Class for Configuration

2.Mapper

3.Reducer


Mapper:


Map receives the file offset and each line as a key-value pair. For every line, map() generates an id for each column and emits the id together with that column's value to the reducer.


Reducer:


The reducer receives each column id with its list of values, finds the min and max for each key, and emits the column id as the key with the min and max as values.

If only one reducer is used, that single reducer carries all the load of finding min and max. A better idea is to do part of the work on the map side, using the setup() and cleanup() methods.

 

setup() executes once before any map() call, and

cleanup() executes once after all map() calls have finished.



It is better to track the min and max inside map() and emit them only once, from cleanup():

map()
{
         /* update running min and max for each column; no emit */
}
cleanup()
{
         emit(colId, (min, max))
}

Again, in the reducer we find the overall min and max:
reducer()
{
        emit(colId,(min,max))
}

Now the reducer only has to combine a few candidate min and max values (one pair per column per mapper), which reduces the load placed on the reducer.
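
Below is a minimal sketch of that map-side idea: an alternative mapper (the name MinMaxMapper is hypothetical) that can be dropped into the ColumnAggregator class above and selected with job.setMapperClass(MinMaxMapper.class); it reuses the same imports and the same ColReducer.

 public static class MinMaxMapper extends
   Mapper<Object, Text, Text, DoubleWritable> {
  // Running min and max per column index, updated for every input line
  private java.util.Map<Integer, Double> mins = new java.util.HashMap<Integer, Double>();
  private java.util.Map<Integer, Double> maxs = new java.util.HashMap<Integer, Double>();

  public void map(Object key, Text value, Context context) {
   String[] cols = value.toString().split(",");
   for (int i = 0; i < cols.length; i++) {
    double v = Double.parseDouble(cols[i]);
    Double curMin = mins.get(i);
    Double curMax = maxs.get(i);
    if (curMin == null || v < curMin) {
     mins.put(i, v);
    }
    if (curMax == null || v > curMax) {
     maxs.put(i, v);
    }
   }
  }

  protected void cleanup(Context context) throws IOException,
    InterruptedException {
   // Emit one candidate min and one candidate max per column,
   // after all map() calls of this task are done
   for (java.util.Map.Entry<Integer, Double> e : mins.entrySet()) {
    context.write(new Text(String.valueOf(e.getKey() + 1)),
      new DoubleWritable(e.getValue()));
   }
   for (java.util.Map.Entry<Integer, Double> e : maxs.entrySet()) {
    context.write(new Text(String.valueOf(e.getKey() + 1)),
      new DoubleWritable(e.getValue()));
   }
  }
 }

The existing ColReducer then only has to merge these few candidates per column into the final min and max.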

Happy Hadooping.

Monday, 21 April 2014

Code For Deleting Output Folder If Exist In Hadoop MapReduce Jobs




Most Hadoop MapReduce jobs operate with two arguments:
an input directory and an output directory.

Each time we run a MapReduce job we have to supply a non-existing folder as the output path. While doing trial-and-error runs of an MR job, it is convenient if the driver automatically deletes the output folder when it already exists.

Here is the code for that. It goes in the driver, after the Configuration is created and before the output path is set on the job:
/*Provides access to configuration parameters*/
Configuration conf = new Configuration();
/*Creating Filesystem object with the configuration*/
FileSystem fs = FileSystem.get(conf);
/*Check if output path (args[1])exist or not*/
if(fs.exists(new Path(args[1]))){
   /*If exist delete the output path*/
   fs.delete(new Path(args[1]),true);
}

[SOLVED] java.lang.Exception: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text




It is quite common for Hadoop beginners to see a ClassCastException like the following in MR jobs.

java.lang.Exception: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
 at java.lang.Class.asSubclass(Class.java:3037)
 at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:819)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:836)
 at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:376)
 at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:85)
 at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:584)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:656)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
 at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
 at java.util.concurrent.FutureTask.run(FutureTask.java:166)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)

Looking through the MR code you will not find any obvious error, yet the job fails with a ClassCastException.

When Eclipse detects a problem in your code, it displays an error or warning icon along the left edge, known as the gutter. Hovering over the icon pops up a description of the problem together with suggested fixes; accepting the wrong suggestion imports the wrong statement, and the job then runs with it.





It is because the wrong import statement was picked up by mistake: the key/value type Text resolved to Jersey's XMLJAXBElementProvider.Text instead of Hadoop's Text class.

The correct import statement is org.apache.hadoop.io.Text. With the import corrected, the MR job executes with no exception.
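
For reference, the offending and the correct import side by side:

// Wrong: accepted from the quick-fix suggestions by mistake
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
// Correct: the Hadoop Writable type used for keys and values
import org.apache.hadoop.io.Text;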

Happy Hadooping 





How to get the IP address from a weblog record

Example input :

  96.7.4.14 - - [24/Apr/2011:04:20:11 -0400] "GET /cat.jpg HTTP/1.1" 200 12433

String[] fields = value.toString().split(" ");
if (fields.length > 0) {
 String ip = fields[0];
}

 String variable "ip" gives IP Address

Tuesday, 8 April 2014

[SOLVED] org.apache.hadoop.hdfs.server.namenode.SafeModeException:Name node is in safe mode - How to leave


The namenode daemon in the Hadoop framework enters safe mode in unusual situations, for example when the disk is full, and also during the start-up phase. You may then see something like this:



org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot create directory /user/root/t. Name node is in safe mode


So the namenode needs to leave safe mode. Running the command below makes it leave safe mode:


bin/hadoop dfsadmin -safemode leave

After running the above command, run hadoop fsck so that any inconsistencies that crept into HDFS can be sorted out.

Use the hdfs command instead of the hadoop command on newer distributions:

hdfs dfsadmin -safemode leave

hadoop dfsadmin has been deprecated; all HDFS-related tasks are being moved to the separate hdfs command.
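
You can also query the current state at any time, before or after leaving safe mode:

hdfs dfsadmin -safemode get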




How To CONCAT Multiple Expressions In Apache Pig


Apache Pig is a platform for analyzing large data sets. Instead of writing a complicated MapReduce job, we can simply write Apache Pig scripts.


CONCAT in Apache Pig can only concatenate two values at a time. Below is example code to CONCAT more than two values by nesting the calls.


Using the pattern below we can concatenate multiple values:


CONCAT(expression1, CONCAT(expression2, expression3));
eg: CONCAT((chararray)$0,CONCAT('Pig',(chararray)$6));
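
As a small worked example (assuming a hypothetical comma-delimited file students.csv), three values are concatenated by nesting the calls:

A = LOAD 'students.csv' USING PigStorage(',');
-- join field 0, a literal '-' and field 1 into one chararray
B = FOREACH A GENERATE CONCAT((chararray)$0, CONCAT('-', (chararray)$1));
DUMP B;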

Custom Parameters To Pig Script


There may be scenarios where we need to write custom Pig scripts that can take arbitrary arguments.

Below is sample code for a custom Pig script.

Sample Pig Script

The "customparam.pig" loads an input with custom argument and generates a single field from the input bag to another bag and stores the new bag to HDFS.


Here the input, the delimiter of the input file, the output and the field to project are all given as custom arguments to the Pig script.
--customparam.pig
--load hdfs/local fs data
original = load '$input' using PigStorage('$delimiter');
--filter a specific field value into another bag 
filtered = foreach original generate $split; 
--storing data into hdfs/local fs
store filtered into '$output'; 

Pig scripts can be run in local mode or in MapReduce mode.


Local Mode

pig -x local -f customparam.pig -param input=Pig.csv -param output=OUT/pig -param delimiter="," -param split='$1'

This is the sample "Pig.csv" file used as the custom input on the command line. The custom delimiter is ",":

Pig1,23.5,Matched
Pig2,6.88,Not Matched
Pig3,6.1,Not Matched

This projects the 2nd column from the original bag into a new bag. Fields in Pig are numbered $0, $1, $2, ..., so to generate the 2nd column the split parameter should be "$1".


After executing the above command, the part file content will be:

23.5
6.88
6.1

If the command is run in MapReduce mode, the part files are stored in HDFS.
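
For comparison, the same script can be submitted in MapReduce mode by replacing -x local with -x mapreduce (or omitting -x entirely) and pointing the parameters at HDFS paths; the paths below are hypothetical:

pig -x mapreduce -f customparam.pig -param input=/user/hdfs/Pig.csv -param output=/user/hdfs/OUT/pig -param delimiter="," -param split='$1'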