Using a Custom Partitioner in Pentaho MapReduce

This guide shows how to use a custom partitioner in Pentaho MapReduce. In some situations you may wish to specify which reducer a particular key goes to. For example, when parsing a weblog you might have a complex key containing IP address, year, and month, and need all of the data for a given year to go to the same reducer. For more information on partitioners, see: http://developer.yahoo.com/hadoop/tutorial/module5.html#partitioning
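To illustrate why a custom partitioner helps, the following plain-Java sketch compares hash-based partitioning of the whole key with partitioning on the year alone. It has no Hadoop dependency. The hash formula mirrors the arithmetic of Hadoop's default HashPartitioner but is applied to a String rather than a Text key, so the hash-based numbers may differ from what a real cluster produces. The class name PartitionComparison and the sample keys are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class PartitionComparison {

    // Same arithmetic as Hadoop's default HashPartitioner, applied to a String key.
    static int hashPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Partition on the year, the second tab-separated field of the key.
    static int yearPartition(String key, int numReduceTasks) {
        int year = Integer.parseInt(key.split("\t")[1]);
        return year % numReduceTasks;
    }

    public static void main(String[] args) {
        // Hypothetical keys in the format client_ip <tab> year <tab> month.
        List<String> keys = Arrays.asList(
                "10.0.0.1\t2010\t01",
                "10.0.0.2\t2010\t07",
                "10.0.0.1\t2011\t03");
        for (String key : keys) {
            System.out.println(key.replace('\t', ' ')
                    + " -> hash: " + hashPartition(key, 3)
                    + ", year: " + yearPartition(key, 3));
        }
    }
}
```

With hash-based partitioning, two keys for the same year can land on different reducers because the IP address and month also feed the hash. Partitioning on the year alone always keeps them together.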

Prerequisites

In order to follow along with this how-to guide you will need the following:

  • Hadoop
  • Pentaho Data Integration
  • Pentaho Hadoop Distribution

Sample Files

The sample data file needed for this guide is:

File Name                Content
weblogs_parse.txt.zip    Parsed, raw weblog data

Note: If you have already completed the Using Pentaho MapReduce to Parse Weblog Data guide, the data should already be in the correct location.

Unzip the file, then add it to your cluster by running the following:

hadoop fs -mkdir /user/pdi/weblogs
hadoop fs -mkdir /user/pdi/weblogs/parse
hadoop fs -put weblogs_parse.txt /user/pdi/weblogs/parse/part-00000

Sample Code

This guide expands upon the Using Pentaho MapReduce to Generate an Aggregate Dataset guide. If you have completed that guide you should already have the necessary code. Otherwise, download aggregate_mapper.ktr, aggregate_reducer.ktr, and aggregate_mr.kjb.

Step-By-Step Instructions

Setup

Start Hadoop if it is not already running.

Create a Custom Partitioner in Java

In this task you will create a Java partitioner that takes a key in the format client_ip <tab> year <tab> month and partitions on the year.

Speed Tip

You can download CustomPartitioner.jar, which contains the partitioner, if you do not want to do every step.

  1. Create Year Partitioner Class: In a text editor create a new file named YearPartitioner.java containing the following code:
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;
    
    
    /** Sends every key with the same year to the same reducer. */
    public class YearPartitioner implements Partitioner<Text, LongWritable> {

        // No job-specific configuration is needed.
        public void configure(JobConf job) {}

        public int getPartition(Text key, LongWritable value, int numReduceTasks) {
            // The key has the format client_ip <tab> year <tab> month.
            String[] fields = key.toString().split("\t");
            int year = Integer.parseInt(fields[1]);  // The year is the second field
            // The year mod the number of reduce tasks is the partition number.
            return year % numReduceTasks;
        }
    }
  2. Compile the Class: Run the following command, adjusting the path to the Hadoop core jar to match your installation:
    javac -classpath ${HADOOP_HOME}/hadoop-core.jar YearPartitioner.java
  3. Collect the Class into a Jar: Run the following command:
    jar cvf CustomPartitioner.jar YearPartitioner.class
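The getPartition method above assumes every key has at least two tab-separated fields and a numeric year. A malformed key would throw an exception and fail the task. One possible defensive variant is sketched below in plain Java so that it can be tried without a cluster. Routing unparseable keys to partition 0 is an assumption made for illustration and is not part of the original guide. The class name SafeYearPartition is likewise hypothetical.

```java
class SafeYearPartition {

    // Hypothetical fallback partition for keys that cannot be parsed.
    static final int FALLBACK_PARTITION = 0;

    static int partitionFor(String key, int numReduceTasks) {
        String[] fields = key.split("\t");
        if (fields.length < 2) {
            return FALLBACK_PARTITION;
        }
        try {
            int year = Integer.parseInt(fields[1].trim());
            // floorMod keeps the result in [0, numReduceTasks) even for a negative year.
            return Math.floorMod(year, numReduceTasks);
        } catch (NumberFormatException e) {
            return FALLBACK_PARTITION;
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("10.0.0.1\t2010\t01", 3)); // 0
        System.out.println(partitionFor("10.0.0.1\t2011\t01", 3)); // 1
        System.out.println(partitionFor("not-a-valid-key", 3));    // 0 (fallback)
    }
}
```

The same body could be moved into getPartition by passing key.toString() as the key. Whether bad records should be routed to a fallback partition or should fail the job depends on how strict the pipeline needs to be.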

Deploy the Custom Partitioner

In this task you will deploy the custom partitioner to the cluster so that it can be used through the Distributed Cache.

  1. Create a Directory: Create a directory to store the custom partitioner:
    hadoop fs -mkdir /distcache
  2. Add the Custom Partitioner to the Cluster: Add the CustomPartitioner.jar to HDFS:
    hadoop fs -put CustomPartitioner.jar /distcache

Configure Pentaho MapReduce to Use Custom Partitioner

In this task you will configure the aggregate_mr.kjb job to use the custom partitioner.

Speed Tip

You can download the already completed aggregate_mr_partition.kjb if you do not want to do every step.

  1. Start PDI on your desktop. Once it is running, choose 'File' -> 'Open', browse to and select 'aggregate_mr.kjb', then click 'OK'.
  2. Configure Number of Reducers: Double click on the 'Pentaho MapReduce' job entry. Once it is open switch to the 'Cluster' tab and set 'Number of Reducer Tasks' to '3'.

  3. Configure Partitioner to Use: Switch to the User Defined tab and enter the following:

    Name                       | Value                            | Explanation
    mapred.cache.files         | /distcache/CustomPartitioner.jar | Adds the custom partitioner to the distributed cache for the job.
    mapred.job.classpath.files | /distcache/CustomPartitioner.jar | Adds the custom partitioner from the distributed cache to the Java classpath for the job.
    mapred.partitioner.class   | YearPartitioner                  | Tells the job to use the YearPartitioner class.


  4. Save the Job: Choose 'File' -> 'Save as...' from the menu system. Save the job as 'aggregate_mr_partition.kjb' into a folder of your choice.
  5. Run the Job: Choose 'Action' -> 'Run' from the menu system or click on the green run button on the job toolbar. An 'Execute a job' window will open. Click on the 'Launch' button. An 'Execution Results' panel will open at the bottom of the PDI window and show you the progress of the job as it runs. After a few seconds the job should finish successfully.

Check Hadoop

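  1. View the first Output File: The partitioner sends each record to the reducer numbered year % 3, so part-00000 holds the years that divide evenly by 3. For the years in the sample file that is 2010 (2010 % 3 = 0), so this command should only return data for the year 2010.
    hadoop fs -cat /user/pdi/aggregate_mr/part-00000 | head -10
  2. View the second Output File: This command should only return data for the year 2011 (2011 % 3 = 1).
    hadoop fs -cat /user/pdi/aggregate_mr/part-00001 | head -10
  3. View the third Output File: This command should return an empty file. There are only 2 years of data in the sample file, but you specified 3 reducers, therefore one reducer will receive no data. With the years 2010 and 2011 no key has a remainder of 2. If your data covers other years, a different file will be empty.
    hadoop fs -cat /user/pdi/aggregate_mr/part-00002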
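
Rather than inspecting only the first ten lines by eye, an output file can be checked in full. The sketch below reads lines from standard input and reports whether every year maps to the expected partition. It assumes each output line starts with the key (client_ip <tab> year <tab> month), so the year is the second tab-separated field. That layout is an assumption about the reducer output, and the class name PartitionOutputCheck is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

class PartitionOutputCheck {

    // True if every line's year (second tab-separated field) maps to the given partition.
    static boolean allInPartition(List<String> lines, int partition, int numReduceTasks) {
        for (String line : lines) {
            String[] fields = line.split("\t");
            int year = Integer.parseInt(fields[1]);
            if (year % numReduceTasks != partition) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: PartitionOutputCheck <partition> <numReduceTasks>");
            return;
        }
        int partition = Integer.parseInt(args[0]);
        int numReduceTasks = Integer.parseInt(args[1]);

        // Collect the non-empty lines piped in on standard input.
        List<String> lines = new ArrayList<>();
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            if (!line.isEmpty()) {
                lines.add(line);
            }
        }
        System.out.println(allInPartition(lines, partition, numReduceTasks)
                ? "OK: " + lines.size() + " lines"
                : "MISMATCH");
    }
}
```

After compiling with javac, it could be used along these lines:

    hadoop fs -cat /user/pdi/aggregate_mr/part-00001 | java PartitionOutputCheck 1 3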