Thursday 11 December 2014

Computing Median In Hive

In statistics and probability theory, the median is the numerical value separating the higher half of a data sample, a population, or a probability distribution, from the lower half.

The median is the central point of a data set.

Consider the following data points: 1,4,5,6,7
The Median is "5".

Let's see how to find the median in Hive.

Consider a "test" table.
--------------
| Name | Age |
--------------
| A    | 23  |
| B    | 23  |
| C    | 20  |
--------------
hive> select * from test;
OK
A 23
B 23
C 20
Time taken: 4.219 seconds, Fetched: 3 row(s)

Let's say we want to find the median of the Age column in the "test" table.
Sorted, the ages are 20, 23, 23, so our expected median is "23".

The PERCENTILE(BIGINT col, 0.5) function computes the median in Hive, because the 50th percentile is the median.

Structure of the "test" table:
hive> desc test;      
OK
firstname            string                                   
age                  int                                      
Time taken: 0.32 seconds, Fetched: 2 row(s)

Here we can see that the column whose median we want is an INT. Since the function signature takes a BIGINT, we cast the column to BIGINT.

Let's try out the query:
select percentile(cast(age as BIGINT), 0.5) from test;
Here we cast the age column to BIGINT.
hive> select percentile(cast(age as BIGINT), 0.5) from test;
Query ID = aibladmin_20141211140606_c61cb042-ed14-4048-8270-4cea1eece1c7 
Total jobs = 1 
Launching Job 1 out of 1 
.
.
OK 
23.0 
Time taken: 27.659 seconds, Fetched: 1 row(s)
23.0 is the expected result which is the median for [23,23,20].
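
To make the "50th percentile" idea concrete, here is a minimal plain-Java sketch of an exact percentile with linear interpolation between neighbouring ranks. It is not Hive's implementation; the class name MedianSketch and the method percentile are hypothetical names chosen for illustration, and the interpolation rule is an assumption about how a fractional position is handled.

```java
import java.util.Arrays;

public class MedianSketch {
    // Returns the p-th percentile (0 <= p <= 1) of the values.
    // When the position falls between two ranks, interpolate linearly.
    public static double percentile(long[] values, double p) {
        if (values.length == 0) {
            throw new IllegalArgumentException("empty input");
        }
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        double pos = p * (sorted.length - 1);   // zero-based fractional rank
        int lower = (int) Math.floor(pos);
        int upper = (int) Math.ceil(pos);
        double frac = pos - lower;
        return sorted[lower] + frac * (sorted[upper] - sorted[lower]);
    }

    public static void main(String[] args) {
        // Ages from the "test" table
        System.out.println(percentile(new long[]{23, 23, 20}, 0.5));
        // Data points from the introduction
        System.out.println(percentile(new long[]{1, 4, 5, 6, 7}, 0.5));
    }
}
```

For the ages [23,23,20] the sorted middle element is 23, which matches the result returned by the Hive query above.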

Sunday 7 December 2014

Joining Two Files Using MultipleInput In Hadoop MapReduce - MapSide Join

There are cases where we need to take 2 files as input and join them on a common field such as an id.
Two large data sets can be joined in MapReduce as well. A join performed in the map phase is called a map-side join, while a join performed in the reduce phase is called a reduce-side join.
In this post each file is read by its own mapper through the MultipleInputs class in Hadoop, and the records are joined on their common key in the reducer.

Say I have 2 files: one with EmployeeID, Name, Designation and another with EmployeeID, Salary, Department.

File1.txt
1 Anne,Admin
2 Gokul,Admin
3 Janet,Sales
4 Hari,Admin

AND

File2.txt
1 50000,A
2 50000,B
3 60000,A
4 50000,C

We will join these files into one based on EmployeeID.
The result we aim at is:

1 Anne,Admin,50000,A
2 Gokul,Admin,50000,B
3 Janet,Sales,60000,A
4 Hari,Admin,50000,C

In both File1.txt and File2.txt the first field is the EmployeeID, so that is the common key we join the records on.
We will write 2 mapper classes to process these files.

Processing File1.txt
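// keyEmit and valEmit are Text fields of the mapper class
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
{
 String line=value.toString();
 // File1.txt is tab-delimited: employeeId, then the rest of the record
 String[] words=line.split("\t");
 keyEmit.set(words[0]);
 valEmit.set(words[1]);
 context.write(keyEmit, valEmit);
}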

The mapper above processes File1.txt.
String[] words=line.split("\t");
splits each line at the tab character, so words[0] is the employeeId, which we emit as the key, and the rest of the line is emitted as the value.

eg: 1 Anne,Admin
words[0] = 1
words[1] = Anne,Admin

Alternatively, you can use KeyValueTextInputFormat.class as the InputFormat. With tab-separated lines it hands the mapper the employeeId as the key and the rest of the line as the value,
so you don't need to split the line yourself.

Processing File2.txt
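// keyEmit and valEmit are Text fields of the mapper class
public void map(LongWritable k, Text v, Context context) throws IOException, InterruptedException
{
 String line=v.toString();
 // File2.txt is space-delimited: employeeId, then the rest of the record
 String[] words=line.split(" ");
 keyEmit.set(words[0]);
 valEmit.set(words[1]);
 context.write(keyEmit, valEmit);
}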

The mapper above processes File2.txt.

eg: 1 50000,A
words[0] = 1
words[1] = 50000,A

If both files use the same delimiter and the ID comes first, you can reuse the same mapper class for both.

Let's write a common reducer to join the data by key.
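public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
 // Concatenate every value that shares this employeeId, comma-separated.
 // The order in which the values arrive from the two mappers is not guaranteed.
 StringBuilder merge = new StringBuilder();
 for(Text value:values)
 {
  if(merge.length() > 0){
   merge.append(",");
  }
  merge.append(value.toString());
 }
 valEmit.set(merge.toString());
 context.write(key, valEmit);
}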

Here we take the value emitted by each mapper for the same employeeId and append it to merge.
Then we emit the employeeId as the key and merge as the value.
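
The map, group-by-key and reduce steps above can be sketched without a cluster. The following plain-Java simulation uses no Hadoop classes; JoinSketch and its methods are hypothetical names, and a TreeMap stands in for the shuffle that groups values by key. Unlike a real job, it always processes the first file before the second, so the value order here is fixed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class JoinSketch {
    // Stand-in for a mapper: split the line at the delimiter and
    // group the remainder under the employeeId.
    static void map(String line, String delimiter, Map<String, List<String>> shuffle) {
        String[] words = line.split(delimiter, 2);
        shuffle.computeIfAbsent(words[0], k -> new ArrayList<>()).add(words[1]);
    }

    // Stand-in for the reducer: join all values of one key with commas.
    static String reduce(String key, List<String> values) {
        return key + "\t" + String.join(",", values);
    }

    public static List<String> join(List<String> file1, List<String> file2) {
        Map<String, List<String>> shuffle = new TreeMap<>();
        for (String line : file1) map(line, "\t", shuffle); // tab-delimited input
        for (String line : file2) map(line, " ", shuffle);  // space-delimited input
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : shuffle.entrySet()) {
            out.add(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> file1 = Arrays.asList("1\tAnne,Admin", "2\tGokul,Admin");
        List<String> file2 = Arrays.asList("1 50000,A", "2 50000,B");
        join(file1, file2).forEach(System.out::println);
    }
}
```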

Now we need to set up our Driver class to take 2 inputs and register each of them with MultipleInputs.


public int run(String[] args) throws Exception {
 Configuration c=new Configuration();
 // Remaining args: the two input files, then the path used as output directory
 String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
 Path p1=new Path(files[0]);
 Path p2=new Path(files[1]);
 Path p3=new Path(files[2]);
 // Delete p3 if it is left over from an earlier run
 FileSystem fs = FileSystem.get(c);
 if(fs.exists(p3)){
  fs.delete(p3, true);
 }
 Job job = new Job(c,"Multiple Job");
 job.setJarByClass(MultipleFiles.class);
 // Each input path gets its own InputFormat and Mapper class
 MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
 MultipleInputs.addInputPath(job, p2, TextInputFormat.class, MultipleMap2.class);
 job.setReducerClass(MultipleReducer.class);
 .
 .
}

MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
MultipleInputs.addInputPath(job, p2, TextInputFormat.class, MultipleMap2.class);
p1 and p2 are the Path variables holding the 2 input files.
You can find the code on GitHub.

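There is one more case to handle: keeping the joined values in a fixed sequence. The reducer receives the values for a key in no guaranteed order, so the salary fields may end up before the name fields.
Say we always need the output as below: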
1 Anne,Admin,50000,A
2 Gokul,Admin,50000,B
3 Janet,Sales,60000,A
4 Hari,Admin,50000,C
In order to achieve this we can make use of the TextPair Writable concept in Hadoop.
You can find the working code on GitHub. Thanks to one of my blog readers, Ravi Kumar, who worked out how to keep the sequence in order.
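
The idea behind a pair-style key can be sketched in plain Java: each record carries a source tag next to the employeeId, and sorting on (id, tag) makes the File1 value come before the File2 value. This is only an illustration of the ordering idea, not the code from the repository; TaggedJoinSketch, Tagged and the tag values 0 and 1 are hypothetical, and no Hadoop classes are used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TaggedJoinSketch {
    // One mapper output: join key, source tag (0 = File1, 1 = File2), payload.
    public static final class Tagged {
        final String id;
        final int tag;
        final String value;

        public Tagged(String id, int tag, String value) {
            this.id = id;
            this.tag = tag;
            this.value = value;
        }
    }

    public static List<String> join(List<Tagged> records) {
        // Sort by id first, then by tag, so File1 values precede File2 values.
        List<Tagged> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparing((Tagged t) -> t.id)
                              .thenComparingInt(t -> t.tag));

        List<String> out = new ArrayList<>();
        String currentId = null;
        StringBuilder merged = new StringBuilder();
        for (Tagged t : sorted) {
            if (!t.id.equals(currentId)) {
                // A new key starts: flush the previous joined line, if any.
                if (currentId != null) out.add(currentId + "\t" + merged);
                currentId = t.id;
                merged.setLength(0);
            } else {
                merged.append(",");
            }
            merged.append(t.value);
        }
        if (currentId != null) out.add(currentId + "\t" + merged);
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> records = new ArrayList<>();
        records.add(new Tagged("1", 1, "50000,A"));   // File2 record arrives first
        records.add(new Tagged("1", 0, "Anne,Admin"));
        join(records).forEach(System.out::println);
    }
}
```

Even though the salary record is added first, the name fields come out first because the tag decides the order within each id.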

Tuesday 2 December 2014

Hive Bucketed Tables


In the previous post we saw how to create partitioned tables in Hive.

Let's see how to create buckets in a Hive table.


The main difference between Hive partitioning and bucketing is that partitioning creates a partition for each unique value of the column, which can leave us with a lot of tiny partitions. With bucketing, you limit the count to a number you choose and the data is decomposed into that many buckets. In Hive a partition is a directory, but a bucket is a file.



In Hive, bucketing is not enforced by default. You have to set the following variable to enable it:
set hive.enforce.bucketing=true;


1. Creating a staging table to store your data

create external table stagingtbl (EmployeeID Int,FirstName String,Designation String,Salary Int,Department String) row format delimited fields terminated by "," location '/user/aibladmin/Hive'; 

2. Create bucketed table

create table emp_bucket (EmployeeID Int,FirstName String,Designation String,Salary Int,Department String) clustered by (department) into 3 buckets row format delimited fields terminated by ",";

3. Load data from stagingtbl to bucketed table

from stagingtbl insert into table emp_bucket 
       select employeeid,firstname,designation,salary,department;


4. Check how many data files have been created in the Hive warehouse.


Let's check the table content in the Hive warehouse.




We can find 3 files in the warehouse directory, one per bucket. All rows of one department go to the same bucket, and here departments A, B and C each ended up in their own file.
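
As a rough illustration of how a value ends up in a bucket, the sketch below takes a non-negative hash of the clustering column modulo the number of buckets. It uses Java's String.hashCode as a stand-in hash, which is an assumption and may not match the hash Hive applies in every version; BucketSketch and bucketFor are hypothetical names.

```java
public class BucketSketch {
    // Map a value to a bucket index in the range 0..numBuckets-1.
    // Masking with Integer.MAX_VALUE keeps the hash non-negative.
    public static int bucketFor(String value, int numBuckets) {
        return (value.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    public static void main(String[] args) {
        for (String dept : new String[]{"A", "B", "C", "D"}) {
            System.out.println(dept + " -> bucket " + bucketFor(dept, 3));
        }
    }
}
```

Under this assumption A, B and C fall into three different buckets, while a fourth department D would share a bucket with A. A bucket therefore holds every row of the values hashed to it, not necessarily a single value.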

Monday 1 December 2014

How To Drop A Particular Partition in HIVE


A Hive partition can be dropped using:

ALTER TABLE Tablename DROP IF EXISTS
 PARTITION(PartitionedID=PartitionVALUE);

Let's see an example.
Say I have an emp Hive table with 3 partitions for Department (A, B, C).
In order to delete a particular Department partition, use the query below.
ALTER TABLE emp DROP IF EXISTS
  PARTITION(Department='A');