简介
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
MapReduce框架的结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
- MapReduceApplicationMaster:负责整个程序的过程调度及状态协调
- MapTask:负责map阶段的整个数据处理流程
- ReduceTask:负责reduce阶段的整个数据处理流程
MapReduce的运行流程
- 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的MapTask实例数量,然后向集群申请机器启动相应数量的MapTask进程。
- Maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
- 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对
- 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存中
- 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
- MRAppMaster监控到所有MapTask进程任务完成之后,会根据客户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围(数据分区)
- ReduceTask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台MapTask运行所在机器上获取到若干个MapTask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
深入解析MapReduce原理
MapTask并行度决定机制
- MapTask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度
- 一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个MapTask并行实例处理。
- 这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成。
FileInputFormat切片机制
-
FileInputFormat默认的切片机制
- 简单地按照文件的内容长度进行切片
- 切片大小默认等于block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
- 实例
1 2 3 4 5 6 7 8 |
假如有两个待处理的数据文件: a.txt 300MB b.txt 10MB 经过FileInputFormat的切片机制运算后: a.txt.split1 0~128MB a.txt.split2 128~256MB a.txt.split3 256~300MB b.txt.split1 0~10MB |
-
FileInputFormat切片机制的具体流程
- 获取到数据存储目录
- 开始遍历该目录下的每一个文件
- 遍历文件的过程
- 获取文件的大小
- 计算切片的大小
- 开始切片,形成第1个切片,第2个。。。
- 将形成的切片信息记录到切片规划文件中
ReduceTask并行度决定机制
- ReduceTask的并行度同样影响整个job的执行并发度和执行效率,与maptask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:job.setNumReduceTasks(4); //默认值是1,手动设置为4
- ReduceTask的数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask
- 尽量不要运行太多的ReduceTask。对大多数job来说,最好ReduceTask的个数最多和集群中的reduce持平,或者更小。小集群要特别注意这点。
MapReduce的shuffle机制
- 简介
- mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle
- shuffle: 洗牌、发牌
|