Sorting and partitioning

The following Mapper swaps the first key with the index key. In our case, the index is already at the first position, so we may not require getRecordInCompositeJoinFormat() here:

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.Hadoop.io.LongWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.List;

public class PrepareCompositeJoinRecordMapper extends Mapper<LongWritable, Text, Text, Text> {

private int indexOfKey=0;
private Splitter splitter;
private Joiner joiner;
private Text joinKey = new Text();
String separator=",";

@Override
protected void setup(Context context) throws IOException, InterruptedException {
splitter = Splitter.on(separator);
joiner = Joiner.on(separator);
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Iterable<String> recordColumns = splitter.split(value.toString());
joinKey.set(Iterables.get(recordColumns, indexOfKey));
if(indexOfKey != 0){
value.set(getRecordInCompositeJoinFormat(recordColumns, indexOfKey));
}
context.write(joinKey,value);
}


private String getRecordInCompositeJoinFormat(Iterable<String> value, int index){
List<String> temp = Lists.newArrayList(value);
String originalFirst = temp.get(0);
String newFirst = temp.get(index);
temp.set(0,newFirst);
temp.set(index,originalFirst);
return joiner.join(temp);
}
}

Reducer: The reducer emits the record with the key as the join key and the value as the entire record. The value is kept as key because in the composite join Driver class, we are going to use the KeyValueTextInputFormat class as the input format class for CompositeInputFormat, as shown in the following code:

import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PrepareCompositeJoinRecordReducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(key,value);
}
}
}

Composite join template: The following template can  be used to create and run your composite join example. You can modify the logic with respect to your use case. Let's look into its implementation.

Driver class: The Driver class takes four input arguments. The first two are input data files, the third one is the output file path, and the fourth one is the join type. The composite join supports only inner and outer join type, as follows:


import org.apache.Hadoop.fs.Path;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapred.*;
import org.apache.Hadoop.mapred.join.CompositeInputFormat;

public class CompositeJoinExampleDriver {

public static void main(String[] args) throws Exception {

JobConf conf = new JobConf("CompositeJoin");
conf.setJarByClass(CompositeJoinExampleDriver.class);

if (args.length < 2) {
System.out.println("Jar requires 4 paramaters : \""
+ conf.getJar()
+ " input_path1 input_path2 output_path jointype[outer or inner] ");
System.exit(1);
}


conf.setMapperClass(CompositeJoinMapper.class);
conf.setNumReduceTasks(0);
conf.setInputFormat(CompositeInputFormat.class);

conf.set("mapred.join.expr", CompositeInputFormat.compose(args[3],
KeyValueTextInputFormat.class, new Path(args[0]), new Path(args[1])));
TextOutputFormat.setOutputPath(conf,new Path(args[2]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
RunningJob job = JobClient.runJob(conf);
System.exit(job.isSuccessful() ? 0 : 1);
}
}

Mapper class: The Mapper class takes join keys as the mapper's input key and TupleWritable as  the value. Remember the join key will be fetched from the input files and that is why we said the input data should be in specific format, for example:

import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapred.MapReduceBase;
import org.apache.Hadoop.mapred.Mapper;
import org.apache.Hadoop.mapred.OutputCollector;
import org.apache.Hadoop.mapred.Reporter;
import org.apache.Hadoop.mapred.join.TupleWritable;

import java.io.IOException;

public class CompositeJoinMapper extends MapReduceBase implements
Mapper<Text, TupleWritable, Text, Text> {
public void map(Text text, TupleWritable value, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
outputCollector.collect((Text) value.get(0), (Text) value.get(1));
}
}

There are many more design patterns available in MapReduce, but covering all of the patterns is outside the scope of for this book.