Monday, July 18, 2011

HBase MultiTableOutputFormat: writing to multiple tables in one Map Reduce job

Recently, I've been having a lot of fun learning about HBase and Hadoop. One esoteric thing I just learned about is the way that HBase tables are populated.

By default, an HBase Map Reduce job can only write to a single table, because the output format is set at the job level with job.setOutputFormatClass(). However, if you are populating an HBase table, chances are you will also want to build an index related to that table so that you can do fast queries against the master table. The best way to do this is to write the data to both tables at the same time, while you are importing it. The alternative is to run another M/R job after the fact, but that means reading all of the data a second time, which is a lot of extra load on the system for no real benefit.

To write to both tables at the same time, in the same M/R job, you need to take advantage of the MultiTableOutputFormat class. The key is that when you write to the context, you specify the name of the table you are writing to (wrapped in an ImmutableBytesWritable) as the output key. This is some basic example code (with a lot of the meat removed) which demonstrates this.

static class TsvImporter extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	// MultiTableOutputFormat routes each Put to the table named by the output key,
	// so the table names are wrapped in ImmutableBytesWritable up front.
	private static final ImmutableBytesWritable ACTIONS_TABLE =
		new ImmutableBytesWritable(Bytes.toBytes("actions"));
	private static final ImmutableBytesWritable ACTIONS_INDEX_TABLE =
		new ImmutableBytesWritable(Bytes.toBytes("actions_index"));

	@Override
	public void map(LongWritable offset, Text value, Context context)
			throws IOException, InterruptedException {
		// contains the line of tab separated data we are working on (needs to be parsed out).
		// Note: Text.getBytes() returns the backing array, so only the first
		// value.getLength() bytes are valid.
		byte[] lineBytes = value.getBytes();

		// rowKey is the hbase rowKey generated from lineBytes
		Put put = new Put(rowKey);
		// Create your KeyValue object for this row
		put.add(kv);
		context.write(ACTIONS_TABLE, put); // write to the actions table

		// rowKey2 is the hbase rowKey for the index entry
		Put indexPut = new Put(rowKey2);
		// Create your KeyValue object for the index entry
		indexPut.add(kv2);
		context.write(ACTIONS_INDEX_TABLE, indexPut); // write to the actions_index table
	}
}

public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
	String pathStr = args[0];
	Path inputDir = new Path(pathStr);
	Job job = new Job(conf, "my_custom_job");
	job.setJarByClass(TsvImporter.class);
	FileInputFormat.setInputPaths(job, inputDir);
	job.setInputFormatClass(TextInputFormat.class);
	
	// this is the key to writing to multiple tables in hbase
	job.setOutputFormatClass(MultiTableOutputFormat.class);
	job.setMapperClass(TsvImporter.class);
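	// map-only job: the mapper writes directly to HBase, so no reduce phase is needed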
	job.setNumReduceTasks(0);

	TableMapReduceUtil.addDependencyJars(job);
	TableMapReduceUtil.addDependencyJars(job.getConfiguration());
	return job;
}
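
For completeness, here is a minimal sketch of a driver that submits the job above. The MultiTableImport class name and the HBaseConfiguration.create() call are assumptions on my part; adjust them for your own setup.

public class MultiTableImport {
	public static void main(String[] args) throws Exception {
		// picks up hbase-site.xml (and the Hadoop *-site.xml files) from the classpath
		Configuration conf = HBaseConfiguration.create();
		Job job = createSubmittableJob(conf, args);
		// block until the job finishes; exit non-zero on failure
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}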

7 comments:

Arsalan Bilal said...

Have you tried HBase secondary indexes with Hadoop map-reduce? Can you share your thoughts about it?

lise regnier said...

Has anyone managed to run this code? The context.write function is not defined for (String, Put)? Any help is welcome.

Jon Stevens said...

Sorry lise, it looks like the API may have changed and I haven't kept up with Hadoop enough to know the right solution for you. I think, based on reading the javadoc, you somehow want to get the writer that is passed into the Context constructor and then write to that.

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Mapper.Context.html

good luck!

maharaj said...

Hi

Can you explain why you put

job.setNumReduceTasks(0);

Can the number of reducers be more than 1?

Jon Stevens said...

Darn, this was so long ago I can't remember! =(

maharaj said...

Thanks for the post anyway... I actually checked: either we disable reducers or write our own reducer... with the default reducer it won't work.

Adio Gemon said...

Guys, this post is correct, it's just that the API has changed, so check out this class:

http://archive.cloudera.com/cdh5/cdh/5/hbase-0.96.1.1-cdh5.0.0-beta-2/apidocs/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.html
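
For reference, a minimal sketch of what the write looks like against the class linked above: the table name goes in as the output key, wrapped in an ImmutableBytesWritable, and the Put is the value. The row key and the "f" / "q" family and qualifier names here are placeholders.

ImmutableBytesWritable table = new ImmutableBytesWritable(Bytes.toBytes("actions"));
Put put = new Put(rowKey);
put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("some value")); // family / qualifier / value placeholders
context.write(table, put);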