3.2 样例分析:单词计数
3.2.1 WordCount源码分析
单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版“Hello World”,该程序的完整代码可以在Hadoop安装包的src/examples目录下找到。单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图3-2所示。本小节将通过分析源代码帮助读者摸清MapReduce程序的基本结构。
图3-2 单词计数
1.Map过程
Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中的value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字符相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成一个个的单词,并将<word, 1>作为map方法的结果输出,其余的工作都交由MapReduce框架处理。其中IntWritable和Text类是Hadoop对int和string类的封装,这些类能够被串行化,以方便在分布式环境中进行数据交换。TokenizerMapper的实现代码如下:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { System.out.println("key = " + key.toString()); //添加查看key 值 System.out.println("value = " + value.toString()); //添加查看value 值
StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
2.Reduce过程
Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。根据3.1.1节的分析,reduce方法的输入参数key为单个单词,而values是由各Mapper上对应单词的计数值所组成的列表,所以只要遍历values并求和,即可得到某个单词出现的总次数。IntSumReducer类的实现代码如下:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val :values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
3.执行MapReduce任务
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程和使用IntSumReducer完成Combine和Reduce过程。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输入和输出路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。主函数实现代码如下:
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage:wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 :1); }
3.2.2 WordCount处理过程
上一节已经给出了WordCount的设计思路及源码,但很多细节都未被提及,本节将根据图3-1给出的处理过程,对WordCount进行更详细的讲解。详细的执行步骤如下。
(1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key, value>对,如图3-3所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境下会不同)。
(2)将分割好的<key, value>对交给用户定义的map方法进行处理,生成新的<key, value>对,如图3-4所示。
(3)得到map方法输出的<key, value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果,如图3-5所示。
图3-3 分割过程
图3-4 执行map方法
图3-5 Map端排序及Combine过程
(4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key, value>对,并作为WordCount的输出结果,如图3-6所示。
图3-6 Reduce端排序及输出结果