Hadoop中如何分区

小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

成都创新互联是一家专业提供陆良企业网站建设的公司,专注于成都做网站、成都网站建设、H5响应式网站、小程序制作等业务。10年来已为陆良众多企业、政府机构等提供服务。创新互联专业网站制作公司优惠进行中。

package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class KpiApp {
	public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";
	public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();
		existsFile(conf);
		Job job = new Job(conf, KpiApp.class.getName());
		//打成Jar在Linux运行
		job.setJarByClass(KpiApp.class);
		
		//1.1
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2
		job.setMapperClass(MyMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		//1.3 自定义分区
		job.setPartitionerClass(KpiPartition.class);
		job.setNumReduceTasks(2);
		
		//1.4 排序分组
		//1.5 聚合
		
		//2.1
		
		//2.2
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		//2.3
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.waitForCompletion(true);
	}
	private static void existsFile(Configuration conf) throws IOException,
			URISyntaxException {
		FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf);
		if(fs.exists(new Path(OUTPUT_PATH))){
			fs.delete(new Path(OUTPUT_PATH), true);
		}
	}
	static class MyMapper extends Mapper{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String string = value.toString();
			String[] split = string.split("\t");
			String phone = split[1];
			Text key2 = new Text();
			key2.set(phone);
			
			KpiWritable v2= new KpiWritable();
			v2.set(split[6],split[7],split[8],split[9]);
			context.write(key2, v2);
		}
		
	}
	static class MyReducer extends Reducer{

		@Override
		protected void reduce(Text key2, Iterable values,Context context)
				throws IOException, InterruptedException {
				long upPackNum = 0L;
				long downPackNum = 0L;
				long upPayLoad = 0L;
				long downPayLoad = 0L;
				for(KpiWritable writable : values){
					upPackNum += writable.upPackNum;
					downPackNum += writable.downPackNum;
					upPayLoad += writable.upPayLoad;
					downPayLoad += writable.downPayLoad;
				}
				KpiWritable value3 = new KpiWritable();
				value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad));
				context.write(key2, value3);
		}
	}
}
class KpiWritable implements Writable{
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(this.upPackNum);
		out.writeLong(this.downPackNum);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
	}

	public void set(String string, String string2, String string3,
			String string4) {
		this.upPackNum = Long.parseLong(string);
		this.downPackNum = Long.parseLong(string2);
		this.upPayLoad = Long.parseLong(string3);
		this.downPayLoad = Long.parseLong(string4);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public String toString() {
		return  upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}
}
class KpiPartition extends Partitioner{

	@Override
	public int getPartition(Text key, KpiWritable value, int numPartitions) {
		String string = key.toString();
		return string.length()==11?0:1;
	}
}

  Partitioner是HashPartitioner的基类,如果需要定制Partitioner也需要继承该类。

  HashPartitioner是MapReduce的默认Partitioner。

以上是“Hadoop中如何分区”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!


文章标题:Hadoop中如何分区
网页URL:http://pwwzsj.com/article/gpcgcs.html