Tuesday 26 June 2012

How to Delete an HBase Region Including All Its Data?

HBase provides a mechanism to pre-create the regions of a table. This helps distribute the data across the regions in the HBase cluster. Please see the following link for details on pre-creating regions in HBase:

http://hbase.apache.org/book/perf.writing.html
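
For reference, pre-created regions can also be set up through the client API. The following is only a minimal sketch (the table name, column family and split keys are examples of mine, not taken from the linked page):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: creates a table with explicit region split keys so that the regions
// exist up front instead of being created by splits as data arrives.
public class PreCreateRegions {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = new HTableDescriptor("TEST");
    desc.addFamily(new HColumnDescriptor("data"));
    // Example split points; each key becomes the start key of a new region.
    byte[][] splitKeys = new byte[][] {
        Bytes.toBytes("user1000"), Bytes.toBytes("user2000"), Bytes.toBytes("user3000") };
    admin.createTable(desc, splitKeys);
  }
}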

The row key design is what drives the distribution of data across the regions. The row key can be a composite key, e.g. "User ID + Transaction ID". The following blog provides the details:

http://riteshadval.blogspot.in/2012/03/hbase-composite-row-key-design-doing.html

With such a design, the data of specific users ends up in specific regions. In some scenarios we might need to delete all the data for these users, for example when the users are no longer active.
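
As an illustration only (a minimal sketch; the userId/transactionId field names and the fixed-width assumption are mine, not from the linked post), such a composite row key could be built like this:

import org.apache.hadoop.hbase.util.Bytes;

// Sketch: builds a composite row key of the form "User ID + Transaction ID" so
// that all rows of one user fall into the same row-key range, and hence into
// the same region (or contiguous set of regions).
public class CompositeRowKey {

  public static byte[] build(String userId, long transactionId) {
    // A fixed-width user id keeps the lexicographic ordering grouped per user.
    return Bytes.add(Bytes.toBytes(userId), Bytes.toBytes(transactionId));
  }

  public static void main(String[] args) {
    byte[] rowKey = build("user0001", 42L);
    System.out.println(Bytes.toStringBinary(rowKey));
  }
}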

The following method can be used to delete and recreate a complete region in HBase. Please be careful and ensure that you are passing the right regionStartKey, as all the data in that region will be deleted without the need for a major compaction.

 private void recreateRegion(String tableName, String regionStartKey) {
    try {
       Configuration conf = HBaseConfiguration.create();
       HTable hTable = new HTable(conf, tableName);
       HTableDescriptor desc = hTable.getTableDescriptor();
       byte[][] startKeys = hTable.getStartKeys();
       for (int i = 0; i < startKeys.length; i++) {
          byte[] startKey = startKeys[i];
          // Find the region whose start key matches the one passed in.
          if (Bytes.toString(startKey).equals(regionStartKey)) {
             FileSystem fs = FileSystem.get(conf);
             Path rootDir = new Path(conf.get("hbase.rootdir"));
             HRegionInfo info = hTable.getRegionLocation(startKey)
                   .getRegionInfo();
             // Delete the region directory (and all of its HFiles) from HDFS ...
             System.out.println("deleting region - " + info.toString());
             HRegion.deleteRegion(fs, rootDir, info);
             // ... and recreate the same region empty, using the table descriptor.
             System.out.println("creating region - " + info.toString());
             HRegion newRegion = HRegion.createHRegion(info, rootDir,
                   conf, desc);
             newRegion.close();
             break;
          }
       }
       hTable.close();
    } catch (Exception e) {
       e.printStackTrace();
    }
 }
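
A possible way to find the exact value to pass as regionStartKey is to list the start keys of all regions of the table first. A minimal sketch using the same client API as the method above (the table name "TEST" is just an example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: prints the start key of every region of a table so that the right
// value can be passed to recreateRegion().
public class ListRegionStartKeys {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "TEST");
    try {
      for (byte[] startKey : table.getStartKeys()) {
        System.out.println("region start key: '" + Bytes.toString(startKey) + "'");
      }
    } finally {
      table.close();
    }
  }
}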

The following steps need to be followed:


1. Disable the table
disable 'TEST'


2. Check the HFiles on HDFS; in this example, region 9c52cac6d90c7a43d98887aed68179a7 contains 2 HFiles.

3. Execute the above program, passing the table name and the start key of this region.

4. Check the HFiles on HDFS again; both HFiles of region 9c52cac6d90c7a43d98887aed68179a7 have now been deleted.

5. Enable the table again
enable 'TEST'

After the table is enabled again, all the data in that region is gone and a scan of the table will not return any rows from that region. The benefit of this approach is that a huge amount of data can be removed without running a major compaction.

Sunday 24 June 2012

Shell script to execute the commands on all Farm Servers

There are many software tools, including open source ones, for managing a server farm. It is fun to execute and run jobs on a Hadoop/HBase cluster, but as the cluster grows to more and more servers, it becomes increasingly difficult to manage the farm without such tools.

The following simple script can be really useful to run a Unix command on all the farm servers:

 #!/bin/bash
 # Script used to run a command on every server of the Hadoop/HBase cluster.
 #
 if [ -z "$1" ]; then
  echo "Usage: cluster_run.sh <<Unix Command>>"
  exit 1;
 fi
 for server in `cat servers.dat`; do
  echo "Running command $1 on server $server ..."
  echo "=================================================================="
  # Quote the arguments so multi-word commands (e.g. 'df -k') are passed intact.
  ssh "$server" "$1"
  echo ""
 done;

Create a file servers.dat with the host names of all the servers:

 server1
 server2
 server3
 ..
 serverN

Now, to see all the Java processes (the Hadoop/HBase daemons) running on the servers, just execute:

cluster_run.sh 'jps'

To see the disk usage:

cluster_run.sh 'df -k'

Please ensure that passwordless (key-based) SSH login is enabled on all the servers. If not, use the following links to set it up before executing this script:

http://www.rebol.com/docs/ssh-auto-login.html
http://hortonworks.com/kb/generating-ssh-keys-for-passwordless-login/

Monday 18 June 2012

Using a Combiner in HBase MapReduce

We all know the importance of the combiner in MapReduce from the Hadoop world! The combiner is a reduce-type function that runs on the map side. It brings the advantage of locally reducing the data while the map output is still in memory, without the cost of disk IO, and, depending on the application, it also cuts down the amount of data flowing to the reducers. For example, if one map task emits (Bangalore, 1) a thousand times, the combiner collapses those pairs into a single (Bangalore, 1000) before it is sent over the network.

The HBase wiki pages provide multiple examples of MapReduce jobs with an HBase table as the source and an HBase table as the destination. The following example shows a MapReduce job with a combiner that reads the data from one table and writes the results to another table.

The following MapReduce example reads the employee HBase table and calculates the count of employees in each city, using a combiner.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HbaseCombinerExample {

  public static class MyMapper extends TableMapper<Text, IntWritable> {

    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    public void map(ImmutableBytesWritable row, Result value,
        Context context) throws IOException, InterruptedException {
      String val = new String(value.getValue(Bytes.toBytes("data"),
          Bytes.toBytes("city")));
      text.set(val); // we can only emit Writables...

      context.write(text, ONE);
    }
  }

  // The combiner runs on the map side, so it must emit the same key/value
  // types as the mapper output (Text, IntWritable); a plain Reducer suffices.
  public static class MyTableCombiner extends
      Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable iw = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }

      iw.set(i);
      context.write(key, iw);
    }
  }

  public static class MyTableReducer extends
      TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }
      Put put = new Put(Bytes.toBytes(key.toString()));
      put.add(Bytes.toBytes("data"), Bytes.toBytes("count"),
          Bytes.toBytes("" + i));

      context.write(null, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "ExampleSummary");
    job.setJarByClass(HbaseCombinerExample.class); // class that contains
                            // mapper and reducer
    String sourceTable = "employee";
    String targetTable = "summary";

    Scan scan = new Scan();
    scan.setCaching(500); // 1 is the default in Scan, which will be bad for
                // MapReduce jobs
    scan.setCacheBlocks(false); // don't set to true for MR jobs
    // set other scan attrs

    TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
        scan, // Scan instance to control CF and attribute selection
        MyMapper.class, // mapper class
        Text.class, // mapper output key
        IntWritable.class, // mapper output value
        job);
    job.setCombinerClass(MyTableCombiner.class);
    TableMapReduceUtil.initTableReducerJob(targetTable, // output table
        MyTableReducer.class, // reducer class
        job);
    job.setNumReduceTasks(1); // at least one, adjust as required

    boolean b = job.waitForCompletion(true);
    if (!b) {
      throw new IOException("error with job!");
    }
  }
}
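
Before running the job, both tables need to exist. Assuming the column family data used above, they could be created from the HBase shell, for example:

create 'employee', 'data'
create 'summary', 'data'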


The following shows the scan output of the summary table:

hbase(main):010:0> scan 'summary'
ROW                   COLUMN+CELL
 Bangalore            column=data:count, timestamp=1340038622911, value=1256
 Noida                column=data:count, timestamp=1340038622911, value=8765
 Delhi                column=data:count, timestamp=1340038622912, value=8990

Sunday 17 June 2012

How to calculate the record size in HBase?

HBase is used to store large amounts of data, and the size of each record can make a huge difference in the total storage requirement of a solution. For example, storing 1 billion records with just 1 extra byte per record needs ~1 GB of extra disk space.

The following section covers the details of a record in HBase. A record consists of the row key, column family name, column qualifier, timestamp and value.

As HBase is a column-oriented database, it stores each value together with its fully qualified key. For example, to store employee data with employeeId, firstName and lastName, the row key (along with the column family and qualifier) is repeated for each column value. So what is the total disk space required to store this data?

First, the following shows the scan of the employee table with actual data:
hbase(main):011:0> scan 'employee'
ROW                     COLUMN+CELL
 row1                   column=data:employeeId, timestamp=1339960912955, value=123
 row1                   column=data:firstName, timestamp=1339960810908, value=Joe
 row1                   column=data:lastName, timestamp=1339960868771, value=Robert

HBase KeyValue format

HBase stores the data in KeyValue format. The following picture shows the details including the data types and the size required by each field:

Fig 1 : Key Value Format of HBase

So to calculate the record size:
Fixed part needed by the KeyValue format = key length (4) + value length (4) + row length (2) + CF length (1) + timestamp (8) + key type (1) = 20 bytes

Variable part needed by the KeyValue format = row key + column family + column qualifier + value


Total bytes required = Fixed part + Variable part

So for the above example let's calculate the record size:
First column (row1, data, employeeId, 123) = 20 + (4 + 4 + 10 + 3) = 41 bytes
Second column (row1, data, firstName, Joe) = 20 + (4 + 4 + 9 + 3) = 40 bytes
Third column (row1, data, lastName, Robert) = 20 + (4 + 4 + 8 + 6) = 42 bytes

Total size for row1 in the above example = 41 + 40 + 42 = 123 bytes


To store 1 billion such records, the space required = 123 bytes * 1 billion = ~123 GB (before compression and HDFS replication).
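
Putting the formula above into code, the following is a small sketch (my own helper, not part of HBase) that estimates the KeyValue size of each cell, assuming single-byte (ASCII) characters so that string length equals byte length:

// Sketch: estimates the on-disk KeyValue size of one cell using the 20-byte
// fixed overhead derived above. It ignores compression and HDFS replication.
public class KeyValueSizeEstimator {

  // key length (4) + value length (4) + row length (2) + CF length (1)
  // + timestamp (8) + key type (1) = 20 bytes
  private static final int FIXED_OVERHEAD = 20;

  public static long cellSize(String row, String family, String qualifier, String value) {
    // Assumes ASCII, i.e. one byte per character.
    return FIXED_OVERHEAD + row.length() + family.length()
        + qualifier.length() + value.length();
  }

  public static void main(String[] args) {
    long total = cellSize("row1", "data", "employeeId", "123")   // 41 bytes
        + cellSize("row1", "data", "firstName", "Joe")           // 40 bytes
        + cellSize("row1", "data", "lastName", "Robert");        // 42 bytes
    System.out.println("row1 total = " + total + " bytes");      // prints 123
  }
}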

Please see the following snippet of KeyValue.java for details of the field types and sizes in the HBase KeyValue:

  static byte [] createByteArray(final byte [] row, final int roffset,
      final int rlength, final byte [] family, final int foffset, int flength,
      final byte [] qualifier, final int qoffset, int qlength,
      final long timestamp, final Type type,
      final byte [] value, final int voffset, int vlength) {
      ..
      ..
    // Allocate right-sized byte array.
    byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
    // Write key, value and key row length.
    int pos = 0;
    pos = Bytes.putInt(bytes, pos, keylength);
    pos = Bytes.putInt(bytes, pos, vlength);
    pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
    pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
    pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
    if(flength != 0) {
      pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
    }
    if(qlength != 0) {
      pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
    }
    pos = Bytes.putLong(bytes, pos, timestamp);
    pos = Bytes.putByte(bytes, pos, type.getCode());
    if (value != null && value.length > 0) {
      pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
    }
    return bytes;
  }