MapReduce入门详解(二)

论坛 期权论坛 编程之家     
选择匿名的用户   2021-5-30 09:12   148   0

MapReduce实践攻略

超详细入门级-WordCount

问题描述:
统计一个文件中,各种单词出现的次数
思路分析:

  1. 在map阶段,对每行数据调用一次map方法,对读取到的每行数据按空格进行切割,将分割得到的每个单词作为key,value的值给定为1传递给reduce
  2. 在reduce阶段,从map接收到传递过来的key和value,key值相同的为同一组,对每一组只调用一次reduce方法,将每一组的value值累加即可得到该单词出现的次数,最后将该组的key作为key,累加的value作为value作为结果输出
 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

 

public class WordCountMR2 extends Configured implements Tool {

/**

* KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,

* 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable

* VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text

* KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text

* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable

*/

public static class WCMapper extends Mapper<LongWritable,Text, Text, IntWritable> {

/**

* map阶段的业务逻辑就写在自定义的map()方法中

* maptask会对每一行输入数据调用一次我们自定义的map()方法

* context是上下文引用对象,传递输出值

*/

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

Collections.list(new StringTokenizer(value.toString()," ")).stream().map(s -> ((String)s).trim())

.filter(s -> s.length() > 1).forEach(ExceptionConsumer.of(word -> context.write(new Text(word),new IntWritable(1))));

}

}

/**

* KEYIN, VALUEIN对应mapper输出的KEYOUT,VALUEOUT类型对应

* KEYOUT, VALUEOUT是自定义reduce逻辑处理结果的输出数据类型

* KEYOUT是单词

* VLAUEOUT是总次数

*/

public static class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

/**

* reduce阶段的业务逻辑就写在自定义的reduce()方法中

* reducetask会对所有相同的key调用一次reduce()方法

* context是上下文引用对象,传递输出值

*/

@Override

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

//map阶段的输出是reduce阶段的输入,样式如下

//<helle,1><hello,1><helle,1><hello,1><helle,1><hello,1>

//<tom,1><tom,1><tom,1>

//<good,1>

// int count = 0;

// for (IntWritable value : values){

// count += value.get();

// }

// context.write(key, new IntWritable(count));

IntWritable count = StreamSupport.stream(values.spliterator(), false).collect(Collectors.toSet()).stream()

.reduce((a, b) -> new IntWritable(a.get() + b.get())).get();

context.write(key,count);

}

}

@Override

public int run(String[] strings) throws Exception {

Configuration conf = getConf();

//创建job实例对象

Job job = Job.getInstance(conf,"test_fun_wordcount2");

//指定本程序的jar包所在的本地路径

job.setJarByClass(this.getClass());

//指定本业务job要使用的mapper/Reducer业务类

job.setMapperClass(WCMapper.class);

job.setReducerClass(WCReducer.class);

//指定mapper输出数据的kv类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//指定最终输出的数据的kv类型

//注:不是setReduceOutput,因为有的时候只需要用到map,直接输出map矟O XX]X\YX^[YXX\\ ^ ^ ^ KKH KO \YO XYYXYX\\K]\XI^ [Y\^^ HS[[\\Y^[KKH KO  ^[YH[Y\KKH KO [ NKYX\ H  K\ N^ ]J]^ K[YJNO O O \YO XX[[[H[H^[KKH KO Y][ N N N[[ X\W\P\\˙\ JNX\\\X\\\NX\]P\YX\\ \NX\][YP\^ \NYX\YX\N]P\^ \N][YP\^ \N[]X]\^[]X] \N]X]\^]X] \N^[]X] Y[]] ]]  ] JN^]X] ]] ]]  ]] JN:+b!c.)&O \][\\YX\\][\\N:+b!)&O [\]\YX\\\]\N]\]\][YJO NO XX]XXZ[[H\H^[KKH KO [\[]\TT K\NO O   O XO  9d# XN N9nm L9" 9NO c﹣IXNHYH^K N K\YXQMIN IPMIQNINMN QN PQPMQN PMLQP QM PINQPK^K N K\YXaizeN

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛