MapReduce案例--WordCOunt

美柚明治
2021-06-26 / 0 评论 / 15 阅读 / 正在检测是否收录...

MapReduce案例1--WordCOunt

案例分析

有一个文本,统计文本里面的单词出现次数。

hello world
hello spark
hello java

步骤分析

阶段一:首先从获取到文本文件,通过一个 TextInputFormat的类,这个类会把每一行文本转化成<文本偏移量,一行文本内容>。对应的参数就是<K1,V1>。

阶段二:处理好了文本之后传给Map,Map对传进来的数据按照要求处理(这里是按照空格进行切割)。在Map阶段,我们可以自定义Map逻辑。Map阶段把<K1,V1>转化成<K2,V2>。

阶段三:Shuffle阶段。这一阶段可以实现4个功能:分区,排序,规约,分组。这里的WordCount对这四个功能并不做要求。这一步可以不考虑。采用默认方式处理。这一个阶段会把<k2,v2>转化成新的<K2,V2>。

阶段四:传递给Reduce,Reduce阶段就进行数据的统计。最后获得<k3,v3>。最后把结果输出到一个文件里面。

hadoop的数据类型

hadoop有着自己的数据类型,不能直接使用Java这的数据类型。

Hadoop类型Java类型
TextString
NullWritablenull
IntWritableint
BooleanWritableboolean
LongWritablelong
DoubleWritabledouble

wordcount的代码实现

这里不涉及到分区,排序,规约,分组的功能,所以只需要自定义Map类,Reduce类,以及一个job的调用类。

Maven项目依赖

注意:项目依赖最好是和你的Hadoop安装的版本好一致,避免出现奇怪的问题。

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.2</version>
        </dependency>

日志文件的引入

我使用的是idea本地运行的方式,我们需要把日志输出在idea的控制台里面,所以需要在resources文件夹下面创建log4j.properties文件,同时输入以下内容

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

Map类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    /**
     * 继承Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>,重写 map 方法。
     * hadoop不用java的类型,得用专门的hadoop类型(为了解决java序列化太复杂问题)
     * 四个参数分别对应 k1(行偏移量类似一个文本中第几第几行数据),v1(一行数据),k2(一行数据中的一个单词),v2(词频统计,这里设置1就可)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /**
         * 将 k1,v1转成k2,v2
         * key === k1   (行偏移量,作用不大)
         * value === v1  (每一行的数据)
         * context  ====  (表示上下文,桥梁作用)
         */
        //匹配context.write类型
        LongWritable longWritable = new LongWritable(1);
        Text text = new Text();
        //拆分数据,然后存在数组
        String[] split = value.toString().split(" ");
        for (String s : split) {
            text.set(s);
            context.write(text, longWritable);
        }
    }
}

Reduce类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        /**
         * 将k2,v2转成K3,V3,并且写入上下文之中
         * reduce中的参数: Text key, Iterable<LongWritable> values
         * 分别对应 k2 (每个单词) 和 新的 v2 (集合)
         * 新的 k2    v2
         *    hello  <1,1,1,1> (多少个1,代表着出现多少次)
         *    world  <1,1,1>
         * 新的 k3    v3
         *    hello  <4>
         *    world  <3>
         */
        //遍历集合,将集合中的数据相加,然后就可以得到v3
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}

Job类(调用,执行)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountJob extends Configured implements Tool {

    //该方法用于指定一个job任务
    @Override
    public int run(String[] strings) throws Exception {
        //1.创建一个job任务对象
        Job job = Job.getInstance(super.getConf(), "WordCountJobMain");
        //如果打包运行出错,加上下面这个
        //job.setJarByClass(WordCountJob.class);
        //2.设置job任务对象
        //2.1指定输入的类,指定读取的路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\shiyan\\world.txt"));//本地路径
        // TextInputFormat.addInputPath(job, new Path("hdfs://hadoop01:90000/world.txt"));远程路径
        //2.2指定map阶段的处理方式,设置map阶段k2的类型,设置map阶段v2的类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //第三(分区),四(排序),五(规约),六(分组),,采用默认方式不做处理
        //第七步,指定reduce阶段的处理方式和数据类型
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //第八步,设置输出类型以及输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        Path path = new Path("file:///D:\\shiyan\\wt");
        /*
         这一段代码适合在远程的hdfs文件系统之上,不适合本都执行
          //判断目录是否存在,存在就删除,这样就不会报一个目录已经存在的错误了
         FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000"), new Configuration());
         boolean exists = fileSystem.exists(path);
         if (exists){
         //删除目录
         fileSystem.delete(path,true);
         }
         */
        TextOutputFormat.setOutputPath(job, path);
        return job.waitForCompletion(true) ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //启动job任务
        int run = ToolRunner.run(configuration, new WordCountJob(), args);
        System.exit(run);
    }
}
0

打赏

海报

正在生成.....

评论 (0)

取消