package com.lucl.hadoop.mapreduce.rand; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** * @author luchunli * @description 自定义InputFormat */ public class RandomInputFormat extends InputFormat{ public static float [] floatValues = null; /** 自定义分片规则 **/ @Override public List getSplits(JobContext context) throws IOException, InterruptedException { // 初始化数组的长度 int NumOfValues = context.getConfiguration().getInt("lucl.random.nums", 100); floatValues = new float[NumOfValues]; Random random = new Random (); for (int i = 0; i < NumOfValues; i++) { floatValues[i] = random.nextFloat(); } System.out.println("生成的随机数的值如下:"); for (float f : floatValues) { System.out.println(f); } System.out.println("===================="); // 如下代码表示指定两个map task来处理这100个小数,每个map task处理50个小数 // 初始化split分片数目,split分片的数量等于map任务的数量,但是也可以通过配置参数mapred.map.tasks来指定 // 如果该参数和splits的切片数不一致时,map task的数目如何确定,后续再通过代码分析 int NumSplits = context.getConfiguration().getInt("mapreduce.job.maps", 2); int begin = 0; // Math.floor是为了下取整,这里是100刚好整除,如果是99的话Math.floor的值是49.0 // 50 int length = (int)Math.floor(NumOfValues / NumSplits); // end = 49,第一个split的范围就是0~49 int end = length - 1; // 默认的FileInputFormat类的getSplits方法中是通过文件数目和blocksize进行分的, // 文件超过一个块会分成多个split,否则一个文件一个split分片 List splits = new ArrayList (); for (int i = 0; i < NumSplits - 1; i++) { // 2个splits分片,分别为0和1 RandomInputSplit split = new RandomInputSplit(begin, end); splits.add(split); // begin是上一个split切片的下一个值 begin = end + 1; // 50 // 切片的长度不变,结束位置为起始位置+分片的长度,而数组下标是从0开始的, // 因此结束位置应该是begin加长度-1 end = begin + (length - 1); // 50 + (50 -1) = 99 } RandomInputSplit split = new RandomInputSplit(begin, end); splits.add(split); /** * * 通过默认的TextInputFormat来实现的时候,如果有两个小文件,则splits=2,参见: * http://luchunli.blog.51cto.com/2368057/1676185 **/ return splits; } @Override public RecordReadercreateRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new RandomRecordReader(); } }
package com.lucl.hadoop.mapreduce.rand; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; /** * @author luchunli * @description * 自定义InputSplit,参照了{@link org.apache.hadoop.mapreduce.lib.input.Filesplit} **/ public class RandomInputSplit extends InputSplit implements Writable { private int start; private int end; private ArrayWritable floatArray = new ArrayWritable(FloatWritable.class); public RandomInputSplit () {} /** * Constructs a split * * @param start * @param end * */ public RandomInputSplit (int start, int end) { this.start = start; this.end = end; int len = this.end - this.start + 1; int index = start; FloatWritable [] result = new FloatWritable[len]; for (int i = 0; i < len; i++) { float f = RandomInputFormat.floatValues[index]; FloatWritable fw = new FloatWritable(f); result[i] = fw; index++; } floatArray.set(result); // System.out.println("查看分片数据:"); // for (FloatWritable fw : (FloatWritable[])floatArray.toArray()) { // System.out.println(fw.get()); // } // System.out.println("====================="); } @Override public long getLength() throws IOException, InterruptedException { return this.end - this.start; } @Override public String[] getLocations() throws IOException, InterruptedException { return new String[]{"dnode1", "dnode2"}; } @Override public void readFields(DataInput in) throws IOException { this.start = in.readInt(); this.end = in.readInt(); this.floatArray.readFields(in); } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.getStart()); out.writeInt(this.getEnd()); this.floatArray.write(out); } public int getStart() { return start; } public void setStart(int start) { this.start = start; } public int getEnd() { return end; } public void setEnd(int end) { this.end = end; } public ArrayWritable getFloatArray() { return floatArray; } @Override public String toString() { return this.getStart() + "-" + this.getEnd(); } }
* FileSplit是针对HDFS上文件的实现,因此其属性包括文件绝对路径(Path)、分片起始位置(start)、 * 分片长度(length)、副本信息(保存Block复本数据到的主机数组)。 *
* 自定义的InputSplit是针对内存中的数组数据进行的处理,因此无需记录文件路径及副本信息,只需要记录对数组分片的起始位置、分片长度即可。 *
package com.lucl.hadoop.mapreduce.rand; import java.io.IOException; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** * @author luchunli * @description 自定义RecordReader * */ public class RandomRecordReader extends RecordReader{ private int start; private int end; private int index; private IntWritable key = null; private ArrayWritable value = null; private RandomInputSplit rsplit = null; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.rsplit = (RandomInputSplit)split; this.start = this.rsplit.getStart(); this.end = this.rsplit.getEnd(); this.index = this.start; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (null == key) { key = new IntWritable(); } if (null == value) { value = new ArrayWritable(FloatWritable.class); } if (this.index <= this.end) { key.set(this.index); value = rsplit.getFloatArray(); index = end + 1; return true; } return false; } @Override public IntWritable getCurrentKey() throws IOException, InterruptedException { return key; } @Override public ArrayWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { if (this.index == this.end) { return 0F; } return Math.min(1.0F, (this.index - this.start) / (float)(this.end - this.start)); } @Override public void close() throws IOException { // ...... } }
package com.lucl.hadoop.mapreduce.rand; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Mapper; /** * @author luchunli * @description Mapper */ public class RandomMapper extends Mapper{ private static final IntWritable one = new IntWritable(1); @Override protected void setup(Context context) throws IOException, InterruptedException { // 为了查看当前map是在那台机器上执行的,在该机器上创建个随机文件, // 执行完成后到DN节点对应目录下查看即可 SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss"); File file = new File("/home/hadoop", "Mapper-" + format.format(new Date())); if (!file.exists()) { file.createNewFile(); } } @Override protected void map(IntWritable key, ArrayWritable value, Context context) throws IOException, InterruptedException { FloatWritable [] floatArray = (FloatWritable[])value.toArray(); float maxValue = floatArray[0].get(); float tmp = 0; for (int i = 1; i < floatArray.length; i++) { tmp = floatArray[i].get(); if (tmp > maxValue) { maxValue = tmp; } } // 这里必须要保证多个map输出的key是一样的,否则reduce处理时会认为是不同的数据, // shuffle会分成多个组,导致每个map task算出一个最大值 context.write(one, new FloatWritable(maxValue)); } }
package com.lucl.hadoop.mapreduce.rand; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * @author luchunli * @description Rducer */ public class RandomReducer extends Reducer{ @Override protected void setup(Context context) throws IOException, InterruptedException { SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss"); // 为了查看当前reduce是在那台机器上执行的,在该机器上创建个随机文件 File file = new File("/home/hadoop", "Reducer-" + format.format(new Date())); if (!file.exists()) { file.createNewFile(); } } @Override protected void reduce(IntWritable key, Iterable value, Context context) throws IOException, InterruptedException { Iterator it = value.iterator(); float maxValue = 0; float tmp = 0; if (it.hasNext()) { maxValue = it.next().get(); } else { context.write(new Text("The max value is : "), new FloatWritable(maxValue)); return; } while (it.hasNext()) { tmp = it.next().get(); if (tmp > maxValue) { maxValue = tmp; } } context.write(new Text("The max value is : "), new FloatWritable(maxValue)); } }
package com.lucl.hadoop.mapreduce.rand; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text; /** * @author luchunli * @description MapReduce自带的输入类都是基于HDFS的,如下示例代码不用从HDFS上面读取内容, * 而是在内存里面随机生成100个(0-1)float类型的小数,然后求这100个小数的最大值。 */ public class RandomDriver extends Configured implements Tool { public static void main(String[] args) { try { ToolRunner.run(new RandomDriver(), args); } catch (Exception e) { e.printStackTrace(); } } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); conf.set("lucl.random.nums", "100"); conf.set("mapreduce.job.maps", "2"); Job job = Job.getInstance(getConf(), this.getClass().getSimpleName()); job.setJarByClass(RandomDriver.class); job.setInputFormatClass(RandomInputFormat.class); job.setMapperClass(RandomMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(FloatWritable.class); job.setReducerClass(RandomReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FloatWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[0])); return job.waitForCompletion(true) ? 0 : 1; } }
[hadoop@nnode code]$ hadoop jar RandomMR.jar /201512020027 15/12/02 00:28:07 INFO client.RMProxy: Connecting to ResourceManager at nnode/ 生成的随机数的值如下: 0.020075738 0.700349 0.9617876 0.8286018 0.03357637 0.55033255 0.112645924 0.43312508 0.33184355 0.6960902 0.23912054 0.8523424 0.4133852 0.028242588 0.9031814 0.39397871 0.38278967 0.5842654 0.4569224 0.4008881 0.2230537 0.92889327 0.20127994 0.09574646 0.23173904 0.4365906 0.11567855 0.027944028 0.6965957 0.78311944 0.2365641 0.8575301 0.07472658 0.5219022 0.9409952 0.7122519 0.8722465 0.30288923 0.51773626 0.91211754 0.93172425 0.38484365 0.44844115 0.24589789 0.83361626 0.40983224 0.9444963 0.12061542 0.8446641 0.5303581 0.11295539 0.094003916 0.11822218 0.4997149 0.98296344 0.48746037 0.31420535 0.1151396 0.7904337 0.80005115 0.18344402 0.8171619 0.8749699 0.48023254 0.0044505 0.43879867 0.22367835 0.62924916 0.6998315 0.222148 0.7392884 0.4174865 0.4528237 0.70034826 0.3057149 0.29177833 0.22782367 0.8182611 0.46680295 0.4778521 0.6365823 0.43971914 0.27055055 0.26839674 0.5263245 0.8824649 0.51182485 0.20494783 0.7679403 0.31936407 0.13476872 0.47281688 0.3402111 0.28706527 0.038203478 0.7351879 0.6165404 0.41761196 0.5229257 0.7284225 ==================== 15/12/02 00:28:08 INFO mapreduce.JobSubmitter: number of splits:2 15/12/02 00:28:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448981819300_0014 15/12/02 00:28:09 INFO impl.YarnClientImpl: Submitted application application_1448981819300_0014 15/12/02 00:28:09 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448981819300_0014/ 15/12/02 00:28:09 INFO mapreduce.Job: Running job: job_1448981819300_0014 15/12/02 00:28:38 INFO mapreduce.Job: Job job_1448981819300_0014 running in uber mode : false 15/12/02 00:28:38 INFO mapreduce.Job: map 0% reduce 0% 15/12/02 00:29:13 INFO mapreduce.Job: map 100% reduce 0% 15/12/02 00:29:32 INFO mapreduce.Job: map 100% reduce 100% 15/12/02 00:29:32 INFO mapreduce.Job: Job job_1448981819300_0014 completed successfully 15/12/02 00:29:32 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=26 FILE: Number of bytes written=323256 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=520 HDFS: Number of bytes written=31 HDFS: Number of read operations=7 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=64430 Total time spent by all reduces in occupied slots (ms)=16195 Total time spent by all map tasks (ms)=64430 Total time spent by all reduce tasks (ms)=16195 Total vcore-seconds taken by all map tasks=64430 Total vcore-seconds taken by all reduce tasks=16195 Total megabyte-seconds taken by all map tasks=65976320 Total megabyte-seconds taken by all reduce tasks=16583680 Map-Reduce Framework Map input records=2 Map output records=2 Map output bytes=16 Map output materialized bytes=32 Input split bytes=520 Combine input records=0 Combine output records=0 Reduce input groups=1 Reduce shuffle bytes=32 Reduce input records=2 Reduce output records=1 Spilled Records=4 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=356 CPU time spent (ms)=1940 Physical memory (bytes) snapshot=513851392 Virtual memory (bytes) snapshot=2541506560 Total committed heap usage (bytes)=257171456 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=0 File Output Format Counters Bytes Written=31 [hadoop@nnode code]$
[hadoop@nnode code]$ hdfs dfs -ls /201512020027 Found 2 items -rw-r--r-- 2 hadoop hadoop 0 2015-12-02 00:29 /201512020027/_SUCCESS -rw-r--r-- 2 hadoop hadoop 31 2015-12-02 00:29 /201512020027/part-r-00000 [hadoop@nnode code]$ hdfs dfs -text /201512020027/part-r-00000 The max value is : 0.98296344 [hadoop@nnode code]$