Map输入切片和记录的过程

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 21:03   3663   0

MapReduce在执行map()函数之前,还做了大量的工作,例如数据的切片,将切片生成键值对传给map()函数等等,在执行map()之前做了很多的事情,今天就记录一下输入数据的切片和记录。(tips:由于也是刚刚学,看了权威指南,分享一下心得,有什么问题希望大家指正)

1、简单介绍一下切片,记录以及map()方法之间的联系

一个输入的切片(split)就是一个由单个map操作来处理的输入块。每一个map()操作只处理一个输入分片。每个分片被划分为若干记录,每一个记录就是一个键值对,map()一个接一个的处理记录(后面用一个TextInputFormat输入格式来演示一下)。

2、介绍一下InputSplit接口和InputFormat

InputSplit接口:

输入分片在java中表示一个接口。(idea中使用ctrl+n来全局搜索)

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Public
@Stable
public abstract class InputSplit {
    public InputSplit() {
    }
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
InputSplit包含一个以字节为单位的长度和一组存储位置(即一组主机名称)。注意分片不包含数据本身而是指向数据的引用。存储的位置供MapReduce系统使用以便将map任务尽量放在数据分片的旁边,也就是就近原则,避免网络的大量的传输数据造成网络压力过大。

InputFormat:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Public
@Stable
public abstract class InputFormat<K, V> {
    public InputFormat() {
    }
    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

3、MapReduce中切片生成记录到运行map()方法的流程

运行作业的客户端在通过调用InputFormat中getSplits()方法计算切片,然后将他们发送到application master,application master使用其存储的位置信息来调度map任务从而在集群上处理这些分片数据。map任务就把这个切片的数据传给InputFormat中的createRecordReader()方法来获取这个方法的RecordReader。RecordReader就是记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,然后再传递给map()函数。查看Mapper的run()方法可以看到这些情况:

    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);
        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }
    }

运行setup()方法后,在重复的调用Context上的nextKeyValue()(委托给RecordReader的同名方法)为mapper产生键值对,通过Context,键值对从RecordReader中被索引出并且传递给map()方法。当reader读到stream的结尾时,nextKeyValue()方法返回false,map运行其cleanup()方法,然后结束。上面你的过程我想用一个流程图画出来,但是画了好几次,都不知道怎么画出来好,以后理解深刻了再更新吧。

4、对文本输入的偏移量的理解

在学习mr的过程中,对输入的偏移量一直都不是很了解,看了书之后,对LongWritable类型有了一点点认识,所以就记录下来。

TextInputFormat是默认的InputFormat。每一条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何的行终止符(换行符和回车符),它被打包成一个Text对象。所以,包含如下文本的文件被切分为包含四条记录的一个切片:

on the top of the Crumpetty Tree

The Quangle Wangle sat,

But his face you could not see,

On account of his Beaver Hat.

每一条记录表示一条键值对:

(0,on the top of the Crumpetty Tree) //到tree的最后一个e是32字节,为此下面key是33

(33,The Quangle Wangle sat,)

(57,But his face you could not see,)

(89,On account of his Beaver Hat.)

贴一下测试中Mapper中华的代码以及运行的结果:

  //v表示一行文本
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        //对文本文件用空格来切数据   保存为字符串的数组
        String [] arr =  value.toString().split(" ");
        //将key打印出来
        System.out.println(key+":"+value.toString()+":"+value.getLength());
        for (String s : arr){
            keyOut.set(s);
            valueOut.set(1);
            context.write(keyOut,valueOut);
        }
    }

运行结果:


为什么和上面提到的偏移量不一样?!!


分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP