- Mastering Hadoop 3
- Chanchal Singh Manish Kumar
- 511字
- 2025-04-04 14:54:50
Top-k MapReduce implementation
The top-k algorithm is a popular MapReduce pattern. Each mapper is responsible for emitting the top-k records among the records it processes, and the reducer then filters out the final top-k records from all the records it receives from the mappers. We will use the player score example that we used previously. The objective is to find the top-k players with the lowest scores. Let's look at the mapper implementation. We are assuming that each player has a unique score; otherwise the logic requires a small change: we would need to keep a list of players' details as the value for each score and still emit only k (here, 10) records from the cleanup method.
The code for TopKMapper can be seen as follows:
import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.LongWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
/**
 * Mapper of the top-k job: tracks the K lowest-scoring player records seen by
 * this mapper instance and emits them from {@code cleanup}.
 *
 * <p>Each input line is split on commas and read as seven fields, in this
 * order: player name, score, opposition, timestamp, balls taken, fours, sixes.
 * Candidates are keyed by score, so a later record with the same score
 * replaces the earlier one; the algorithm assumes scores are unique.
 */
public class TopKMapper extends
        Mapper<LongWritable, Text, IntWritable, PlayerDetail> {

    /** Number of lowest-score records to keep. */
    private int K = 10;

    /** Candidate records sorted by ascending score; trimmed back to K entries after every insert. */
    private TreeMap<Integer, PlayerDetail> topKPlayerWithLessScore = new TreeMap<Integer, PlayerDetail>();

    /**
     * Parses one comma-separated line into a {@code PlayerDetail} and offers it
     * to the running top-k set. Nothing is written to the context here.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one comma-separated player record
     * @param context task context (not used in this method)
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than seven fields
     * @throws NumberFormatException          if a numeric field cannot be parsed
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] player = value.toString().split(",");

        // A fresh object per record: the TreeMap stores references, so reusing a
        // single instance would leave every entry pointing at the last record parsed.
        PlayerDetail playerDetail = new PlayerDetail();
        playerDetail.setPlayerName(new Text(player[0]));
        playerDetail.setScore(new IntWritable(Integer.parseInt(player[1])));
        playerDetail.setOpposition(new Text(player[2]));
        playerDetail.setTimestamps(new LongWritable(Long.parseLong(player[3])));
        playerDetail.setBallsTaken(new IntWritable(Integer.parseInt(player[4])));
        playerDetail.setFours(new IntWritable(Integer.parseInt(player[5])));
        playerDetail.setSix(new IntWritable(Integer.parseInt(player[6])));

        topKPlayerWithLessScore.put(playerDetail.getScore().get(), playerDetail);
        if (topKPlayerWithLessScore.size() > K) {
            // The map is sorted ascending, so the last key is the highest score:
            // dropping it keeps the K lowest scores.
            topKPlayerWithLessScore.remove(topKPlayerWithLessScore.lastKey());
        }
    }

    /**
     * Emits the retained records, one (score, player detail) pair per entry,
     * in ascending score order.
     *
     * @param context task context the pairs are written to
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<Integer, PlayerDetail> playerDetailEntry : topKPlayerWithLessScore.entrySet()) {
            // Write the record stored for this score, not some other instance.
            context.write(new IntWritable(playerDetailEntry.getKey()), playerDetailEntry.getValue());
        }
    }
}
The TopKReducer has the same logic as the mapper, and we are again assuming that scores are unique for players. We could also add logic to handle duplicate player scores and emit records for them. The code for TopKReducer can be seen as follows:
import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
public class TopKReducer extends Reducer<IntWritable, PlayerDetail, IntWritable, PlayerDetail> {
private int K = 10;
private TreeMap<Integer, PlayerDetail> topKPlayerWithLessScore = new TreeMap<Integer, PlayerDetail>();
private PlayerDetail playerDetail = new PlayerDetail();
@Override
protected void reduce(IntWritable key, Iterable<PlayerDetail> values, Context context) throws IOException, InterruptedException {
for (PlayerDetail playerDetail : values) {
topKPlayerWithLessScore.put(key.get(), playerDetail);
if (topKPlayerWithLessScore.size() > K) {
topKPlayerWithLessScore.remove(topKPlayerWithLessScore.lastKey());
}
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (Map.Entry<Integer, PlayerDetail> playerDetailEntry : topKPlayerWithLessScore.entrySet()) {
context.write(new IntWritable(playerDetailEntry.getKey()), playerDetail);
}
}
}
The Driver class sets job.setNumReduceTasks(1), which means that only one reducer runs to find the top-k records; with multiple reducers, each reducer would write its own output, and we would end up with multiple top-k files. The code for TopKDriver can be seen as follows:
import org.apache.Hadoop.conf.Configuration;
import org.apache.Hadoop.fs.Path;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Job;
import org.apache.Hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.Hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.Hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.Hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.Hadoop.util.Tool;
import org.apache.Hadoop.util.ToolRunner;
public class TopKDriver {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), (Tool) new TopKDriver(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "TopK");
job.setNumReduceTasks(1);
job.setJarByClass(TopKDriver.class);
if (args.length < 2) {
System.out.println("Jar requires 2 paramaters : \""
+ job.getJar()
+ " input_path output_path");
return 1;
}
job.setMapperClass(TopKMapper.class);
job.setReducerClass(TopKReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PlayerDetail.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path filePath = new Path(args[0]);
FileInputFormat.setInputPaths(job, filePath);
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
return 0;
}
}