Matrix operations in MapReduce
Matrix Sum with MapReduce
Matrix sum is the operation of adding matrices by adding corresponding entries together.
Entrywise sum
The sum of two m × n (pronounced “m by n”) matrices A and B, denoted by A + B, is again an m × n matrix, computed by adding corresponding elements.
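As a small worked example of the definition (the 2 × 2 values here are chosen purely for illustration), each entry of the result is the sum of the entries at the same position in A and B:

```latex
(A + B)_{ij} = a_{ij} + b_{ij}, \qquad 1 \le i \le m,\; 1 \le j \le n

\begin{pmatrix} 1 & 2 \\ 3 & 4 \end{pmatrix}
+
\begin{pmatrix} 5 & 6 \\ 7 & 8 \end{pmatrix}
=
\begin{pmatrix} 1+5 & 2+6 \\ 3+7 & 4+8 \end{pmatrix}
=
\begin{pmatrix} 6 & 8 \\ 10 & 12 \end{pmatrix}
```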
Entrywise sum implementation using MapReduce
I have two matrices in separate files (see the blog post on indexing for how to generate the row index). Each file contains a matrix of the same size (m × n). On every line, the row index (m) is separated from the row values by ^A (0x001), and the n values are separated by commas (,).
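To make the line format concrete, here is a minimal sketch of parsing one such line in plain Java, with no Hadoop dependency. The class name `RowLineParser` and its methods are hypothetical helpers for illustration, and treating the values as doubles is an assumption; the post does not say whether the entries are integers or decimals.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

/** Hypothetical helper: parses one line of the row-indexed matrix format. */
final class RowLineParser {
    private RowLineParser() {}

    /** Returns the row index that precedes the key separator. */
    static long rowIndex(String line, char keySeparator) {
        int pos = line.indexOf(keySeparator);
        if (pos < 0) {
            throw new IllegalArgumentException("key separator not found");
        }
        return Long.parseLong(line.substring(0, pos).trim());
    }

    /** Returns the row values that follow the key separator (assumed to be doubles). */
    static double[] rowValues(String line, char keySeparator, String elementSeparator) {
        int pos = line.indexOf(keySeparator);
        if (pos < 0) {
            throw new IllegalArgumentException("key separator not found");
        }
        // Quote the separator so characters such as '|' are not read as regex syntax.
        String[] tokens = line.substring(pos + 1).split(Pattern.quote(elementSeparator));
        double[] values = new double[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            values[i] = Double.parseDouble(tokens[i].trim());
        }
        return values;
    }

    public static void main(String[] args) {
        char keySeparator = (char) 0x001;               // ^A
        String line = "2" + keySeparator + "1.5,2,3";   // row 2 of a 3-column matrix
        System.out.println(rowIndex(line, keySeparator));                         // 2
        System.out.println(Arrays.toString(rowValues(line, keySeparator, ",")));  // [1.5, 2.0, 3.0]
    }
}
```

Quoting the element separator is a defensive choice: a plain comma works either way, but a configurable separator may contain regex metacharacters.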
- The mapper emits the row index as the key and the entire row as the value. If the matrices come in different formats, create a separate mapper to parse and filter each one.
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class MatrixSumMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    // Name of the input file; available if the two matrices need to be told apart.
    String fName = null;
    char keySeparator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        fName = ((FileSplit) context.getInputSplit()).getPath().getName();
        keySeparator = (char) context.getConfiguration().getInt("matrix.key.separator", 0x001);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line once into [row index, comma-separated row values].
        String[] parts = value.toString().split(Pattern.quote(String.valueOf(keySeparator)), 2);
        context.write(new LongWritable(Long.parseLong(parts[0])), new Text(parts[1]));
    }
}
- The reducer receives the row index (m) as the key and the matching rows from both matrices as values. It splits each row into its n elements and adds the elements position by position.
- Driver code:
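import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class Driver extends Configured implements Tool {
    private static Logger logger = Logger.getLogger(Driver.class);

    // Recursively deletes the given path; returns true if something was deleted.
    private boolean deleteDirectory(Path path) throws IOException {
        FileSystem fs = FileSystem.get(getConf());
        return fs.delete(path, true);
    }

    public int run(String[] args) throws Exception {
        logger.info("job Matrix Sum Driver Begin");
        Configuration conf = getConf();
        conf.setInt("matrix.key.separator", 0x001);
        conf.set("matrix.element.separator", ",");
        // Deprecated in Hadoop 2 and later, where Job.getInstance(conf, "Matrix Sum") replaces it.
        Job job = new Job(conf, "Matrix Sum");
        job.setJarByClass(Driver.class);

        Path input1 = new Path(args[0]);
        Path input2 = new Path(args[1]);
        Path output = new Path(args[2]);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setMapperClass(MatrixSumMapper.class);
        job.setReducerClass(MatrixSumReducer.class);
        job.setNumReduceTasks(1);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete any previous output once, so the log reports the actual result.
        logger.info("deleting output directory: " + deleteDirectory(output));
        FileInputFormat.setInputPaths(job, input1, input2);
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        for (String str : args) System.out.println(str);
        Configuration config = new Configuration();
        System.exit(ToolRunner.run(config, new Driver(), args));
    }
}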
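The driver refers to a MatrixSumReducer whose code is not shown in this post. The sketch below is not that class. It is a hypothetical, Hadoop-free stand-in (`RowSum.sumRows`) for the summing step the reducer is described as doing: split every row that arrived for one row index, then add the elements position by position. It assumes the values are doubles and that all rows for a key have the same length. In a real reducer the rows would arrive as an `Iterable<Text>`, and the separator could be read from the `matrix.element.separator` setting that the driver puts in the configuration.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the summing step inside a reducer such as MatrixSumReducer. */
final class RowSum {
    private RowSum() {}

    /**
     * Adds rows element by element. Each row is a delimited string such as "1,2,3".
     * Assumption: every row has the same number of elements.
     */
    static String sumRows(Iterable<String> rows, String elementSeparator) {
        double[] sum = null;
        for (String row : rows) {
            String[] tokens = row.split(Pattern.quote(elementSeparator));
            if (sum == null) {
                sum = new double[tokens.length];
            } else if (tokens.length != sum.length) {
                throw new IllegalArgumentException("rows differ in length");
            }
            for (int j = 0; j < tokens.length; j++) {
                sum[j] += Double.parseDouble(tokens[j].trim());
            }
        }
        if (sum == null) {
            return "";  // no rows were received for this key
        }
        // Join the summed elements with the same separator used in the input.
        StringBuilder out = new StringBuilder();
        for (int j = 0; j < sum.length; j++) {
            if (j > 0) {
                out.append(elementSeparator);
            }
            out.append(sum[j]);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Row 0 of A and row 0 of B, as a reducer would receive them for key 0.
        System.out.println(sumRows(Arrays.asList("1,2,3", "10,20,30"), ","));  // 11.0,22.0,33.0
    }
}
```

Because addition is commutative, the order in which the two rows reach the reducer does not affect the result, so the mapper does not need to tag rows with their source file for this operation.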
Check out the complete source code from techsquids.