MapReduce and YARN: this is how it works

Meet MapReduce: originally a programming model devised at Google for processing large datasets in a massively parallel fashion on a cluster of computers. One of its implementations ships with Apache Hadoop, where it nowadays runs on YARN. Let’s discover how it works: I’ll show you a few of its use cases and give a small example application.

The name ‘MapReduce’ refers to the two phases of computation a user defines. The map function transforms the input data into intermediate key-value pairs; the reduce function then aggregates those pairs into the final desired values. A classic example is counting words across a collection of texts: the map function emits a count of one for every word it encounters, and the reduce function sums those counts per word into a single total. This allows us to split the data, run the map tasks in parallel and independently of one another, and have the reduce phase aggregate the individual results.
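
To make that flow concrete before involving Hadoop at all, here is a minimal plain-Java sketch of the same idea (the class and variable names are mine; Java’s stream grouping stands in for the shuffle between the two phases):

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountSketch {
    public static void main(String[] args) {
        String text = "to be or not to be";
        // "Map": split the text into words, one entry per occurrence.
        // "Shuffle": groupingBy collects all occurrences of the same word.
        // "Reduce": counting() sums the occurrences per word.
        Map<String, Long> counts = Arrays.stream(text.split("\\s+"))
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
        System.out.println(counts); // prints e.g. {not=1, be=2, or=1, to=2}
    }
}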

YARN: de facto solution

Various implementations exist. In Apache Hadoop, the second-generation implementation, which runs on YARN, is now the de facto solution; it is said to offer better performance and scalability than the original MapReduce v1 implementation.

An offshoot of this, Apache Spark, has since branched off from the MapReduce programming model entirely.

A wide range of use cases

As this concerns a model for massively parallel computation, the range of possible use cases is very wide: basically any problem that involves large quantities of data qualifies. The use cases are therefore similar to those of Apache Spark.

Text mining

MapReduce is often used for text mining: parsing and indexing text files, building word-frequency histograms and searching are some of the tasks that can be easily parallelized with MapReduce.

Web Crawling

Google initially used MapReduce to crawl the web in parallel. Although Google has since moved on to other techniques, Apache Nutch is a web crawler that still uses MapReduce, for example to find new (unvisited) pages and to deduplicate fetched pages.

Document Databases (NoSQL)

MapReduce is used as a query mechanism for NoSQL databases (such as CouchDB) that store semi-structured JSON documents as key-value pairs. The non-relational nature of these documents makes SQL a tough fit, so MapReduce is usually used to query the data instead.

Hello World

I used this tutorial to get Hadoop up and running on Ubuntu (in single-node configuration). You should end up with a working Hadoop installation with a single namenode.
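
For reference, with a recent Hadoop release the final steps of such a single-node setup boil down to something like the following, run from the Hadoop installation directory (the exact commands depend on your Hadoop version and the tutorial you follow):

bin/hdfs namenode -format   # initialise the namenode storage directory
sbin/start-dfs.sh           # start the namenode and datanode daemons
jps                         # should now list NameNode, DataNode and SecondaryNameNode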

MapReduce v1

The usual ‘Hello World’-style application in Hadoop is WordCount. We run this example in single-node mode; to get a real idea of the performance and of the installation process, a pseudo-distributed (e.g. using virtual machines) or a fully distributed configuration is advised.

Roughly, the steps are: create a Java file, compile it using Hadoop (bin/hadoop), and wrap the result in a JAR. Then start DFS, copy the input onto the cluster using the put command, and run the job. The full steps can be found here; a sketch of the command sequence is given below.
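
Concretely, once the WordCount.java listing below is saved, the sequence looks roughly as in the official Hadoop WordCount tutorial. The user and directory names here are illustrative, so adjust them to your own installation:

export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
bin/hadoop com.sun.tools.javac.Main WordCount.java      # compile against the Hadoop classpath
jar cf wc.jar WordCount*.class                          # package the classes into a JAR
bin/hdfs dfs -mkdir -p /user/hduser/wordcount/input     # create the input directory in HDFS
bin/hdfs dfs -put input/* /user/hduser/wordcount/input  # copy local input files into HDFS
bin/hadoop jar wc.jar WordCount /user/hduser/wordcount/input /user/hduser/wordcount/output
bin/hdfs dfs -cat /user/hduser/wordcount/output/part-r-00000   # inspect the result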

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Map phase: emit a (word, 1) pair for every token in the input.
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reduce phase: sum the counts collected for each word.
  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // The reducer doubles as a combiner for local pre-aggregation.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

YARN: a faster alternative?

As explained, YARN is meant to replace MapReduce v1 as a faster alternative. Below is the WordCount example for YARN. Unfortunately, a good step-by-step tutorial like the one above seems harder to find for YARN, so running it was not attempted. I did find this example code, which is supposed to do the same as before, but on YARN.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // GenericOptionsParser separates generic Hadoop options from the
    // job's own arguments (the input and output paths).
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
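
Note that, apart from reading its input and output paths through GenericOptionsParser and using the older (now deprecated) Job constructor, this listing is identical to the v1 version: the MapReduce API itself did not change. On a YARN-enabled installation the job is also submitted the same way (bin/hadoop jar wc.jar WordCount <in> <out>); the difference is that YARN’s ResourceManager, rather than the old JobTracker, takes care of scheduling the map and reduce tasks.
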
About the author: Gerard Simons

“I have been working at Qualogy as a Junior Data Scientist since August 2015. I analyse and structure data in order to develop statistical models with it.”
