实战Hadoop
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

3.2 样例分析:单词计数

3.2.1 WordCount源码分析

单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版“Hello World”,该程序的完整代码可以在Hadoop安装包的src/examples目录下找到。单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图3-2所示。本小节将通过分析源代码帮助读者摸清MapReduce程序的基本结构。

图3-2 单词计数

1.Map过程

Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中的value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字符相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成一个个的单词,并将<word, 1>作为map方法的结果输出,其余的工作都交由MapReduce框架处理。其中IntWritable和Text类是Hadoop对int和string类的封装,这些类能够被串行化,以方便在分布式环境中进行数据交换。TokenizerMapper的实现代码如下:

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { System.out.println("key = " + key.toString()); //添加查看key 值 System.out.println("value = " + value.toString()); //添加查看value 值
StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }

2.Reduce过程

Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。根据3.1.1节的分析,reduce方法的输入参数key为单个单词,而values是由各Mapper上对应单词的计数值所组成的列表,所以只要遍历values并求和,即可得到某个单词出现的总次数。IntSumReducer类的实现代码如下:

public static class IntSumReducer
           extends Reducer<Text,IntWritable,Text,IntWritable> {
      private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val :values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

3.执行MapReduce任务

在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程和使用IntSumReducer完成Combine和Reduce过程。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输入和输出路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。主函数实现代码如下:

public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
      if (otherArgs.length != 2) {
      System.err.println("Usage:wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 :1);
}

3.2.2 WordCount处理过程

上一节已经给出了WordCount的设计思路及源码,但很多细节都未被提及,本节将根据图3-1给出的处理过程,对WordCount进行更详细的讲解。详细的执行步骤如下。

(1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key, value>对,如图3-3所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境下会不同)。

(2)将分割好的<key, value>对交给用户定义的map方法进行处理,生成新的<key, value>对,如图3-4所示。

(3)得到map方法输出的<key, value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果,如图3-5所示。

图3-3 分割过程

图3-4 执行map方法

图3-5 Map端排序及Combine过程

(4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key, value>对,并作为WordCount的输出结果,如图3-6所示。

图3-6 Reduce端排序及输出结果