Sunday 30 December 2012

Installing and configuring Ganglia on RHEL

I recently installed and configured Ganglia to monitor a Hadoop cluster. This post describes how to install and set up the Ganglia monitoring server. Because the Ganglia RPMs for RHEL have a lot of dependencies, it is easier to install them with yum, which requires internet access on the server.

The Ganglia monitoring system has the following components:

  • gmond: The Ganglia monitoring daemon, which must be installed on every monitored node.
  • gmetad: The Ganglia meta daemon, which runs on the master node and collects the data from the monitoring daemons (gmond) on all nodes. gmetad uses the round-robin database tool (rrdtool) to store the data.
  • Web front-end: A PHP application served by the httpd server that shows the latest statistics and trends of the measured parameters as graphs.

The following snapshot shows the high-level interaction of the above components:


Installation on RHEL

To install Ganglia gmond on each monitored node, download the following RPMs from here and install them using the following commands.

 rpm -ivh libconfuse-2.7-4.el6.x86_64.rpm  
 rpm -ivh libganglia-3.4.0-1.x86_64.rpm  
 rpm -ivh ganglia-gmond-3.4.0-1.x86_64.rpm  
 service gmond start  

Installing gmetad on the master node with yum requires internet access on the server. If you are behind a proxy or firewall, first run the following commands:
 export https_proxy=<proxy server>:<port>
 export http_proxy=<proxy server>:<port>
 export proxy=<proxy server>:<port>

Also install the latest EPEL repository on RHEL before running the following command, which installs the Ganglia meta server and web server.
 yum install ganglia ganglia-gmetad ganglia-web  

The above command might fail because of missing RPM dependencies, e.g. rrdtool, dejavu-fonts etc. These dependent packages then need to be downloaded from the internet and installed manually.

Configuration of Ganglia

After installing the components, configure gmond and gmetad using /etc/ganglia/gmond.conf and /etc/ganglia/gmetad.conf respectively. The default configuration uses multicast UDP messages and can be used without changes if your network supports multicast.

After configuration, restart the services using the following commands:

 service gmond restart  
 service gmetad restart  
 service httpd restart  

and open the browser with URL http://<IP of httpd server>/ganglia to start monitoring your servers :).

You might run into the following issues:

  • The httpd server is not able to start because of the mod_python component. Install mod_python on the httpd server if it is not already installed.
  • URL access issues caused by the permission settings in /etc/httpd/conf.d/ganglia.conf. Set the proper configuration, or use "Allow from all" during the testing phase.

In the next post I will describe how to monitor the statistics of the Hadoop components using the Ganglia server.

Sunday 9 September 2012

Liferay Multiple Instance Portlets - How to display different contents

Liferay Portal provides a mechanism to create multiple instances of a portlet. It is a common requirement to display different content or data in each instance of the portlet.

For example, the following snapshot shows a portlet with 3 instances, each displaying different data. The first displays the used memory of the JVM, the next shows the free memory and the last shows the current sales.


As the instances of the portlet are similar but display different data, it makes sense to develop a single portlet and create multiple instances of it. This post explains one way to achieve that.

First create a new "Liferay Project" using the Eclipse wizard and select "Allow Multiple Instances" to enable multiple instances of the portlet. Also select "Edit Mode" under the Portlet Mode option, as we would like to configure the portlet instances from the Preferences option. This allows the administrator to configure each portlet instance and specify the content it displays. Users without the "configuration" privilege cannot change the settings; they can only view the configured instances.

The Preferences option allows the administrator to specify the content to view in the instance. Choose the Preferences option and..


.. the following form is displayed, where you specify the content to display in this instance. Enter the name as follows and save it.


PortletPreferences is used in the edit.jsp page to store the preferences of each instance of the portlet. See the following edit.jsp source code for details.

   
 <%@ taglib uri="http://java.sun.com/portlet_2_0" prefix="portlet"%>  
 <%@ taglib uri="http://liferay.com/tld/aui" prefix="aui"%>  
 <%@ page import="javax.portlet.PortletPreferences"%>  
   
 <portlet:defineObjects />  
   
 Please provide the name of the portlet to view in  
 <b>MultiInstancePortlet</b>  
 .  
 <%  
      PortletPreferences prefs = renderRequest.getPreferences();  
      String content = renderRequest  
                .getParameter("<portlet:namespace/>content");  
      if (content != null) {  
           prefs.setValue("content", content);  
           prefs.store();  
 %>  
 <p>Saved Successfully!!</p>  
 <%  
      } else {  
           content = (String) prefs.getValue("content", "");  
      }  
 %>  
 <portlet:renderURL var="editURL">  
 </portlet:renderURL>  
 <aui:form action="<%=editURL%>" method="post">  
      <aui:input label="Content" name="<portlet:namespace/>content"  
           type="text" value="<%=content%>" />  
      <aui:button type="submit" />  
 </aui:form>  
   

The view.jsp is similar to the one in the previous post on Ajax processing using jQuery.
   
 <%@ taglib uri="http://java.sun.com/portlet_2_0" prefix="portlet"%>  
   
 <portlet:defineObjects />  
 <portlet:resourceURL var="resourceURL">  
 </portlet:resourceURL>  
 <script type="text/javascript">  
 function <portlet:namespace/>Data(){  
      $.post("<%=resourceURL%>", function(result) {  
                if (result == 'NOT_CONFIGURED') {  
                     $('#<portlet:namespace/>header')  
                               .text('Portlet not configured!');  
                     return;  
                }  
                var vals = result.split(',');  
                $('#<portlet:namespace/>container').css('width', vals[1] + '%');  
                $('#<portlet:namespace/>value').text(vals[1] + '%');  
                $('#<portlet:namespace/>header').text(vals[0]);  
           });//post  
      }  
      $(document).ready(function() {  
           setInterval("<portlet:namespace/>Data()", 1000);  
      });  
 </script>  
   
 <div id="<portlet:namespace/>header"  
      style="color: orange; font-size: 18px;">JVM Used Memory -</div>  
 <br />  
 <h4 id="<portlet:namespace/>value">--</h4>  
 <div style="border: 1px solid Blue; width: 100%; height: 5px;">  
      <div id="<portlet:namespace/>container"  
           style="background-color: SkyBlue; width: 0px; height: 5px;"></div>  
 </div>  
   

Finally, for the back-end processing, the following MultiInstancePortlet.java class implements the serveResource method, which is invoked by the POST request from view.jsp. It checks the value of "content" in the PortletPreferences of the instance and sends the data for that particular content type.

   
 package com.test;  
   
 import java.io.IOException;  
 import java.io.PrintWriter;  
   
 import javax.portlet.PortletException;  
 import javax.portlet.PortletPreferences;  
 import javax.portlet.ResourceRequest;  
 import javax.portlet.ResourceResponse;  
   
 import com.liferay.util.bridges.mvc.MVCPortlet;  
   
 /**  
  * Portlet implementation class MultiInstancePortlet  
  */  
 public class MultiInstancePortlet extends MVCPortlet {  
      @Override  
      public void serveResource(ResourceRequest resourceRequest,  
                ResourceResponse resourceResponse) throws IOException,  
                PortletException {  
           System.out.println("serveResource..");  
           long freemem = Runtime.getRuntime().freeMemory();  
           long totalmem = Runtime.getRuntime().totalMemory();  
           long usedmem = totalmem - freemem;  
           usedmem = usedmem / (1024 * 1024);// MB  
           freemem = freemem / (1024 * 1024);//MB  
           totalmem = totalmem / (1024 * 1024);// MB  
   
           PortletPreferences prefs = resourceRequest.getPreferences();  
           String content = prefs.getValue("content", "");  
   
           PrintWriter writer = resourceResponse.getWriter();  
           if(content.equals("JVM Used Memory")){  
                long percentage = usedmem * 100 / totalmem;  
                writer.print(content+ " - "+usedmem + "/" + totalmem + " MB," + percentage);  
           }else if(content.equals("JVM Free Memory")){  
                long percentage = freemem * 100 / totalmem;  
                writer.print(content+ " - "+freemem + "/" + totalmem + " MB," + percentage);  
           }else if(content.equals("Current Sales")){  
                long peakSale = 1000;  
                long currentSale = (long) (peakSale * Math.random());  
                long percentage = currentSale * 100 / peakSale;  
                writer.print(content+ " - "+currentSale + "/" + peakSale + " Units," + percentage);  
           }else {// empty or unrecognized content type  
                writer.print("NOT_CONFIGURED");  
           }  
           writer.close();  
      }  
 }  
   

Please note that, for simplicity, this example matches the content types against hard-coded strings; in a real application they could be made configurable, for example in a database.
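
As a rough illustration of that idea, the if/else chain in serveResource could be replaced by a lookup table. This is only a minimal sketch under stated assumptions: the class ContentRegistry and its methods are hypothetical (they are not part of Liferay), it needs Java 8 or later for the lambda, and it keeps the mapping in memory instead of a database. The payload format "label,percentage" is the one used by the portlet above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper (not part of Liferay): maps a configured content
// name to a supplier that builds the "label,percentage" payload.
class ContentRegistry {
    private final Map<String, Supplier<String>> providers =
            new LinkedHashMap<String, Supplier<String>>();

    void register(String content, Supplier<String> provider) {
        providers.put(content, provider);
    }

    // Returns the payload for the configured content, or NOT_CONFIGURED
    // when the name is empty or unknown.
    String payloadFor(String content) {
        Supplier<String> provider = providers.get(content);
        return provider == null ? "NOT_CONFIGURED" : provider.get();
    }

    // Builds "<content> - <value>/<max> <unit>,<percentage>".
    static String format(String content, long value, long max, String unit) {
        long percentage = max == 0 ? 0 : value * 100 / max;
        return content + " - " + value + "/" + max + " " + unit + "," + percentage;
    }

    public static void main(String[] args) {
        ContentRegistry registry = new ContentRegistry();
        registry.register("JVM Used Memory", () -> {
            Runtime rt = Runtime.getRuntime();
            long total = rt.totalMemory() / (1024 * 1024); // MB
            long used = total - rt.freeMemory() / (1024 * 1024); // MB
            return format("JVM Used Memory", used, total, "MB");
        });
        System.out.println(registry.payloadFor("JVM Used Memory"));
        System.out.println(registry.payloadFor("")); // not configured
    }
}
```

With something like this, serveResource would only need to read "content" from the preferences and write registry.payloadFor(content) to the response; adding a new content type becomes a single register call.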

Writing an Ajax Portlet using Liferay

In this post, I walk through the steps to create a simple Ajax portlet using the Liferay Portal server. If you have not installed the Liferay server yet, please follow the "Installation Guide" and the "Getting Started Tutorial" for the SDK setup and portlet creation using Eclipse.

http://www.liferay.com/community/wiki/-/wiki/Main/Liferay+IDE+Installation+Guide
http://www.liferay.com/community/wiki/-/wiki/Main/Liferay+IDE+Getting+Started+Tutorial

The following snapshot shows a simple Ajax portlet that displays the current memory usage of the JVM and updates the values every second. We will develop this portlet in this post.


With an Ajax invocation the complete page is not refreshed; only a part of it is changed based on the content received from the server. The following code gets the used memory and total memory of the current (Liferay) JVM, along with the percentage value to display in the HTML DIV element.

 package com.test;  
   
 import java.io.IOException;  
 import java.io.PrintWriter;  
   
 import javax.portlet.PortletException;  
 import javax.portlet.ResourceRequest;  
 import javax.portlet.ResourceResponse;  
   
 import com.liferay.util.bridges.mvc.MVCPortlet;  
   
 /**  
  * Portlet implementation class AjaxPortlet  
  */  
 public class AjaxPortlet extends MVCPortlet {
      @Override  
      public void serveResource(ResourceRequest resourceRequest,  
                ResourceResponse resourceResponse) throws IOException,  
                PortletException {  
           System.out.println("serveResource..");  
           long freemem = Runtime.getRuntime().freeMemory();  
           long totalmem = Runtime.getRuntime().totalMemory();  
           long usedmem = totalmem - freemem;  
           long percentage = usedmem*100/totalmem;  
           usedmem = usedmem/(1024*1024);//MB  
           totalmem = totalmem/(1024*1024);//MB  
             
           PrintWriter writer = resourceResponse.getWriter();  
           writer.print(usedmem+"/"+totalmem+" MB,"+percentage);  
           writer.close();  
      }  
 }

For the front-end processing, we need to define the resourceURL, invoke it using the HTTP POST method and update the page with the values received from the server. I am using the jQuery framework for the Ajax invocation, so add the jQuery library to the path and update liferay-portlet.xml. The complete source code of the view.jsp file:

 <%--  
 /**  
 * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.  
 *  
 * This library is free software; you can redistribute it and/or modify it under  
 * the terms of the GNU Lesser General Public License as published by the Free  
 * Software Foundation; either version 2.1 of the License, or (at your option)  
 * any later version.  
 *  
 * This library is distributed in the hope that it will be useful, but WITHOUT  
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS  
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more  
 * details.  
 */  
 --%>  
   
 <%@ taglib uri="http://java.sun.com/portlet_2_0" prefix="portlet"%>  
   
 <portlet:defineObjects />  
 <portlet:resourceURL var="resourceURL">  
 </portlet:resourceURL>  
 <script type="text/javascript">  
 function <portlet:namespace/>Data(){  
      $.post("<%=resourceURL%>", function(result) {  
                var vals=result.split(',');  
                $('#<portlet:namespace/>container').css('width', vals[1]+'%');  
                $('#<portlet:namespace/>value').text(vals[1]+'%');  
                $('#<portlet:namespace/>header').text('JVM Used Memory - '+vals[0]);  
           });//post  
      }  
      $(document).ready(function() {  
           setInterval("<portlet:namespace/>Data()", 1000);  
      });  
 </script>  
   
 <div id="<portlet:namespace/>header" style="color:orange; font-size: 18px;">  
      JVM Used Memory -   
 </div>  
 <br/>  
 <h4 id="<portlet:namespace/>value">--</h4>  
 <div style="border: 2px solid Blue; width: 100%; height: 5px;">  
      <div id="<portlet:namespace/>container"  
           style="background-color: SkyBlue; width: 0px; height: 5px;"></div>  
 </div>  
   


The setInterval method triggers the POST invocation every second to update the memory usage.

Saturday 11 August 2012

Performance Tip - Select only required columns from HBase

HBase is a column-oriented database and allows you to select only the required columns or column families using Scan objects. For a query on specific columns, add only those columns to the Scan object to get better performance. Performance is better because less data needs to be read from the disks.

To select a specific column or column family, use the following methods of the Scan class.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

Scan scan = new Scan();
//Set the specific column name of a column family
//scan.addColumn(Bytes.toBytes(<family>), Bytes.toBytes(<qualifier>));
scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("firstName"));

//Or set the complete column family
//scan.addFamily(Bytes.toBytes(<family>));
//scan.addFamily(Bytes.toBytes("data"));

Also select only the required rows by using the setStartRow and setStopRow methods of the Scan object.

By selecting only the required columns, the processing of 1 billion records described in my previous post could be much faster.

How to calculate the Compression in HBase?

Compression in HBase can significantly reduce the storage requirements, because the row key, column family and column name are repeated for each column of a record. Several compression algorithms are available, including LZO, GZIP and Snappy. The choice depends on the compression ratio and the performance of the algorithm. See the following link to set up and configure compression in HBase.

http://hbase.apache.org/book/compression.html

After setting up compression, the following formula can be used to get an approximate percentage of the size reduction achieved by the compression:

100 - StoreFileSize *100 / ((avgKeyLen+avgValueLen+8)*entries)

StoreFileSize: the size of the store file on HDFS
avgKeyLen, avgValueLen and entries: taken from the output of the HFile tool

Use the following command to list the regions of the table.
hadoop fs -lsr /hbase/<table name>

Now choose a store file with a larger size, as it will have more records, and check its size.
hadoop fs -lsr /hbase/<table name>/<region>/data/<store file>
-rw-r--r-- 1 prafkum supergroup 25855018 2012-08-11 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/880ababfda4643f59956b90da2dc3d3f


Also run the HFile tool on the store file. Please see my previous post for details.
hbase org.apache.hadoop.hbase.io.hfile.HFile -v -m -f /hbase/<table name>/<region>/data/<store file>

avgKeyLen=29,
avgValueLen=4,
entries=1146564

Using the above formula to calculate the compression ratio:

100 - 25855018*100/((29+4+8)*1146564) = 45% compression

Please note that this computation is not exact, as the store file also contains a header along with the keys and values of the columns. The compression can also differ between store files. But of course you get a good idea of the compression ratio.
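
The same arithmetic can be written as a few lines of Java, which is handy when checking several store files. This is just a sketch of the formula above; the class and method names are made up for this example and are not an HBase API, and the numbers in main are the ones from the HFile tool output shown earlier.

```java
// Hypothetical helper for the formula above; not part of HBase.
class CompressionEstimate {
    // Per-entry overhead: 4-byte key length + 4-byte value length fields.
    static final int LENGTH_FIELDS_BYTES = 8;

    // Approximate percentage of space saved by compression:
    // 100 - StoreFileSize * 100 / ((avgKeyLen + avgValueLen + 8) * entries)
    static double percentSaved(long storeFileSize, int avgKeyLen,
            int avgValueLen, long entries) {
        double uncompressed =
                (double) (avgKeyLen + avgValueLen + LENGTH_FIELDS_BYTES) * entries;
        return 100.0 - storeFileSize * 100.0 / uncompressed;
    }

    public static void main(String[] args) {
        // Values taken from the store file and HFile tool output above.
        double saved = percentSaved(25855018L, 29, 4, 1146564L);
        System.out.println("Approximate size reduction (%): " + Math.round(saved));
    }
}
```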


Comparison of RCFile and Column-oriented database

This post tries to list the differences and synergies between the RCFile format of Hadoop and column-oriented databases (like Cassandra, HBase, Hypertable). Which parameters should be considered when deciding between them? These are my thoughts so far; comments adding other factors are welcome.

  • Static vs Dynamic Data: Application data can be static (append-only) and never change, like events from a web server and other fact data. On the other hand, application data can be dynamic and need to reflect the latest status, like user profiles, bank transactions etc. For dynamic data we must go for a column-oriented database, as the data constantly needs to be updated or deleted, but for static data the RCFile format seems a better fit (of course depending on the other use cases as well).
  • Point Query: A point query retrieves the exact record requested by the application. Point queries are easy with column-oriented databases, as they keep the data sorted by row key and allow the user to look up a specific row by its row key. Point queries can be a challenge with RCFile, as a MapReduce job needs to be started to find a specific row.
  • Range Query: A range query retrieves all the records matching some criteria, e.g. "All users in a specific group". Range queries are easily possible both with the RCFile format and with column-oriented databases. Performance is the important deciding factor here, and it depends on the use cases for the range query.
  • Storage Requirements: Storage is an important factor, and the storage requirements should be calculated for all the column-oriented options. Different compression algorithms should be tried to find the best one.
  • Performance: Depending on the application needs, this could be the most critical requirement, so it needs to be tested with the different options and evaluated against different parameters, e.g. compression. Queries should be faster when they require only a few columns rather than all columns.
  • Others: Column-oriented databases provide many other features, like multiple versions, column families and storage of binary data, which should be considered before taking a final decision.

This is still a work in progress, and comments are highly welcome.

Friday 10 August 2012

Column-oriented storage in Hadoop using RCFile

Hadoop stores large amounts of data on a distributed cluster using HDFS. Normally the data is stored in a CSV-like file format with some delimiter, for example the logs of a web server or the CSV exports from RDBMS systems. To process such large files the MapReduce paradigm is used, and it scales well thanks to the massively parallel processing of the cluster.

During processing, every record is read and processed with all of its columns. But what if I need to process only a few columns out of many? For example, if I would like to get the sum of a specific column, why should all the columns be read, adding to the disk I/O?

There are column-oriented databases like Cassandra, HBase, Hypertable and Vertica. For details of column-oriented storage see the wiki page.

There are many advantages to using a column-oriented database, but here we look at storing the data in the Hadoop file system in a column-oriented format using RCFile.

The RCFile (Record Columnar File) format partitions the data horizontally (rows) and vertically (columns). This allows only the required columns to be fetched during processing and avoids the disk I/O penalty of reading all the columns.

The simplest way to create a table in RCFile format is using Hive, as follows:

CREATE TABLE USER_RC(
  id int,
  name string,
  age int,
  manager string,
  salary int
)
STORED AS RCFILE;


To store data in this table in RCFile format, follow these steps:

CREATE TABLE USER_TEXT(
  id int,
  name string,
  age int,
  manager string,
  salary int
)
ROW FORMAT DELIMITED fields terminated by ',';

load data local inpath '/tmp/user.txt' into table USER_TEXT;

INSERT OVERWRITE table USER_RC SELECT * from USER_TEXT;

Now run the Hive query to sum the salary on both tables:

select sum(salary) from USER_TEXT;
select sum(salary) from USER_RC;

Each query starts a MapReduce job; watch the HDFS_BYTES_READ counter to see the difference in the bytes read from HDFS. You can see a huge difference in the data read, as the RCFile query reads only the salary column while the text format query reads the complete data.

For example the following file in text format (/tmp/user.txt):

1,Charry,30,Joe,878
2,Roy,31,Joe,879
3,Robert,32,Joe,880
4,Barry,33,Joe,881

would be stored in RCFile format as follows, and the query reads only the last row and skips all other data.
1,2,3,4
Charry,Roy,Robert,Barry
30,31,32,33
Joe,Joe,Joe,Joe
878,879,880,881
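
The idea can be sketched in a few lines of plain Java. This is only a toy illustration of the column-wise layout shown above, not the real RCFile on-disk format (which, among other things, first splits the rows into groups); the class and method names are made up for this example, and it assumes every row has the same number of fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration of a columnar layout; not the RCFile implementation.
class ColumnarLayout {
    // Turns CSV rows into one comma-joined line per column.
    static List<String> toColumns(List<String> csvRows) {
        List<List<String>> columns = new ArrayList<List<String>>();
        for (String row : csvRows) {
            String[] fields = row.split(",");
            for (int i = 0; i < fields.length; i++) {
                if (columns.size() <= i) {
                    columns.add(new ArrayList<String>());
                }
                columns.get(i).add(fields[i]);
            }
        }
        List<String> lines = new ArrayList<String>();
        for (List<String> column : columns) {
            lines.add(String.join(",", column));
        }
        return lines;
    }

    // Sums one column by parsing only that column's line.
    static long sumColumn(List<String> columnLines, int index) {
        long sum = 0;
        for (String value : columnLines.get(index).split(",")) {
            sum += Long.parseLong(value);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "1,Charry,30,Joe,878",
                "2,Roy,31,Joe,879",
                "3,Robert,32,Joe,880",
                "4,Barry,33,Joe,881");
        List<String> columns = toColumns(rows);
        for (String line : columns) {
            System.out.println(line);
        }
        // Only the salary line (index 4) is touched for the sum.
        System.out.println("sum(salary) = " + sumColumn(columns, 4));
    }
}
```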

To confirm this format, the following snapshot shows the file /user/hive/warehouse/user_rc/00000_0 browsed in HDFS:


Thursday 9 August 2012

How to verify the record size of HBase?

From my previous post "How to calculate the record size of HBase?" it is easy to calculate the record size of HBase and estimate the storage requirements. But how can it be verified during the testing phase and in the production environment?

The HFile tool of HBase can be used to find the average key size, average value size and number of entries per store file in HDFS. It can also be used to see the actual records in the store file.

To use it, first browse HDFS with "hadoop fs -lsr /hbase/<table name>" and find the store file of the table, as follows for the 'employee' table:

 Prafulls-MacBook-Pro:~ prafkum$ hadoop fs -lsr /hbase/employee  
 -rw-r--r--  1 prafkum supergroup    521 2012-08-10 06:22 /hbase/employee/.tableinfo.0000000001  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:22 /hbase/employee/.tmp  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:22 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.oldlogs  
 -rw-r--r--  1 prafkum supergroup    124 2012-08-10 06:22 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.oldlogs/hlog.1344559953739  
 -rw-r--r--  1 prafkum supergroup    231 2012-08-10 06:22 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.regioninfo  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.tmp  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data  
 -rw-r--r--  1 prafkum supergroup    722 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f  

Look for the files in the "data" directory and choose any one, e.g. /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f in the above output.

Now use the HFile tool on the store file as follows. In production, use only the -m option, which prints the metadata. Other options such as -p (print the actual content of the file) and -s (statistics) are shown here for illustration; don't use them in production, as the data in a store file might be huge.
 Prafulls-MacBook-Pro:bin prafkum$ hbase org.apache.hadoop.hbase.io.hfile.HFile -v -p -s -m -f /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f  
 Scanning -> /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f  
 12/08/10 06:32:42 INFO hfile.CacheConfig: Allocating LruBlockCache with maximum size 246.9m  
 K: row1/data:employeeId/1344560028049/Put/vlen=3 V: 123  
 K: row1/data:firstName/1344560042111/Put/vlen=3 V: Joe  
 K: row1/data:lastName/1344560058448/Put/vlen=6 V: Robert  
 Block index size as per heapsize: 416  
 reader=/hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f,  
   compression=none,  
   cacheConf=CacheConfig:enabled [cacheDataOnRead=true] [cacheDataOnWrite=false] [cacheIndexesOnWrite=false] [cacheBloomsOnWrite=false] [cacheEvictOnClose=false] [cacheCompressed=false],  
   firstKey=row1/data:employeeId/1344560028049/Put,  
   lastKey=row1/data:lastName/1344560058448/Put,  
   avgKeyLen=29,  
   avgValueLen=4,  
   entries=3,  
   length=722  
 Trailer:  
   fileinfoOffset=241,  
   loadOnOpenDataOffset=150,  
   dataIndexCount=1,  
   metaIndexCount=0,  
   totalUncomressedBytes=655,  
   entryCount=3,  
   compressionCodec=NONE,  
   uncompressedDataIndexSize=43,  
   numDataIndexLevels=1,  
   firstDataBlockOffset=0,  
   lastDataBlockOffset=0,  
   comparatorClassName=org.apache.hadoop.hbase.KeyValue$KeyComparator,  
   version=2  
 Fileinfo:  
   KEY_VALUE_VERSION = \x00\x00\x00\x01  
   MAJOR_COMPACTION_KEY = \x00  
   MAX_MEMSTORE_TS_KEY = \x00\x00\x00\x00\x00\x00\x00\x00  
   MAX_SEQ_ID_KEY = 10  
   TIMERANGE = 1344560028049....1344560058448  
   hfile.AVG_KEY_LEN = 29  
   hfile.AVG_VALUE_LEN = 4  
   hfile.LASTKEY = \x00\x04row1\x04datalastName\x00\x00\x019\x0E\x06PP\x04  
 Mid-key: \x00\x04row1\x04dataemployeeId\x00\x00\x019\x0E\x05\xD9\x91\x04  
 Bloom filter:  
   Not present  
 Stats:  
 Key length: count: 3     min: 28     max: 30     mean: 29.0  
 Val length: count: 3     min: 3     max: 6     mean: 4.0  
 Row size (bytes): count: 1     min: 123     max: 123     mean: 123.0  
 Row size (columns): count: 1     min: 3     max: 3     mean: 3.0  
 Key of biggest row: row1  
 Scanned kv count -> 3  

The above output can be used to validate the record size using the formula
(8+avgKeyLen+avgValueLen)*columns per record

So for the above output
(8+29+4)*3=123 

which is equal to our calculation in the previous post.

Please note that the result is approximate, as it is calculated from the average values. Also, the constant 8 in the formula is the "Key Length" and "Value Length" fields of 4 bytes each, as described in the last post.
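
For repeated checks the formula is easy to wrap in a small helper. The following is only a sketch; the class and method names are hypothetical (not an HBase API), and the inputs are the avgKeyLen, avgValueLen and column count from the HFile tool output above.

```java
// Hypothetical helper for the record size formula; not part of HBase.
class RecordSizeCheck {
    // 4-byte "Key Length" + 4-byte "Value Length" stored with every cell.
    static final int LENGTH_FIELDS_BYTES = 8;

    // (8 + avgKeyLen + avgValueLen) * columns per record
    static long recordSize(int avgKeyLen, int avgValueLen, int columnsPerRecord) {
        return (long) (LENGTH_FIELDS_BYTES + avgKeyLen + avgValueLen) * columnsPerRecord;
    }

    public static void main(String[] args) {
        // avgKeyLen=29, avgValueLen=4, 3 columns in row1
        System.out.println(recordSize(29, 4, 3)); // prints 123
    }
}
```

The result matches the "Row size (bytes)" value of 123 reported in the Stats section of the tool output.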

Sunday 22 July 2012

Processing 1 Billion records of HBase using MapReduce job

If you would like to read and process 1 billion records from HBase, how long would it take? To simplify the scenario, the following example reads all the rows from an HBase table and just counts them instead of really processing them.

It shows how fast we can read records from HBase in a MapReduce job. In a real scenario the processing time of the records would come on top, but it gives a hint about the time required to read 1 billion records from HBase.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class HbaseRowCounter {

  public static class CounterMapper extends TableMapper<Text, Text> {

    public static enum Counters {
      ROWS
    }

    @Override
    public void map(ImmutableBytesWritable row, Result value,
        Context context) throws InterruptedException, IOException {
      context.getCounter(Counters.ROWS).increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.out.println("Usage: HbaseRowCounter <tablename>");
      System.exit(0);
    }
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "Row Counter - " + args[0]);
    job.setJarByClass(HbaseRowCounter.class);
    Scan scan = new Scan();
    //scan.setCaching(1000);

    TableMapReduceUtil.initTableMapperJob(args[0], // input table
        scan, // Scan instance
        CounterMapper.class, // mapper class
        Text.class, // mapper output key
        Text.class, // mapper output value
        job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0); // map-only job, no reducers needed
    boolean b = job.waitForCompletion(true);
    if (!b) {
      throw new IOException("error with job!");
    }
  }
}


To run the above program:
hadoop jar <jar file name> HbaseRowCounter <table name>

The following snapshot shows the Hadoop job output and the processing time: reading and counting the 1 billion records takes about 1 hour 15 minutes.

The performance test was done on an 8-node cluster, each node with 4 quad-core processors and 32 GB RAM. The table has only 1 column family with a single column, and each row is approximately 150 bytes. The table is pre-partitioned into 1000 regions.

As this seemed quite high, we started looking for ways to optimize it and found that scan caching can really improve the performance. The HBase book contains the details. Enable the commented-out scan.setCaching(1000) line in the source code to turn on scan caching. Scan caching avoids an RPC call from the map task to the region server for every invocation, as described there. After setting the scan caching we could read the 1 billion records in just 7 minutes 44 seconds.







This means that calculating the total sales from 1 billion records, each representing a single sale, would take a minimum of 7-8 minutes.


Note that scan caching can also be set using the configuration property hbase.client.scanner.caching, without changing the code. Please see the following link for details.
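
To get a feeling for why scan caching matters so much, here is a back-of-the-envelope calculation in Java. It is only a rough sketch under stated assumptions: the class and method names are hypothetical, the un-tuned job is assumed to fetch one row per scanner call, and the estimate ignores the extra calls needed to open and close a scanner for each of the 1000 regions.

```java
// Rough estimate only; not an HBase API and not a measurement.
class ScanCachingEstimate {
    // Approximate scanner round trips: one per batch of `caching` rows.
    static long roundTrips(long rows, int caching) {
        return (rows + caching - 1) / caching; // ceiling division
    }

    // Average read throughput in rows per second.
    static double rowsPerSecond(long rows, long seconds) {
        return (double) rows / seconds;
    }

    public static void main(String[] args) {
        long rows = 1000000000L;
        long before = 75 * 60;     // 1 hour 15 minutes, in seconds
        long after = 7 * 60 + 44;  // 7 minutes 44 seconds, in seconds

        System.out.println("Round trips, caching=1:    " + roundTrips(rows, 1));
        System.out.println("Round trips, caching=1000: " + roundTrips(rows, 1000));
        System.out.println("Rows/s before: " + Math.round(rowsPerSecond(rows, before)));
        System.out.println("Rows/s after:  " + Math.round(rowsPerSecond(rows, after)));
    }
}
```

Under these assumptions the number of round trips drops by a factor of 1000, while the measured run time improved by roughly a factor of 10, so the RPC overhead was clearly the dominant cost but not the only one.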

Thursday 19 July 2012

HBase GET/PUT commands call flow to ZooKeeper/-ROOT-/.META. and User Table

HBase stores the data in a table lexicographically sorted by row key. In a large HBase cluster with many shards, a.k.a. regions, how does the client decide the right region to write to or read from?

Unlike in many other NoSQL databases, the read/write request from the client does not go through a component that routes or fans out to the right shard; instead the client is intelligent enough to route the request directly. This avoids loading a single component with the routing of requests from all the clients.

The following diagram shows all the steps and the explanation follows:


Let's first see a simple Java program that writes data to the 'TEST' table in HBase using the Put method.

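import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutTest {

  private static Configuration conf = HBaseConfiguration.create();
  
  // Writes a single cell (family:qual = value) into the given row of the table
  public static void addRecord(String tableName, String rowKey, String family, String qual, String value) throws Exception {
    HTable table = null;
    try {
      table = new HTable(conf, tableName);
      Put put = new Put(Bytes.toBytes(rowKey));
      put.add(Bytes.toBytes(family), Bytes.toBytes(qual), Bytes.toBytes(value));
      table.put(put);
      System.out.println("insert record "+rowKey+" to table "+tableName+" ok.");
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        table.close(); // release the client resources held by the table
      }
    }
  }
  
  public static void main(String[] args) throws Exception{
    String tablename = args[0]; // table name, e.g. TEST
    PutTest.addRecord(tablename, "row-1", "data", "name", "Joe");
    PutTest.addRecord(tablename, "row-1", "data", "age", "28");
  }

}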

The HTable class uses the following steps to find the region of the "TEST" table and then directly invokes the read/write operation.
  • Step-1 to ZooKeeper: The client first connects to the ZooKeeper quorum to confirm that the HBase Master server is running, and reads from the path /hbase/root-region-server the location of the Region Server holding the region of the top-level catalog table -ROOT-.
  • Step-2 to Root Region: Next, the client connects to the Region Server holding the -ROOT- region and fetches the location of the next-level catalog table, called .META.
  • Step-3 to Meta Region: From the .META. table, it then fetches the details of the user table region that contains the row key to be read or written.
  • Step-4 to User Table Region: Finally, the client connects directly to the target region server holding the region of the user table "TEST" and performs the operation, e.g. Get/Put/Delete. In the case of a Put operation, this region server stores the request in the MemStore, which is later flushed to disk.
This information is cached at the client end, so these steps happen only the first time.
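To make the lookup and caching idea concrete, here is a minimal in-memory sketch. It is not the HBase client API: the class RegionLookupSketch, its methods, the split point "row-5" and the server bd-slave-7 are all hypothetical. It only assumes that regions are contiguous key ranges sorted by start key, so the region for a row key is the one with the greatest start key that is less than or equal to the row key.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the catalog lookup; this is NOT the HBase client API.
final class RegionLookupSketch {

    /** One region: rows in [startKey, endKey) are served by the given server. */
    static final class Region {
        final String startKey;
        final String endKey; // null means "no upper bound" (last region)
        final String server;

        Region(String startKey, String endKey, String server) {
            this.startKey = startKey;
            this.endKey = endKey;
            this.server = server;
        }

        boolean contains(String rowKey) {
            return rowKey.compareTo(startKey) >= 0
                    && (endKey == null || rowKey.compareTo(endKey) < 0);
        }
    }

    // Stand-in for the .META. table: region start key -> region, sorted by key.
    private final TreeMap<String, Region> meta = new TreeMap<>();
    // Client-side cache of regions that were already located.
    private final TreeMap<String, Region> cache = new TreeMap<>();
    private int catalogLookups = 0;

    void addRegion(String startKey, String endKey, String server) {
        meta.put(startKey, new Region(startKey, endKey, server));
    }

    /** Returns the server for rowKey, consulting the catalog only on a cache miss. */
    String locate(String rowKey) {
        Map.Entry<String, Region> cached = cache.floorEntry(rowKey);
        if (cached != null && cached.getValue().contains(rowKey)) {
            return cached.getValue().server; // cache hit: no catalog round trip
        }
        catalogLookups++; // stands for the ZooKeeper/-ROOT-/.META. round trips
        // Assumes the first region starts at the empty key, so floorEntry is never null.
        Region region = meta.floorEntry(rowKey).getValue();
        cache.put(region.startKey, region);
        return region.server;
    }

    int catalogLookups() {
        return catalogLookups;
    }

    public static void main(String[] args) {
        RegionLookupSketch client = new RegionLookupSketch();
        client.addRegion("", "row-5", "bd-slave-6:60020");
        client.addRegion("row-5", null, "bd-slave-7:60020");
        System.out.println(client.locate("row-1"));
        System.out.println(client.locate("row-2")); // same region, served from the cache
        System.out.println(client.locate("row-9")); // different region, new lookup
        System.out.println("catalog lookups: " + client.catalogLookups());
    }
}
```

In this sketch the second request for a row in the same region never touches the catalog, which is the effect of the client-side cache described above.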
The console output of the above program is shown below. It confirms the connections to ZooKeeper, to the -ROOT- region server and to the .META. region server.

12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.3-1240972, built on 02/06/2012 10:48 GMT
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:host.name=bd-master-1
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:java.version=1.6.0_31
...
...
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:user.name=hadoop
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:user.home=/home/hadoop
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:user.dir=/home/hadoop/agg
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bd-slave-1:2181,bd-master-1:2181,bd-slave-2:2181 sessionTimeout=600000 watcher=hconnection
12/07/17 07:28:42 INFO zookeeper.ClientCnxn: Opening socket connection to server /192.168.10.12:2181(bd-slave-2)
12/07/17 07:28:42 INFO zookeeper.RecoverableZooKeeper: The identifier of this process is 11564@bd-master-1
12/07/17 07:28:42 INFO client.ZooKeeperSaslClient: Client will not SASL-authenticate because the default JAAS configuration section 'Client' could not be found. If you are not using SASL, you may ignore this. On the other hand, if you expected SASL to work, please fix your JAAS configuration.
12/07/17 07:28:42 INFO zookeeper.ClientCnxn: Socket connection established to bd-slave-2/192.168.10.12:2181, initiating session
12/07/17 07:28:42 INFO zookeeper.ClientCnxn: Session establishment complete on server bd-slave-2/192.168.10.12:2181, sessionid = 0x2388f9e3f5a0015, negotiated timeout = 600000
12/07/17 07:28:42 DEBUG client.HConnectionManager$HConnectionImplementation: Lookedup root region location, connection=org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@4cb9e45a; serverName=bd-slave-4,60020,1342439310156
12/07/17 07:28:42 DEBUG client.HConnectionManager$HConnectionImplementation: Cached location for .META.,,1.1028785192 is bd-slave-8:60020
12/07/17 07:28:42 DEBUG client.MetaScanner: Scanning .META. starting at row=TEST,,00000000000000 for max=10 rows using org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@4cb9e45a
12/07/17 07:28:42 DEBUG client.HConnectionManager$HConnectionImplementation: Cached location for TEST,,1342429701327.f1bd3b3d2d05bd54d8600dd095a25bfa. is bd-slave-6:60020
insert recored row-1 to table TEST ok.
insert recored row-1 to table TEST ok.

TCP/IP Packets
I also captured the TCP/IP packets to confirm the flow between the HBase client and the above components:

The following shows the client checking the location of the -ROOT- region with the ZooKeeper quorum at the path /hbase/root-region-server.

Step#2: searching for the .META. region in the -ROOT- catalog table.
0000  68 b5 99 bd 94 c8 00 25  b3 e2 bd 6c 08 00 45 00   h......% ...l..E.
0010  00 9f 91 05 40 00 40 06  13 92 c0 a8 0a 63 c0 a8   ....@.@. .....c..
0020  0a 0e c9 8b ea 74 5e 0a  1a 20 1d 04 bf d7 80 18   .....t^. . ......
0030  00 2e 96 53 00 00 01 01  08 0a b3 cd 88 46 1c 47   ...S.... .....F.G
0040  bb af 00 00 00 67 00 00  00 02 01 00 13 67 65 74   .....g.. .....get
0050  43 6c 6f 73 65 73 74 52  6f 77 42 65 66 6f 72 65   ClosestR owBefore
0060  00 00 00 00 00 00 00 1d  03 43 4e fa 00 00 00 03   ........ .CN.....
0070  0b 09 2d 52 4f 4f 54 2d  2c 2c 30 0b 2a 2e 4d 45   ..-ROOT- ,,0.*.ME
0080  54 41 2e 2c 54 45 53 54  2c 2c 39 39 39 39 39 39   TA.,TEST ,,999999
0090  39 39 39 39 39 39 39 39  2c 39 39 39 39 39 39 39   99999999 ,9999999
00a0  39 39 39 39 39 39 39 0b  04 69 6e 66 6f            9999999. .info   

Step#3: searching for the user table 'TEST' in .META.
0000  d8 d3 85 ba 94 d4 00 25  b3 e2 bd 6c 08 00 45 00   .......% ...l..E.
0010  00 ae d9 75 40 00 40 06  cb 0e c0 a8 0a 63 c0 a8   ...u@.@. .....c..
0020  0a 12 9d a0 ea 74 5d 87  52 91 bc e1 4d 8a 80 18   .....t]. R...M...
0030  00 36 96 66 00 00 01 01  08 0a b3 cd 88 63 1f 50   .6.f.... .....c.P
0040  c6 b3 00 00 00 76 00 00  00 06 01 00 0b 6f 70 65   .....v.. .....ope
0050  6e 53 63 61 6e 6e 65 72  00 00 00 00 00 00 00 1d   nScanner ........
0060  03 43 4e fa 00 00 00 02  0b 09 2e 4d 45 54 41 2e   .CN..... ...META.
0070  2c 2c 31 27 27 02 14 54  45 53 54 2c 2c 30 30 30   ,,1''..T EST,,000
0080  30 30 30 30 30 30 30 30  30 30 30 00 00 00 00 01   00000000 000.....
0090  ff ff ff ff ff ff ff ff  01 00 00 00 00 00 00 00   ........ ........
00a0  00 00 7f ff ff ff ff ff  ff ff 01 00 00 00 01 04   ........ ........
00b0  69 6e 66 6f 00 00 00 00  00 00 00 00               info.... ....    

And finally the Put method sends the information with user name 'Joe'.
0000  68 b5 99 bd 16 78 00 25  b3 e2 bd 6c 08 00 45 00   h....x.% ...l..E.
0010  00 f4 11 a9 40 00 40 06  92 97 c0 a8 0a 63 c0 a8   ....@.@. .....c..
0020  0a 10 e0 be ea 74 5e 42  9a cc c7 43 dc 9b 80 18   .....t^B ...C....
0030  00 2e 96 aa 00 00 01 01  08 0a b3 cd 88 6c 1f 50   ........ .....l.P
0040  2c 95 00 00 00 bc 00 00  00 0b 01 00 05 6d 75 6c   ,....... .....mul
0050  74 69 00 00 00 00 00 00  00 1d 03 43 4e fa 00 00   ti...... ...CN...
0060  00 01 42 42 00 00 00 01  35 54 45 53 54 2c 2c 31   ..BB.... 5TEST,,1
0070  33 34 32 34 32 39 37 30  31 33 32 37 2e 66 31 62   34242970 1327.f1b
0080  64 33 62 33 64 32 64 30  35 62 64 35 34 64 38 36   d3b3d2d0 5bd54d86
0090  30 30 64 64 30 39 35 61  32 35 62 66 61 2e 00 00   00dd095a 25bfa...
00a0  00 01 41 41 40 23 02 05  72 6f 77 2d 31 7f ff ff   ..AA@#.. row-1...
00b0  ff ff ff ff ff ff ff ff  ff ff ff ff ff 01 00 00   ........ ........
00c0  00 01 04 64 61 74 61 00  00 00 01 00 00 00 24 00   ...data. ......$.
00d0  00 00 24 00 00 00 19 00  00 00 03 00 05 72 6f 77   ..$..... .....row
00e0  2d 31 04 64 61 74 61 6e  61 6d 65 7f ff ff ff ff   -1.datan ame.....
00f0  ff ff ff 04 4a 6f 65 00  00 00 00 00 00 00 00 0e   ....Joe. ........
0100  11 0e                                              ..               
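
The readable fragments in the dumps above (getClosestRowBefore, openScanner, multi) can be recovered from the hex column with a few lines of Java. This is only an illustrative sketch: the class HexDumpSketch and its method toAscii are hypothetical helpers, and the input is assumed to be the space-separated hex bytes of one dump line, without the offset column.

```java
// Hypothetical helper for reading the packet dumps; not part of HBase.
final class HexDumpSketch {

    /**
     * Converts space-separated hex bytes (e.g. "43 6c 6f") to text,
     * printing '.' for non-printable bytes as the dump's right-hand column does.
     */
    static String toAscii(String hexBytes) {
        StringBuilder out = new StringBuilder();
        for (String token : hexBytes.trim().split("\\s+")) {
            int b = Integer.parseInt(token, 16);
            out.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Bytes taken from offset 0050 of the Step#2 packet above.
        System.out.println(toAscii("43 6c 6f 73 65 73 74 52  6f 77 42 65 66 6f 72 65"));
        // Bytes taken from offset 0070 of the same packet.
        System.out.println(toAscii("0b 09 2d 52 4f 4f 54 2d  2c 2c 30 0b 2a 2e 4d 45"));
    }
}
```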

We have seen that there are 4 steps for the client to connect to the actual region and read/write the data. Does this have a performance impact on the client? If yes, can we avoid it? Watch for the next blog post to get the answers...

Tuesday 26 June 2012

How to Delete the HBase Region including all data?

HBase provides a mechanism to pre-create the regions for a table. This helps to distribute the data across the regions in the HBase cluster. Please see the following link for details of pre-creating the regions in HBase.

http://hbase.apache.org/book/perf.writing.html

The row key design is the magic that distributes the data across the regions. The row key can be a composite key, e.g. "User ID + Transaction ID". The following blog provides the details:

http://riteshadval.blogspot.in/2012/03/hbase-composite-row-key-design-doing.html

In that case, the data of specific users would be in specific regions. In some scenarios we might need to delete all the data for these users, for example when the users are no longer active.
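
As a rough illustration of why a composite key keeps one user's data together, the following sketch builds "User ID + Transaction ID" keys and sorts them the way HBase sorts row keys (lexicographically). The class CompositeRowKeySketch, the 10-digit zero padding and the "-" separator are assumptions made for this example only; they are not taken from the linked blog.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical composite key layout; the widths and separator are assumptions.
final class CompositeRowKeySketch {

    /** Zero-pads both parts so that lexicographic order matches numeric order. */
    static String rowKey(long userId, long transactionId) {
        return String.format(Locale.ROOT, "%010d-%010d", userId, transactionId);
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        keys.add(rowKey(42, 7));
        keys.add(rowKey(7, 900));
        keys.add(rowKey(42, 3));
        keys.add(rowKey(7, 15));
        // Sorting as strings mimics the lexicographic row key order of HBase.
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key); // all keys of one user end up next to each other
        }
    }
}
```

Because all keys of a user share the same prefix, they form one contiguous key range, so a region pre-created on user ID boundaries holds the complete data of its users.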

The following method can be used to delete and recreate a complete region in HBase. Please be careful to pass the right regionStartKey, as all the data in that region will be deleted, without the need for a major compaction.

 // Deletes the region that starts at regionStartKey and recreates it empty.
 // The table must be disabled before this method is called (see the steps below).
 private void recreateRegion(String tableName, String regionStartKey) {
    try {
       Configuration conf = HBaseConfiguration.create();
       HTable hTable = new HTable(conf, tableName);
       HTableDescriptor desc = hTable.getTableDescriptor();
       byte[][] startKeys = hTable.getStartKeys();
       // Look for the region whose start key matches the requested one.
       for (int i = 0; i < startKeys.length; i++) {
          byte[] startKey = startKeys[i];
          if (Bytes.toString(startKey).equals(regionStartKey)) {
             FileSystem fs = FileSystem.get(conf);
             Path rootDir = new Path(conf.get("hbase.rootdir"));
             HRegionInfo info = hTable.getRegionLocation(startKey)
                   .getRegionInfo();
             // Remove the files of the region from HDFS.
             System.out.println("deleting region - " + info.toString());
             HRegion.deleteRegion(fs, rootDir, info);
             // Recreate an empty region with the same region info.
             System.out.println("creating region - " + info.toString());
             HRegion newRegion = HRegion.createHRegion(info, rootDir,
                   conf, desc);
             newRegion.close();
             break;
          }
       }
       hTable.close();
    } catch (Exception e) {
       e.printStackTrace();
    }
 }

The following steps need to be followed:


1. Disable the table
disable 'TEST'


2. Check the HFiles on HDFS; you will see 2 files in region 9c52cac6d90c7a43d98887aed68179a7.
3. Execute the above program for the table, passing the start key of this region

4. The following snapshot shows that both the HFiles of region 9c52cac6d90c7a43d98887aed68179a7 are deleted





5. Enable the table again
enable 'TEST'

After enabling the table in HBase, all the data in that region is deleted, and you won't see the data of that region if you scan the table. The benefit of this approach is that there is no need for a major compaction to delete the huge amount of data in that region.

Sunday 24 June 2012

Shell script to execute the commands on all Farm Servers

There is a lot of software, including open source, for managing a server farm. It's fun to run jobs on a Hadoop/HBase cluster, but as the cluster grows with more and more servers, it becomes increasingly difficult to manage the server farm without sophisticated software.

The following simple script can be really useful for running Unix commands on all the farm servers:

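 #!/bin/bash
 # Script used to run a command on every server of the Hadoop cluster.
 # Reads the host names (one per line) from servers.dat in the current directory.
 if [ -z "$1" ]; then
  echo "Usage: cluster_run.sh '<Unix command>'"
  exit 1
 fi
 for server in $(cat servers.dat); do
  echo "Running command $1 on server $server ..."
  echo "=================================================================="
  # Quote both arguments so that a command with spaces is passed as one string.
  ssh "$server" "$1"
  echo ""
 done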

Create a file servers.dat with the host names of all the servers:

 server1
 server2
 server3
 ..
 serverN

Now to see all the processes running on the servers, just execute:

cluster_run.sh 'jps'

To see the disk usage:

cluster_run.sh 'df -k'

Please ensure that silent/auto login is enabled. If it is not, use the following links to enable it before executing this script:

http://www.rebol.com/docs/ssh-auto-login.html
http://hortonworks.com/kb/generating-ssh-keys-for-passwordless-login/

Monday 18 June 2012

Using Combiner in Hbase during MapReduce

We all know the importance of the Combiner in MapReduce from the Hadoop world! The combiner is a reduce-type function that runs during the map operation. It brings the advantage of local reduction of the data while the map output is still in memory, without the cost of disk I/O. It also reduces the amount of data that flows to the reducer, by an amount that depends on the application.
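
The effect of the local reduction can be sketched without Hadoop at all. The following plain Java simulation is only an illustration: the class CombinerSketch, its methods and the two sample mapper outputs are made up for this example. It counts how many records would have to travel to the reducer with and without a combiner.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain Java simulation of a combiner; it does not use the Hadoop API.
final class CombinerSketch {

    /** Local reduction of one mapper's output: one (key, count) pair per distinct key. */
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String key : mapOutputKeys) {
            partial.merge(key, 1, Integer::sum);
        }
        return partial;
    }

    /** Final reduction: sums the partial counts received from all mappers. */
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                totals.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Each list stands for the keys emitted by one map task (sample data).
        List<String> mapper1 = Arrays.asList("Delhi", "Noida", "Delhi");
        List<String> mapper2 = Arrays.asList("Delhi", "Bangalore", "Noida", "Noida");

        Map<String, Integer> partial1 = combine(mapper1);
        Map<String, Integer> partial2 = combine(mapper2);

        System.out.println("records shuffled without combiner: "
                + (mapper1.size() + mapper2.size()));
        System.out.println("records shuffled with combiner: "
                + (partial1.size() + partial2.size()));
        System.out.println("totals: " + reduce(Arrays.asList(partial1, partial2)));
    }
}
```

With this sample data, 7 records are shuffled without the combiner and 5 with it; the saving grows with the number of repeated keys per map task.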

The HBase wiki pages provide multiple examples of MapReduce jobs with an HBase table as the source and also as the destination.

The following MapReduce example uses a Combiner: it reads the employee HBase table, calculates the count of employees in each city, and writes the results to the summary table.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseCombinerExample {

  public static class MyMapper extends TableMapper<Text, IntWritable> {

    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    public void map(ImmutableBytesWritable row, Result value,
        Context context) throws IOException, InterruptedException {
      String val = new String(value.getValue(Bytes.toBytes("data"),
          Bytes.toBytes("city")));
      text.set(val); // we can only emit Writables...

      context.write(text, ONE);
    }
  }

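  // The combiner emits plain (Text, IntWritable) pairs, not HBase Puts,
  // so it extends the generic Hadoop Reducer rather than TableReducer.
  public static class MyTableCombiner extends
      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {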

    private final IntWritable iw = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }

      iw.set(i);
      context.write(key, iw);
    }
  }

  public static class MyTableReducer extends
      TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }
      Put put = new Put(Bytes.toBytes(key.toString()));
      put.add(Bytes.toBytes("data"), Bytes.toBytes("count"),
          Bytes.toBytes("" + i));

      context.write(null, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "ExampleSummary");
    job.setJarByClass(HbaseCombinerExample.class); // class that contains
                            // mapper and reducer
    String sourceTable = "employee";
    String targetTable = "summary";

    Scan scan = new Scan();
    scan.setCaching(500); // 1 is the default in Scan, which will be bad for
                // MapReduce jobs
    scan.setCacheBlocks(false); // don't set to true for MR jobs
    // set other scan attrs

    TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
        scan, // Scan instance to control CF and attribute selection
        MyMapper.class, // mapper class
        Text.class, // mapper output key
        IntWritable.class, // mapper output value
        job);
    job.setCombinerClass(MyTableCombiner.class);
    TableMapReduceUtil.initTableReducerJob(targetTable, // output table
        MyTableReducer.class, // reducer class
        job);
    job.setNumReduceTasks(1); // at least one, adjust as required

    boolean b = job.waitForCompletion(true);
    if (!b) {
      throw new IOException("error with job!");
    }
  }
}


The following shows the scan output of the summary table:

hbase(main):010:0> scan 'summary'
ROW                   COLUMN+CELL
 Bangalore            column=data:count, timestamp=1340038622911, value=1256
 Noida                column=data:count, timestamp=1340038622911, value=8765
 Delhi                column=data:count, timestamp=1340038622912, value=8990

Sunday 17 June 2012

How to calculate the record size of HBase?

HBase is used to store large amounts of data. The size of each record can make a huge difference to the total storage requirements of the solution. For example, storing 1 billion records with just 1 extra byte each needs ~1 GB of extra disk space.

The following section covers the details of a record in HBase. The record consists of the Row ID, Column Family Name, Column Qualifier, Timestamp and the Value.

As HBase is a column-oriented database, it stores each value with its fully qualified row key. For example, to store the employee data with employeeId, firstName and lastName, the fully qualified row key is repeated for each column. So what is the total disk space required to store this data?

First, the following shows the scan of the employee table with actual data:
hbase(main):011:0> scan 'employee'
ROW                     COLUMN+CELL
 row1                   column=data:employeeId, timestamp=1339960912955, value=123
 row1                   column=data:firstName, timestamp=1339960810908, value=Joe
 row1                   column=data:lastName, timestamp=1339960868771, value=Robert

HBase KeyValue format

HBase stores the data in KeyValue format. The following picture shows the details including the data types and the size required by each field:

Fig 1 : Key Value Format of HBase

So to calculate the record size:
Fixed part needed by KeyValue format = Key Length + Value Length + Row Length + CF Length + Timestamp + Key Type = ( 4 + 4 + 2 + 1 + 8 + 1) = 20 Bytes

Variable part needed by KeyValue format = Row + Column Family + Column Qualifier + Value


Total bytes required = Fixed part + Variable part

So for the above example let's calculate the record size:
First Column = 20 + (4 + 4 + 10 + 3) = 41 Bytes
Second Column = 20 + (4 + 4 + 9 + 3) = 40 Bytes
Third Column = 20 + (4 + 4 + 8 + 6) = 42 Bytes

Total Size for the row1 in above example = 123 Bytes


To store 1 billion such records, the space required = 123 Bytes * 1 billion = ~ 123 GB
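
The same arithmetic can be written as a small Java sketch. The class KeyValueSizeSketch and its method are hypothetical helpers written for this post, not part of HBase. The sketch assumes UTF-8 encoded strings and counts only the KeyValue bytes described above, ignoring any further storage overhead or compression.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the record size formula above; not part of HBase.
final class KeyValueSizeSketch {

    // key length (4) + value length (4) + row length (2) + CF length (1)
    // + timestamp (8) + key type (1) = 20 bytes
    static final int FIXED_PART = 4 + 4 + 2 + 1 + 8 + 1;

    /** Fixed part plus the byte lengths of row, column family, qualifier and value. */
    static int keyValueSize(String row, String family, String qualifier, String value) {
        return FIXED_PART + bytes(row) + bytes(family) + bytes(qualifier) + bytes(value);
    }

    private static int bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        int first = keyValueSize("row1", "data", "employeeId", "123");
        int second = keyValueSize("row1", "data", "firstName", "Joe");
        int third = keyValueSize("row1", "data", "lastName", "Robert");
        System.out.println("First Column  = " + first + " Bytes");
        System.out.println("Second Column = " + second + " Bytes");
        System.out.println("Third Column  = " + third + " Bytes");
        System.out.println("Total for row1 = " + (first + second + third) + " Bytes");
    }
}
```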

Please see the following snippet of KeyValue.java for details of the field types and sizes in the HBase KeyValue:

  static byte [] createByteArray(final byte [] row, final int roffset,
      final int rlength, final byte [] family, final int foffset, int flength,
      final byte [] qualifier, final int qoffset, int qlength,
      final long timestamp, final Type type,
      final byte [] value, final int voffset, int vlength) {
      ..
      ..
// Allocate right-sized byte array.
    byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
    // Write key, value and key row length.
    int pos = 0;
    pos = Bytes.putInt(bytes, pos, keylength);
    pos = Bytes.putInt(bytes, pos, vlength);
    pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
    pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
    pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
    if(flength != 0) {
      pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
    }
    if(qlength != 0) {
      pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
    }
    pos = Bytes.putLong(bytes, pos, timestamp);
    pos = Bytes.putByte(bytes, pos, type.getCode());
    if (value != null && value.length > 0) {
      pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
    }
    return bytes;
}