public class WordCountMR2 extends Configured implements Tool { /** * KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long, * 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text * VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable */ public static class WCMapper extends Mapper<LongWritable,Text, Text, IntWritable> { /** * map阶段的业务逻辑就写在自定义的map()方法中 * maptask会对每一行输入数据调用一次我们自定义的map()方法 * context是上下文引用对象,传递输出值 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Collections.list(new StringTokenizer(value.toString()," ")).stream().map(s -> ((String)s).trim()) .filter(s -> s.length() > 1).forEach(ExceptionConsumer.of(word -> context.write(new Text(word),new IntWritable(1)))); } } /** * KEYIN, VALUEIN对应mapper输出的KEYOUT,VALUEOUT类型对应 * KEYOUT, VALUEOUT是自定义reduce逻辑处理结果的输出数据类型 * KEYOUT是单词 * VLAUEOUT是总次数 */ public static class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> { /** * reduce阶段的业务逻辑就写在自定义的reduce()方法中 * reducetask会对所有相同的key调用一次reduce()方法 * context是上下文引用对象,传递输出值 */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //map阶段的输出是reduce阶段的输入,样式如下 //<helle,1><hello,1><helle,1><hello,1><helle,1><hello,1> //<tom,1><tom,1><tom,1> //<good,1> // int count = 0; // for (IntWritable value : values){ // count += value.get(); // } // context.write(key, new IntWritable(count)); IntWritable count = StreamSupport.stream(values.spliterator(), false).collect(Collectors.toSet()).stream() .reduce((a, b) -> new IntWritable(a.get() + b.get())).get(); context.write(key,count); } } @Override public int run(String[] strings) throws Exception { Configuration conf = getConf(); //创建job实例对象 Job job = Job.getInstance(conf,"test_fun_wordcount2"); //指定本程序的jar包所在的本地路径 job.setJarByClass(this.getClass()); //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最终输出的数据的kv类型 //注:不是setReduceOutput,因为有的时候只需要用到map,直接输出map矟OXX]X\YX^[YXX\\^^^ KKHKO\YOXYYXYX\\K]\XI^ [Y\^^
HS[[\\Y^[KKHKO
^[YH[Y\KKHKO[ NKYX\
H K\
N^]J]^
K[YJNOOO\YOXX[[[H[H^[KKHKOY][ N
N N[[ X\W\P\\˙\
JNX\\\X\\\NX\]P\YX\\\NX\][YP\^\NYX\YX\N]P\^\N][YP\^\N[]X]\^[]X]\N]X]\^]X]\N^[]X]Y[]]
]]
] JN^]X]]]
]]
]] JN:+b!c.)&O\][\\YX\\][\\N:+b!)&O[\]\YX\\\]\N]\]\][YJONOXX]XXZ[[H\H^[KKHKO[\[]\TT
K\NOOOXO 9d# XNN9nmL9"9NOc﹣IXNHYH^KNK\YXQMIN
IPMIQNINMN QN PQPMQN PMLQP QM PINQPK^KNK\YXaizeN |