Sunday 22 July 2012

Processing 1 Billion records of HBase using MapReduce job

How long would it take to read and process 1 billion records from HBase? To simplify the scenario, the following example reads all the messages from an HBase table and simply counts the number of rows instead of actually processing each message.

It shows how fast the messages can be read from HBase in a MapReduce job. In a real scenario the processing time of the messages would be additional, but it gives a good indication of the time required to read 1 billion records from HBase.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class HbaseRowCounter {

  public static class CounterMapper extends TableMapper<Text, Text> {

    public static enum Counters {
      ROWS
    }

    public void map(ImmutableBytesWritable row, Result value,
        Context context) throws InterruptedException, IOException {
      context.getCounter(Counters.ROWS).increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.out.println("Usage: HbaseRowCounter <tablename>");
      System.exit(1);
    }
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "Row Counter - " + args[0]);
    job.setJarByClass(HbaseRowCounter.class);
    Scan scan = new Scan();
    //scan.setCaching(1000); // uncomment to enable scan caching

    TableMapReduceUtil.initTableMapperJob(args[0], // input table
        scan, // Scan instance
        CounterMapper.class, // mapper class
        Text.class, // mapper output key
        Text.class, // mapper output value
        job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0); // map-only job, no reducers needed
    boolean b = job.waitForCompletion(true);
    if (!b) {
      throw new IOException("error with job!");
    }
  }
}


To run the above program:
hadoop jar <jar file name> HbaseRowCounter <table name>

The following snapshot shows the Hadoop job output and the processing time: reading and counting the 1 billion records takes about 1 hour 15 minutes.

The performance test was done on an 8-node cluster, each node with 4 quad-core CPUs and 32 GB RAM. The table has only one column family with a single column, and each row is approximately 150 bytes. The table is also pre-split into 1000 regions.

As this seemed quite high, we started looking for ways to optimize it and found that caching on the Scan object can really improve the performance. The HBase book contains the details. To enable scan caching, uncomment the scan.setCaching(1000) line in the source code above. Scan caching avoids an RPC call to the region server for each invocation from the map task, as described in the blog. After enabling scan caching we could read the 1 billion records in just 7 minutes 44 seconds.







This means that calculating, for example, the total sales from 1 billion records, each holding one sale instance, would take a minimum of 7-8 minutes.
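Hadoop counters hold long values, so a mapper summing sales this way would need to convert each amount to integral cents before calling context.getCounter(...).increment(cents). A minimal sketch of that conversion (the two-decimal currency format is an assumption, not part of the example above):

```java
public class SaleAmounts {

  // Convert a decimal amount such as "149.99" to long cents (14999),
  // so it can be accumulated exactly in a Hadoop counter, which stores longs.
  public static long toCents(String amount) {
    java.math.BigDecimal d = new java.math.BigDecimal(amount);
    return d.movePointRight(2).longValueExact();
  }

  public static void main(String[] args) {
    // In the mapper this string would come from the cell value, e.g.
    // Bytes.toString(value.getValue(family, qualifier)).
    System.out.println(toCents("149.99"));
  }
}
```

After the job completes, the driver would read the total back with job.getCounters() and divide by 100 to recover the currency amount.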


Note that scan caching can also be set through the configuration property hbase.client.scanner.caching, without changing the code. Please see the following link for details.
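For example, the property can be set cluster-wide in hbase-site.xml; the value 1000 here simply mirrors the commented setCaching(1000) call in the source code above:

```xml
<property>
  <name>hbase.client.scanner.caching</name>
  <value>1000</value>
</property>
```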

Thursday 19 July 2012

HBase GET/PUT commands call flow to ZooKeeper/-ROOT-/.META. and User Table

HBase stores table data lexicographically sorted by row key. In a large HBase cluster with many shards, aka regions, how does a client decide the right region to write or read the data?

Unlike many other NoSQL databases, read/write requests from the client do not go through a component that routes/fans them out to the right shard; instead the clients are intelligent enough to route requests directly. This avoids loading a single component with routing requests from all the clients.

The following diagram shows all the steps and the explanation follows:


Let's first see a simple Java program that writes data into the 'TEST' table in HBase using the PUT method.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutTest {

  private static Configuration conf = HBaseConfiguration.create();
  
  public static void addRecord(String tableName, String rowKey, String family, String qual, String value) throws Exception {
    try {
      HTable table = new HTable(conf, tableName);
      Put put = new Put(Bytes.toBytes(rowKey));
      put.add(Bytes.toBytes(family), Bytes.toBytes(qual), Bytes.toBytes(value));
      table.put(put);
      System.out.println("insert record "+rowKey+" to table "+tableName+" ok.");
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  
  public static void main(String[] args) throws Exception{
    String tablename = args[0];
    PutTest.addRecord(tablename, "row-1", "data", "name", "Joe");
    PutTest.addRecord(tablename, "row-1", "data", "age", "28");
  }

}

The HTable class uses the following steps to find the region of the "TEST" table and then directly invokes the read/write operation.
  • Step-1 to ZooKeeper: The client first connects to the ZooKeeper quorum to confirm that the HBase Master server is running, and finds the location of the Region Server hosting the top-level catalog table -ROOT- region from the path /hbase/root-region-server.
  • Step-2 to Root Region: Next the client connects to the Region Server hosting the -ROOT- region and fetches the location details of the next-level catalog table, called .META.
  • Step-3 to Meta Region: From the .META. table it then fetches the details of the user-table region covering the row key it needs to read/write.
  • Step-4 to User Table Region: Finally the client connects directly to the target region server holding the region of the user table "TEST" and performs the operation, e.g. Get/Put/Delete. For a Put operation, the region server stores the request in the MemStore, which is later flushed to disk.
This information is cached at the client end, so these steps happen only the first time.
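For the read path, a GET follows the same four lookup steps. A minimal companion to the PutTest program above, sketched against the same era's client API (the table, family and qualifier names mirror the PUT example), might look like this:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetTest {

  private static Configuration conf = HBaseConfiguration.create();

  public static String getValue(String tableName, String rowKey,
      String family, String qual) throws IOException {
    // The HTable instance performs the ZooKeeper / -ROOT- / .META.
    // lookups described above; the locations are cached after the
    // first call.
    HTable table = new HTable(conf, tableName);
    Get get = new Get(Bytes.toBytes(rowKey));
    get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qual));
    Result result = table.get(get);
    return Bytes.toString(result.getValue(Bytes.toBytes(family),
        Bytes.toBytes(qual)));
  }

  public static void main(String[] args) throws Exception {
    String tablename = args[0];
    System.out.println(GetTest.getValue(tablename, "row-1", "data", "name"));
  }
}
```

Running it against the table populated by PutTest would print the stored value "Joe".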
The console output of the above program shows the following and confirms the connections with ZooKeeper, the -ROOT- region server and the .META. region server.

12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.3-1240972, built on 02/06/2012 10:48 GMT
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:host.name=bd-master-1
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:java.version=1.6.0_31
...
...
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:user.name=hadoop
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:user.home=/home/hadoop
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Client environment:user.dir=/home/hadoop/agg
12/07/17 07:28:42 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bd-slave-1:2181,bd-master-1:2181,bd-slave-2:2181 sessionTimeout=600000 watcher=hconnection
12/07/17 07:28:42 INFO zookeeper.ClientCnxn: Opening socket connection to server /192.168.10.12:2181(bd-slave-2)
12/07/17 07:28:42 INFO zookeeper.RecoverableZooKeeper: The identifier of this process is 11564@bd-master-1
12/07/17 07:28:42 INFO client.ZooKeeperSaslClient: Client will not SASL-authenticate because the default JAAS configuration section 'Client' could not be found. If you are not using SASL, you may ignore this. On the other hand, if you expected SASL to work, please fix your JAAS configuration.
12/07/17 07:28:42 INFO zookeeper.ClientCnxn: Socket connection established to bd-slave-2/192.168.10.12:2181, initiating session
12/07/17 07:28:42 INFO zookeeper.ClientCnxn: Session establishment complete on server bd-slave-2/192.168.10.12:2181, sessionid = 0x2388f9e3f5a0015, negotiated timeout = 600000
12/07/17 07:28:42 DEBUG client.HConnectionManager$HConnectionImplementation: Lookedup root region location, connection=org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@4cb9e45a; serverName=bd-slave-4,60020,1342439310156
12/07/17 07:28:42 DEBUG client.HConnectionManager$HConnectionImplementation: Cached location for .META.,,1.1028785192 is bd-slave-8:60020
12/07/17 07:28:42 DEBUG client.MetaScanner: Scanning .META. starting at row=TEST,,00000000000000 for max=10 rows using org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@4cb9e45a
12/07/17 07:28:42 DEBUG client.HConnectionManager$HConnectionImplementation: Cached location for TEST,,1342429701327.f1bd3b3d2d05bd54d8600dd095a25bfa. is bd-slave-6:60020
insert record row-1 to table TEST ok.
insert record row-1 to table TEST ok.

TCP/IP Packets
Also, I captured the TCP/IP packets to confirm the flow from the HBase client to the above components:

The following shows that the client checks the location of the -ROOT- region with the ZooKeeper quorum at path /hbase/root-region-server.

Step#2: searching for the .META. region in the -ROOT- catalog table.
0000  68 b5 99 bd 94 c8 00 25  b3 e2 bd 6c 08 00 45 00   h......% ...l..E.
0010  00 9f 91 05 40 00 40 06  13 92 c0 a8 0a 63 c0 a8   ....@.@. .....c..
0020  0a 0e c9 8b ea 74 5e 0a  1a 20 1d 04 bf d7 80 18   .....t^. . ......
0030  00 2e 96 53 00 00 01 01  08 0a b3 cd 88 46 1c 47   ...S.... .....F.G
0040  bb af 00 00 00 67 00 00  00 02 01 00 13 67 65 74   .....g.. .....get
0050  43 6c 6f 73 65 73 74 52  6f 77 42 65 66 6f 72 65   ClosestR owBefore
0060  00 00 00 00 00 00 00 1d  03 43 4e fa 00 00 00 03   ........ .CN.....
0070  0b 09 2d 52 4f 4f 54 2d  2c 2c 30 0b 2a 2e 4d 45   ..-ROOT- ,,0.*.ME
0080  54 41 2e 2c 54 45 53 54  2c 2c 39 39 39 39 39 39   TA.,TEST ,,999999
0090  39 39 39 39 39 39 39 39  2c 39 39 39 39 39 39 39   99999999 ,9999999
00a0  39 39 39 39 39 39 39 0b  04 69 6e 66 6f            9999999. .info   

Step#3: searching for the user table 'TEST' in .META.
0000  d8 d3 85 ba 94 d4 00 25  b3 e2 bd 6c 08 00 45 00   .......% ...l..E.
0010  00 ae d9 75 40 00 40 06  cb 0e c0 a8 0a 63 c0 a8   ...u@.@. .....c..
0020  0a 12 9d a0 ea 74 5d 87  52 91 bc e1 4d 8a 80 18   .....t]. R...M...
0030  00 36 96 66 00 00 01 01  08 0a b3 cd 88 63 1f 50   .6.f.... .....c.P
0040  c6 b3 00 00 00 76 00 00  00 06 01 00 0b 6f 70 65   .....v.. .....ope
0050  6e 53 63 61 6e 6e 65 72  00 00 00 00 00 00 00 1d   nScanner ........
0060  03 43 4e fa 00 00 00 02  0b 09 2e 4d 45 54 41 2e   .CN..... ...META.
0070  2c 2c 31 27 27 02 14 54  45 53 54 2c 2c 30 30 30   ,,1''..T EST,,000
0080  30 30 30 30 30 30 30 30  30 30 30 00 00 00 00 01   00000000 000.....
0090  ff ff ff ff ff ff ff ff  01 00 00 00 00 00 00 00   ........ ........
00a0  00 00 7f ff ff ff ff ff  ff ff 01 00 00 00 01 04   ........ ........
00b0  69 6e 66 6f 00 00 00 00  00 00 00 00               info.... ....    

And finally the Put request sends the data, including the name value 'Joe'.
0000  68 b5 99 bd 16 78 00 25  b3 e2 bd 6c 08 00 45 00   h....x.% ...l..E.
0010  00 f4 11 a9 40 00 40 06  92 97 c0 a8 0a 63 c0 a8   ....@.@. .....c..
0020  0a 10 e0 be ea 74 5e 42  9a cc c7 43 dc 9b 80 18   .....t^B ...C....
0030  00 2e 96 aa 00 00 01 01  08 0a b3 cd 88 6c 1f 50   ........ .....l.P
0040  2c 95 00 00 00 bc 00 00  00 0b 01 00 05 6d 75 6c   ,....... .....mul
0050  74 69 00 00 00 00 00 00  00 1d 03 43 4e fa 00 00   ti...... ...CN...
0060  00 01 42 42 00 00 00 01  35 54 45 53 54 2c 2c 31   ..BB.... 5TEST,,1
0070  33 34 32 34 32 39 37 30  31 33 32 37 2e 66 31 62   34242970 1327.f1b
0080  64 33 62 33 64 32 64 30  35 62 64 35 34 64 38 36   d3b3d2d0 5bd54d86
0090  30 30 64 64 30 39 35 61  32 35 62 66 61 2e 00 00   00dd095a 25bfa...
00a0  00 01 41 41 40 23 02 05  72 6f 77 2d 31 7f ff ff   ..AA@#.. row-1...
00b0  ff ff ff ff ff ff ff ff  ff ff ff ff ff 01 00 00   ........ ........
00c0  00 01 04 64 61 74 61 00  00 00 01 00 00 00 24 00   ...data. ......$.
00d0  00 00 24 00 00 00 19 00  00 00 03 00 05 72 6f 77   ..$..... .....row
00e0  2d 31 04 64 61 74 61 6e  61 6d 65 7f ff ff ff ff   -1.datan ame.....
00f0  ff ff ff 04 4a 6f 65 00  00 00 00 00 00 00 00 0e   ....Joe. ........
0100  11 0e                                              ..               

We have seen that there are 4 steps for a client to connect to the actual region and read/write the data. Does this have a performance impact on the client? If yes, can we avoid it? Watch for the next blog to get the answers...