Monday 18 June 2012

Using Combiner in Hbase during MapReduce

We all know the importance of the Combiner during the MapReduce from the Hadoop world! The combiner is the reduce type of function during the map operation. This brings the advantage of the local reduction of data when the map output is still in memory without the cost of Disk IO, also the amount of data flow to the reducer is reduced based on the application.

HBase wiki pages provides multiple examples of the Map Reduce with source as HBase table and also destination as HBase table. The following snapshot shows the example of the MapReduce job with Combiner that reads from the data from a table and writes the results to a table.

The following MapReduce example reads the employee HBase table and calculates the count of employees in each city using Combiner.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseCombinerExample {

  public static class MyMapper extends TableMapper<Text, IntWritable> {

    private final IntWritable ONE = new IntWritable(1);
    private Text text = new Text();

    public void map(ImmutableBytesWritable row, Result value,
        Context context) throws IOException, InterruptedException {
      String val = new String(value.getValue(Bytes.toBytes("data"),
          Bytes.toBytes("city")));
      text.set(val); // we can only emit Writables...

      context.write(text, ONE);
    }
  }

  public static class MyTableCombiner extends
      TableReducer<Text, IntWritable, Text> {

    private final IntWritable iw = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }

      iw.set(i);
      context.write(key, iw);
    }
  }

  public static class MyTableReducer extends
      TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int i = 0;
      for (IntWritable val : values) {
        i += val.get();
      }
      Put put = new Put(Bytes.toBytes(key.toString()));
      put.add(Bytes.toBytes("data"), Bytes.toBytes("count"),
          Bytes.toBytes("" + i));

      context.write(null, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "ExampleSummary");
    job.setJarByClass(HbaseCombinerExample.class); // class that contains
                            // mapper and reducer
    String sourceTable = "employee";
    String targetTable = "summary";

    Scan scan = new Scan();
    scan.setCaching(500); // 1 is the default in Scan, which will be bad for
                // MapReduce jobs
    scan.setCacheBlocks(false); // don't set to true for MR jobs
    // set other scan attrs

    TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
        scan, // Scan instance to control CF and attribute selection
        MyMapper.class, // mapper class
        Text.class, // mapper output key
        IntWritable.class, // mapper output value
        job);
    job.setCombinerClass(MyTableCombiner.class);
    TableMapReduceUtil.initTableReducerJob(targetTable, // output table
        MyTableReducer.class, // reducer class
        job);
    job.setNumReduceTasks(1); // at least one, adjust as required

    boolean b = job.waitForCompletion(true);
    if (!b) {
      throw new IOException("error with job!");
    }
  }
}


The following shows the scan output of the summary table:

hbase(main):010:0> scan 'summary'
ROW                   COLUMN+CELL

 Bangalore            column=data:count, timestamp=1340038622911, value=1256
 Noida                  column=data:count, timestamp=1340038622911, value=8765
 Delhi                   column=data:count, timestamp=1340038622912, value=8990

4 comments:

  1. You will need to turn your computer into a local server and create a virtual web server (Apache, MySQL and PHP), which can easily be done by installing WordPress into your computer. You will not need to install these server separately and will be able to do that using the XAMP (for Windows) and MAMP (for Macintosh).

    apache jobs

    ReplyDelete
  2. Hey Prafull,

    In Combiner class when context.write(key, iw); use this line I got below error How it will solve please tell
    The method write(Text, Mutation) in the type TaskInputOutputContext is not applicable for the arguments (Text, DoubleWritable)

    ReplyDelete