The combiner is a MapReduce component whose purpose is to reduce the volume of data transferred over the network; its parent class is in fact Reducer. A combiner runs after each map task and pre-aggregates that task's output, cutting down the series of disk and network I/O operations the shuffle mechanism performs between map and reduce.
![Combiner in the shuffle phase](https://s1.ax1x.com/2020/10/26/BuxnI0.png)
Note that the precondition for using a combiner is that it must not affect the business logic. The combiner is only an optimization for MapReduce programs: it suits MR jobs that aggregate data (such as sums), but not jobs that compute averages, because an average of partial averages is generally not the overall average.
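To make the sum-vs-average distinction concrete, here is a minimal sketch (plain Java, no Hadoop; the class and method names are illustrative, not part of the wordcount job) comparing what happens when two map tasks' partial results are combined:

```java
// Illustrates why a combiner is safe for sums but not for averages.
public class CombinerSafety {

    static int sum(int[] xs) {
        int s = 0;
        for (int x : xs) s += x;
        return s;
    }

    static double avg(int[] xs) {
        return (double) sum(xs) / xs.length;
    }

    public static void main(String[] args) {
        int[] mapTask1 = {1, 2};       // output of one map task
        int[] mapTask2 = {3, 4, 5};    // output of another map task

        // Sum is associative: combining partial sums gives the true total.
        int combinedSum = sum(mapTask1) + sum(mapTask2);          // (1+2) + (3+4+5) = 15

        // Average is not: averaging the partial averages is wrong.
        double avgOfAvgs = (avg(mapTask1) + avg(mapTask2)) / 2.0; // (1.5 + 4.0) / 2 = 2.75
        double trueAvg = (double) combinedSum / 5;                // 15 / 5 = 3.0

        System.out.println("sum: " + combinedSum);
        System.out.println("avg of avgs: " + avgOfAvgs + ", true avg: " + trueAvg);
    }
}
```

This is why a sum-style reducer can safely double as a combiner, while an average-style reducer cannot.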
Create a combiner class that extends Reducer, and associate it with the MR job in the driver class.
If the reducer and the combiner perform the same operation, it is perfectly fine to set the reducer class itself as the combiner.
combiner & reducer
```java
package com.wordcount.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class wordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts for this key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}
```
mapper
```java
package com.wordcount.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class wordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line on tabs and emit (word, 1) for every field
        String[] fields = value.toString().split("\t");
        for (String field : fields) {
            k.set(field);
            context.write(k, v);
        }
    }
}
```
driver
```java
package com.wordcount.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class wordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"D:/mapreduceinput/input2", "D:/mapreduceoutput/output4"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(wordcountDriver.class);
        job.setMapperClass(wordcountMapper.class);
        job.setReducerClass(wordcountReducer.class);
        job.setCombinerClass(wordcountReducer.class); // note: the reducer is set directly as the combiner

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
Reprinted from: http://ticki.baihongyu.com/