MapReduce初试

一、境遇

成都创新互联是一家集网站建设,峨眉山企业网站建设,峨眉山品牌网站建设,网站定制,峨眉山网站建设报价,网络营销,网络优化,峨眉山网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。

    接触Hadoop已经有半年了,从Hadoop集群搭建到Hive、HBase、Sqoop相关组件的安装,甚至Spark on Hive、Phoenix、Kylin这些边缘的项目都有涉及。如果说部署,我自认为可以没有任何问题,但是如果说我对于这个系统已经掌握了,我却不敢这么讲,因为至少MapReduce我还没有熟悉,其工作机制也只是一知半解。关于MapReduce的运算,我差不多理解了,但是实际实现现在却只能靠找到的代码,真的是惭愧的很。

    于是再也忍不住,一定要有点自己的东西,最起码,写的时候不用去找别人的博客,嗯,找自己的就行。

二、实验

    1、实验过程

        最开始实验的是最简单的去重MapReduce,在本地文件实验时没有任何问题,但把文件放到HDFS上就怎么也找不到了,究其原因,HDFS上的需要用Hadoop执行jar文件才可以

        1)javac输出类到指定目录 dir

            javac *.java -d dir

        2)jar打包class文件

            1,打包指定class文件到target.jar

                jar cvf target.jar x1.class x2.class ... xn.class

            2,打包指定路径dir下的所有class文件到target.jar

                jar cvf target.jar -C dir .

            3,打包class文件成可执行jar,程序入口Main函数

                jar cvfe tarrget.jar Main -C dir .

            Hadoop只需要普通jar即可,不用打包成可执行jar

        3)执行jar,主类MapDuplicate

            Hadoop jar target.jar MapDuplicate (params)

    2、代码分析

        1)import类

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

            Configuration类:用来设定Hadoop的参数,如:IP、端口等

            Path:用来设定输入输出路径

            IntWritable:MapReduce用到的int类型

            Text:MapReduce用到的string类型

            Job:生成MapReduce任务的主类,任务参数也在此类中设定

            Mapper:被继承的Map类

            Reducer:被继承的Reduce类

            FileInputFormat:输入文件格式

            FileOutputFormat:输出文件格式(可改为其它IO类,如数据库)

            GenericOptionsParser:解析命令行参数的类

        2)代码结构

public class MapDuplicate {
    public static class Map extends Mapper<...> { ... }
    public static class Reduce extends Reducer<...> { ... }
    public static void main(String[] args) throws Ex { ... }
}

        2)Map类

        public static class Map extends Mapper {
                private static Text line = new Text();

                public void map(Object key,Text value,Context context)
                throws IOException,InterruptedException {
                        line = value;
                        context.write(line,new Text(""));
                }
        }

            Map类的主要作用是将数据进行统一处理,按照规则给出键值对,为Combine和Reduce等Reduce操作提供标准化数据。从代码上来讲,均继承Mapper类,并实现map函数

            Mapper类继承的四个参数,前两个分别是输入数据键和值的类型,一般写Object,Text即可;后两个是输出数据键和值的类型,这两个类型必须和Reduce的输入数据键值类型一致。

            所有的Java值类型在送到MapReduce任务前都要转化成对应的值类型:如:String->Text,int->IntWritable,long->LongWritable

            Context是Java类与MapReduce任务交互的类,它把Map的键值对传给Combiner或者Reducer,也把Reducer的结果写到HDFS上

        3)Reduce类

        public static class Reduce extends Reducer {
                public void reduce(Text key,Iterable values,Context context)
                throws IOException,InterruptedException {
                        context.write(key,new Text(""));
                }
        }

            Reduce有两种操作,Combine和Reduce,都继承Reducer类。前者用于对数据进行预处理,将处理好的数据交给Reduce,可以看成是本地的Reduce,当不需要任何处理时,Combine可以直接用Reduce代替;后者用于对数据进行正式处理,将相同键值的数据合并,每一个Reduce函数过程只处理同一个键(key)的数据。

            Reducer类继承的四个参数,前两个是输入数据键和值的类型,必须与Mapper类的输出类型一致(Combine也必须一致,而且Combine输出需要跟Reduce的输入一致,所以Combine输入输出类型必须是相同的);后两个是输出数据键和值的类型,即我们最终得到的结果

        4)Main函数

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                conf.set("mapred.job.tracker","XHadoop1:9010");
                String[] ioArgs = new String[] {"duplicate_in","duplicate_out"};
                String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
                if (otherArgs.length != 2) {
                        System.err.println("Usage: MapDuplicate  ");
                        System.exit(2);
                }
                Job job = new Job(conf,"MapDuplicate");
                job.setJarByClass(MapDuplicate.class);
                job.setMapperClass(Map.class);
                job.setCombinerClass(Reduce.class);
                job.setReducerClass(Reduce.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);

                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

            首先,必须有Configuration类,通过这个类指定工作的机器

            然后,接收参数的语句,这个不解释了

            然后,需要有Job类,指定MapReduce处理用到的类,需要指定的有:Mapper类、Combiner类、Reducer类、输出数据键和值类型的类

            然后,指定输入数据的路径

            然后,等待任务结束并退出

三、总结

    这个实验可以说是最简单的MapReduce,但是麻雀虽小五脏俱全。

    从原理来讲,MapReduce有以下步骤:

        HDFS(Block)->Split->Mapper->Partion->Spill->Sort->Combiner->Merge->Reducer->HDFS

    1、HDFS输入数据被分成Split,被Mapper类读取,

    2、Mapper读取数据后,将任务进行Partion(分配)

    3、如果Map操作内存溢出,需要Spill(溢写)到磁盘上

    4、Mapper进行Sort(排序)操作

    5、排序之后进行Combine(合并key)操作,可以理解为本地模式Reduce

    6、Combine的同时会进行溢出文件的Merge(合并)

    7、所有任务完成后将数据交给Reducer进行处理,处理完成写入HDFS

    8、从Map任务开始到Reduce任务开始的数据传输操作叫做Shuffle

    从编程来讲,MapReduce有以下步骤:

    1、编写Mapper类

    2、编写Combiner类(可选)

    3、编写Reducer类

    4、调用过程:参数配置Configuration

                 指定任务类

                 指定输入输出格式

                 指定数据位置

                 开始任务

    以上仅仅是浅层认识,仅供学习参考及备查。


网站名称:MapReduce初试
文章网址:http://pwwzsj.com/article/gipgpg.html