- Mastering Hadoop 3
- Chanchal Singh Manish Kumar
- 912字
- 2025-04-04 14:54:50
Minimum and maximum
Calculating the minimum and maximum of a specific field is a common use case in MapReduce. Once the mappers have completed their work, the reducer simply iterates through all the values grouped under each key and finds the minimum and maximum within that key grouping:
- Writables: The idea behind writing custom writables is to save the extra effort of splitting data at the reducer side and to avoid the unnecessary problems that a delimiter can cause. It is easy to choose a delimiter that is already present in the record's data, which then leads to values being mapped to the wrong fields.
We will use the following import packages:
import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.LongWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
The custom Writable class encapsulates the details inside the Writable object, which can be used at the reducer side to fetch values for the records:
public class PlayerDetail implements Writable {
private Text playerName;
private IntWritable score;
private Text opposition;
private LongWritable timestamps;
private IntWritable ballsTaken;
private IntWritable fours;
private IntWritable six;
public void readFields(DataInput dataInput) throws IOException {
playerName.readFields(dataInput);
score.readFields(dataInput);
opposition.readFields(dataInput);
timestamps.readFields(dataInput);
ballsTaken.readFields(dataInput);
fours.readFields(dataInput);
six.readFields(dataInput);
}
public void write(DataOutput dataOutput) throws IOException {
playerName.write(dataOutput);
score.write(dataOutput);
opposition.write(dataOutput);
timestamps.write(dataOutput);
ballsTaken.write(dataOutput);
fours.write(dataOutput);
playerName.write(dataOutput);
}
public Text getPlayerName() {
return playerName;
}
public void setPlayerName(Text playerName) {
this.playerName = playerName;
}
public IntWritable getScore() {
return score;
}
public void setScore(IntWritable score) {
this.score = score;
}
public Text getOpposition() {
return opposition;
}
public void setOpposition(Text opposition) {
this.opposition = opposition;
}
public LongWritable getTimestamps() {
return timestamps;
}
public void setTimestamps(LongWritable timestamps) {
this.timestamps = timestamps;
}
public IntWritable getBallsTaken() {
return ballsTaken;
}
public void setBallsTaken(IntWritable ballsTaken) {
this.ballsTaken = ballsTaken;
}
public IntWritable getFours() {
return fours;
}
public void setFours(IntWritable fours) {
this.fours = fours;
}
public IntWritable getSix() {
return six;
}
public void setSix(IntWritable six) {
this.six = six;
}
@Override
public String toString() {
return playerName +
"\t" + score +
"\t" + opposition +
"\t" + timestamps +
"\t" + ballsTaken +
"\t" + fours +
"\t" + six;
}
}
We will import the following packages and implement the custom Writable class:
import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PlayerReport implements Writable {
private Text playerName;
private IntWritable maxScore;
private Text maxScoreopposition;
private IntWritable minScore;
private Text minScoreopposition;
public void write(DataOutput dataOutput) throws IOException {
playerName.write(dataOutput);
maxScore.write(dataOutput);
maxScoreopposition.write(dataOutput);
minScore.write(dataOutput);
minScoreopposition.write(dataOutput);
}
public void readFields(DataInput dataInput) throws IOException {
playerName.readFields(dataInput);
maxScore.readFields(dataInput);
maxScoreopposition.readFields(dataInput);
minScore.readFields(dataInput);
minScoreopposition.readFields(dataInput);
}
public Text getPlayerName() {
return playerName;
}
public void setPlayerName(Text playerName) {
this.playerName = playerName;
}
public IntWritable getMaxScore() {
return maxScore;
}
public void setMaxScore(IntWritable maxScore) {
this.maxScore = maxScore;
}
public Text getMaxScoreopposition() {
return maxScoreopposition;
}
public void setMaxScoreopposition(Text maxScoreopposition) {
this.maxScoreopposition = maxScoreopposition;
}
public IntWritable getMinScore() {
return minScore;
}
public void setMinScore(IntWritable minScore) {
this.minScore = minScore;
}
public Text getMinScoreopposition() {
return minScoreopposition;
}
public void setMinScoreopposition(Text minScoreopposition) {
this.minScoreopposition = minScoreopposition;
}
@Override
public String toString() {
return playerName +
"\t" + maxScore +
"\t" + maxScoreopposition +
"\t" + minScore +
"\t" + minScoreopposition;
}
}
- Mapper class: The Mapper class in the MinMax algorithm maps each input record to the custom writable object and emits one record per input line, using the player name as the key and PlayerDetail as the value, as follows:
import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.LongWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that splits each comma-separated input line into its columns, loads
 * them into a {@link PlayerDetail}, and emits it keyed by the player name
 * (the first column).
 */
public class MinMaxMapper extends Mapper<LongWritable, Text, Text, PlayerDetail> {

    // Column positions within a comma-separated input line.
    private static final int COL_PLAYER_NAME = 0;
    private static final int COL_SCORE = 1;
    private static final int COL_OPPOSITION = 2;
    private static final int COL_TIMESTAMPS = 3;
    private static final int COL_BALLS_TAKEN = 4;
    private static final int COL_FOURS = 5;
    private static final int COL_SIX = 6;

    // A single PlayerDetail instance is refilled for every input line.
    private PlayerDetail detail = new PlayerDetail();

    /**
     * Converts one input line into a (player name, player detail) pair.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one comma-separated input line
     * @param context sink for the emitted pair
     * @throws IOException          if emitting the pair fails
     * @throws InterruptedException if the task is interrupted while emitting
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String line = value.toString();
        final String[] columns = line.split(",");

        // Columns are read and stored one at a time, in column order.
        Text name = new Text(columns[COL_PLAYER_NAME]);
        detail.setPlayerName(name);

        int score = Integer.parseInt(columns[COL_SCORE]);
        detail.setScore(new IntWritable(score));

        Text opposition = new Text(columns[COL_OPPOSITION]);
        detail.setOpposition(opposition);

        long timestamps = Long.parseLong(columns[COL_TIMESTAMPS]);
        detail.setTimestamps(new LongWritable(timestamps));

        int ballsTaken = Integer.parseInt(columns[COL_BALLS_TAKEN]);
        detail.setBallsTaken(new IntWritable(ballsTaken));

        int fours = Integer.parseInt(columns[COL_FOURS]);
        detail.setFours(new IntWritable(fours));

        int six = Integer.parseInt(columns[COL_SIX]);
        detail.setSix(new IntWritable(six));

        Text outputKey = detail.getPlayerName();
        context.write(outputKey, detail);
    }
}
- Reducer class: The Reducer is responsible for calculating the minimum and maximum scores of each individual player by iterating through that player's list of records, and it emits the result using the PlayerReport writable object, as follows:
import org.apache.Hadoop.io.IntWritable;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer that scans all {@link PlayerDetail} records of one player and emits
 * a single {@link PlayerReport} containing the maximum score, the minimum
 * score, and the opposition against which each was recorded.
 */
public class MinMaxReducer extends Reducer<Text, PlayerDetail, Text, PlayerReport> {
    PlayerReport playerReport = new PlayerReport();

    /**
     * Computes the minimum and maximum score for one player.
     *
     * <p>When several records share the extreme score, the first one seen
     * wins. Nothing is emitted if {@code values} is empty.
     *
     * @param key     the player name
     * @param values  all records emitted for that player
     * @param context sink for the single (player name, report) pair
     * @throws IOException          if emitting the report fails
     * @throws InterruptedException if the task is interrupted while emitting
     */
    @Override
    protected void reduce(Text key, Iterable<PlayerDetail> values, Context context)
            throws IOException, InterruptedException {
        boolean seenRecord = false;
        int maxScore = 0;
        int minScore = 0;
        // Own copies of the opposition names: the framework may reuse the
        // value object between iterations, so keeping a reference to
        // playerDetail.getOpposition() could later show another record's data.
        Text maxScoreOpposition = new Text();
        Text minScoreOpposition = new Text();

        for (PlayerDetail playerDetail : values) {
            int score = playerDetail.getScore().get();
            // The first record seeds both extremes, so no sentinel start value
            // (such as 0) can distort the result.
            if (!seenRecord || score > maxScore) {
                maxScore = score;
                maxScoreOpposition.set(playerDetail.getOpposition());
            }
            if (!seenRecord || score < minScore) {
                minScore = score;
                minScoreOpposition.set(playerDetail.getOpposition());
            }
            seenRecord = true;
        }

        if (!seenRecord) {
            return;
        }

        playerReport.setPlayerName(key);
        playerReport.setMaxScore(new IntWritable(maxScore));
        playerReport.setMaxScoreopposition(maxScoreOpposition);
        playerReport.setMinScore(new IntWritable(minScore));
        playerReport.setMinScoreopposition(minScoreOpposition);

        // Emit once per player, after every record has been examined.
        context.write(key, playerReport);
    }
}
- Driver class: The Driver class provides the basic configuration to run a MapReduce application and defines the contract that the job must honor at runtime. For example, if the Driver class declares the output key class as IntWritable and the output value class as Text, but the reducer tries to emit the key as Text and the value as IntWritable, the job will fail with a type-mismatch error. The Driver class is implemented as follows:
import org.apache.Hadoop.conf.Configuration;
import org.apache.Hadoop.fs.Path;
import org.apache.Hadoop.io.Text;
import org.apache.Hadoop.mapreduce.Job;
import org.apache.Hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.Hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.Hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.Hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.Hadoop.util.Tool;
import org.apache.Hadoop.util.ToolRunner;
public class MinMaxDriver {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), (Tool) new MinMaxDriver(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "MinMax");
job.setJarByClass(MinMaxDriver.class);
if (args.length < 2) {
System.out.println("Jar requires 2 paramaters : \""
+ job.getJar()
+ " input_path output_path");
return 1;
}
job.setMapperClass(MinMaxMapper.class);
job.setReducerClass(MinMaxReducer.class);
job.setCombinerClass(MinMaxReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PlayerReport.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path filePath = new Path(args[0]);
FileInputFormat.setInputPaths(job, filePath);
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
return 0;
}
}