Job提交流程源码和切片源码详解

论坛 期权论坛 编程之家     
选择匿名的用户   2021-5-29 08:50   452   0

1. 进入Job提交方法

/**
 * Submits the job if it is still in the DEFINE state, then blocks until the
 * job has completed.
 *
 * @param verbose when true, progress is monitored and printed through
 *                monitorAndPrintJob(); when false, completion is polled
 *                silently at the client's completion poll interval
 * @return the value of isSuccessful() once the job is complete
 * @throws IOException            propagated from submission or status calls
 * @throws InterruptedException   propagated from submission or monitoring
 * @throws ClassNotFoundException propagated from submission
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  // Only a job that is still being defined is submitted; any other state
  // means it was already submitted and must not be submitted again.
  if (state == JobState.DEFINE) {
    submit();
  }

  if (verbose) {
    // Monitor the job and print its progress information.
    monitorAndPrintJob();
    return isSuccessful();
  }

  // get the completion poll interval from the client.
  int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
  boolean interrupted = false;
  try {
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
        // Keep waiting for completion as before, but remember the interrupt
        // so it is not lost to the caller.
        interrupted = true;
      }
    }
    return isSuccessful();
  } finally {
    // Restore the interrupt status only on the way out: re-interrupting
    // inside the loop would make every following sleep() fail immediately.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}

1.1 提交Job到Cluster

/**
 * Submits this job to the cluster and returns immediately.
 *
 * The job must be in the DEFINE state. After the submitter returns a status
 * the job is moved to the RUNNING state and the tracking URL is logged.
 *
 * @throws IOException            propagated from connect() or the submitter
 * @throws InterruptedException   propagated from connect() or the submitter
 * @throws ClassNotFoundException propagated from connect() or the submitter
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();

  // Create the Cluster if needed. Per the original author's note it carries
  // two key members: (1) the file system used to read input and write
  // results, and (2) the client that runs the job -- LocalJobRunner for local
  // mode, YarnRunner for YARN.
  connect();

  final JobSubmitter jobSubmitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

  // The actual submission is executed as the job's user (ugi).
  PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
            ClassNotFoundException {
          return jobSubmitter.submitJobInternal(Job.this, cluster);
        }
      };
  status = ugi.doAs(submitAction);

  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

1.2 创建Cluster

/**
 * Lazily creates the Cluster for this job from the job's configuration.
 *
 * Does nothing when a cluster has already been created. The Cluster is
 * constructed as the job's user (ugi).
 *
 * @throws IOException            propagated from the Cluster construction
 * @throws InterruptedException   propagated from ugi.doAs
 * @throws ClassNotFoundException declared by the privileged action
 */
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster != null) {
    // Already connected.
    return;
  }

  // Build the Cluster that will run the job from the user's configuration.
  PrivilegedExceptionAction<Cluster> createCluster =
      new PrivilegedExceptionAction<Cluster>() {
        public Cluster run()
            throws IOException, InterruptedException,
            ClassNotFoundException {
          return new Cluster(getConfiguration());
        }
      };
  cluster = ugi.doAs(createCluster);
}

1.3 使用Submitter提交Job

/**
 * Prepares a job for submission and hands it to the submit client.
 *
 * Steps visible in this method: check the job's specification, resolve the
 * staging area and the per-job submit directory, record the submitting host,
 * obtain a new job id, set up tokens and the shuffle secret, copy and
 * configure the job files, write the input splits, record queue ACLs and
 * token tracking ids, write the job configuration file, and finally call
 * submitClient.submitJob. If no status was obtained, the submit directory is
 * deleted again.
 *
 * @param job     the job to submit
 * @param cluster the cluster used to resolve the staging directory
 * @return the non-null status returned by the submit client
 * @throws ClassNotFoundException propagated from the preparation steps
 * @throws InterruptedException   propagated from the preparation steps
 * @throws IOException            on preparation failure, on failure to create
 *                                the shuffle key, or when the submit client
 *                                returns no status ("Could not launch job")
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // Validate the job's output specification (the output directory).
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  // Resolve the staging area for this job.
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Author's note: for a local submission the job directory ends up under
  // /tmp on the drive that holds the IDE workspace; for a YARN submission it
  // is located under /tmp on HDFS.
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      // The two literals are concatenated, so the separating space must be
      // part of one of them (it was missing: "intermediatedata").
      LOG.warn("Max job attempts set to 1 since encrypted intermediate " +
          "data spill is enabled");
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Compute the input splits. Per the author's note this produces the split
    // file and the split meta info: the split file records how the files of
    // the input directory were cut (one FileSplit per split), and the meta
    // info describes each split, including which node it should be read from.
    int maps = writeSplits(job, submitJobDir);
    // The number of map tasks is set to the number of splits.
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write the complete job configuration to the job file in the submit dir.
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    // Hand the prepared job over to the submit client.
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    // A null status means submission did not succeed: remove what was staged.
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null) {
        jtFs.delete(submitJobDir, true);
      }
    }
  }
}

1.4 提交Job

/**
 * Creates the runner-side Job for an already prepared submission and returns
 * its status.
 *
 * @param jobid        id of the job being submitted
 * @param jobSubmitDir the submit directory prepared for the job
 * @param credentials  credentials to attach to the job's configuration object
 * @return the status field of the newly created Job
 * @throws IOException propagated from the Job constructor
 */
public org.apache.hadoop.mapreduce.JobStatus submitJob(
    org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
    Credentials credentials) throws IOException {
  // Rebuild the job from what the earlier preparation steps produced.
  final JobID downgradedId = JobID.downgrade(jobid);
  Job localJob = new Job(downgradedId, jobSubmitDir);
  localJob.job.setCredentials(credentials);
  return localJob.status;
}

1.5 创建LocalJobRunner可以执行的Job对象

// Constructor of the Job object that LocalJobRunner can execute (excerpt: the
// original article elides most of the body with an ellipsis).
public Job(JobID jobid, String jobSubmitDir) throws IOException {

……

// Author's note on the elided part: the settings generated earlier for the
// job are re-assigned to LocalJobRunner$Job.

// Start a separate thread to run the job.
// NOTE(review): presumably this class extends Thread and start() ends up in
// the run() method shown in the next section -- confirm against the full source.

this.start();

}

1.6 Job的run()

/**
 * Runs the whole job: sets it up, runs all map tasks, then all reduce tasks
 * (if any), commits the output and records the final run state.
 *
 * On any Throwable the job is aborted through the output committer and marked
 * FAILED (or KILLED when the killed flag is set). In every case the submit
 * directory, the local job file and the distributed cache are cleaned up at
 * the end. If the output committer cannot be created the method only logs
 * and returns.
 */
@Override
public void run() {
  JobID jobId = profile.getJobID();
  // Job context built from the job configuration and the job id.
  JobContext jContext = new JobContextImpl(job, jobId);

  org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
  try {
    outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
  } catch (Exception e) {
    LOG.info("Failed to createOutputCommitter", e);
    return;
  }

  try {
    // Read the split meta info; per the author's note the array has one
    // entry per split.
    TaskSplitMetaInfo[] taskSplitMetaInfos =
        SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

    int numReduceTasks = job.getNumReduceTasks();
    outputCommitter.setupJob(jContext);
    status.setSetupProgress(1.0f);

    // Holds the MapOutputFile of every map task attempt; handed to both the
    // map and the reduce runnables.
    Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
        Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());

    // Build the list of map task runnables.
    List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
        taskSplitMetaInfos, jobId, mapOutputFiles);

    initCounters(mapRunnables.size(), numReduceTasks);
    // Executor on which the map runnables are run.
    ExecutorService mapService = createMapExecutor();
    // Run all map tasks; see MapTaskRunnable.run() for what each one does.
    runTasks(mapRunnables, mapService, "map");

    try {
      if (numReduceTasks > 0) {
        List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
            jobId, mapOutputFiles);
        ExecutorService reduceService = createReduceExecutor();
        runTasks(reduceRunnables, reduceService, "reduce");
      }
    } finally {
      // Map outputs are removed whether or not the reduce phase succeeded.
      for (MapOutputFile output : mapOutputFiles.values()) {
        output.removeAll();
      }
    }
    // delete the temporary directory in output directory
    outputCommitter.commitJob(jContext);
    status.setCleanupProgress(1.0f);

    if (killed) {
      this.status.setRunState(JobStatus.KILLED);
    } else {
      this.status.setRunState(JobStatus.SUCCEEDED);
    }

    JobEndNotifier.localRunnerNotification(job, status);
  } catch (Throwable t) {
    try {
      outputCommitter.abortJob(jContext,
          org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
    } catch (IOException ioe) {
      // Pass the exception to the logger so the reason the abort failed is
      // not lost (it was dropped before).
      LOG.info("Error cleaning up job:" + id, ioe);
    }
    status.setCleanupProgress(1.0f);
    if (killed) {
      this.status.setRunState(JobStatus.KILLED);
    } else {
      this.status.setRunState(JobStatus.FAILED);
    }
    LOG.warn(id, t);

    JobEndNotifier.localRunnerNotification(job, status);

  } finally {
    try {
      fs.delete(systemJobFile.getParent(), true);  // delete submit dir
      localFs.delete(localJobFile, true);          // delete local copy
      // Cleanup distributed cache
      localDistributedCacheManager.close();
    } catch (IOException e) {
      LOG.warn("Error cleaning up "+id+": "+e);
    }
  }
}

2. 进入Map阶段

2.1 进入MapTaskRunnable的run()

/**
 * Runs one map task inside the local runner.
 *
 * Creates the attempt id and the MapTask, registers the task's output file
 * object, localizes the configuration and then runs the task. Any Throwable
 * is stored in storedException instead of being rethrown.
 */
public void run() {
  try {
    // Build the attempt id of this task (attempt number 0).
    TaskID mapTaskId = new TaskID(jobId, TaskType.MAP, taskId);
    TaskAttemptID attemptId = new TaskAttemptID(mapTaskId, 0);
    LOG.info("Starting task: " + attemptId);
    mapIds.add(attemptId);

    MapTask mapTask = new MapTask(systemJobFile.toString(), attemptId, taskId,
        info.getSplitIndex(), 1);
    String shortUserName =
        UserGroupInformation.getCurrentUser().getShortUserName();
    mapTask.setUser(shortUserName);
    setupChildMapredLocalDirs(mapTask, localConf);

    // Create the output file object of this map task and register it under
    // the attempt id.
    MapOutputFile outputFiles = new MROutputFiles();
    outputFiles.setConf(localConf);
    mapOutputFiles.put(attemptId, outputFiles);

    mapTask.setJobFile(localJobFile.toString());
    localConf.setUser(mapTask.getUser());
    mapTask.localizeConfiguration(localConf);
    mapTask.setConf(localConf);

    try {
      map_tasks.getAndIncrement();
      myMetrics.launchMap(attemptId);
      // Hand control to MapTask.run().
      mapTask.run(localConf, Job.this);
      myMetrics.completeMap(attemptId);
    } finally {
      // The running-map counter is decremented even when the task fails.
      map_tasks.getAndDecrement();
    }

    LOG.info("Finishing task: " + attemptId);
  } catch (Throwable failure) {
    this.storedException = failure;
  }
}

}

2.2 MapTask的run()

@Override

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException, ClassNotFoundException, InterruptedException {

this.umbilical = umbilical;

// 判断是否需要reduce阶段

// Map阶段,可以分为两个阶段:

map: 调用Mapper的map()方法,对输入的key-value进行处理

// sort : 当map()处理完,context.wirte(),将key-value保存到文件中!

// 在保存到文件之前,会将所有的key-value进行排序,会经过排序的阶段

if (isMapTask()) {

// If there are no reducers then there won't be any sort. Hence the map

// phase will govern the entire attempt's progress.

if (conf.getNumReduceTasks() == 0) {

mapPhase = getProgress().addPhase("map", 1.0f);

} else {

// If there are reducers then the entire attempt's progress will be

// split between the map phase (67%) and the sort phase (33%).

mapPhase = getProgress().addPhase("map", 0.667f);

sortPhase = getProgress().addPhase("sort", 0.333f);

}

}

TaskReporter reporter = startReporter(umbilical);

boolean useNewApi = job.getUseNewMapper();

initialize(job, getJobID(), reporter, useNewApi);

// check if it is a cleanupJobTask

if (jobCleanup) {

runJobCleanupTask(umbilical, reporter);

return;

}

if (jobSetup) {

runJobSetupTask(umbilical, reporter);

return;

}

if (taskCleanup) {

runTaskCleanupTask(umbilical, reporter);

return;

}

if (useNewApi) {

runNewMapper(job, splitMetaInfo, umbilical, reporter);

} else {

runOldMapper(job, splitMetaInfo, umbilical, reporter);

}

done(umbilical, reporter);

}

2.3 运行Mapper

/**
 * Runs the mapper through the new (org.apache.hadoop.mapreduce) API.
 *
 * Instantiates the mapper and the input format by reflection, rebuilds the
 * input split, wires up the record reader and the record writer, and then
 * lets the mapper consume its input.
 *
 * @param job        the job configuration
 * @param splitIndex location and start offset of this task's split
 * @param umbilical  the protocol object used to communicate with the parent
 * @param reporter   the task reporter
 * @throws IOException            propagated from split reading or the mapper
 * @throws ClassNotFoundException propagated from class lookups
 * @throws InterruptedException   propagated from the mapper run
 */
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // Context object of this map task attempt.
  org.apache.hadoop.mapreduce.TaskAttemptContext attemptContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
          job, getTaskID(), reporter);

  // Instantiate the user's Mapper.
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> userMapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(attemptContext.getMapperClass(), job);

  // Instantiate the input format.
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> format =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
          ReflectionUtils.newInstance(attemptContext.getInputFormatClass(), job);

  // Rebuild the split this task has to process.
  final Path splitFile = new Path(splitIndex.getSplitLocation());
  org.apache.hadoop.mapreduce.InputSplit currentSplit =
      getSplitDetails(splitFile, splitIndex.getStartOffset());
  LOG.info("Processing split: " + currentSplit);

  // Wrapper around the RecordReader; per the author's note the reader that
  // really reads the records is assigned inside the
  // NewTrackingRecordReader constructor.
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> reader =
      new NewTrackingRecordReader<INKEY,INVALUE>(
          currentSplit, format, reporter, attemptContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  // The writer emits the key-value pairs produced by the map task.
  org.apache.hadoop.mapreduce.RecordWriter writer = null;
  if (job.getNumReduceTasks() != 0) {
    // With reducers: collect the map output through NewOutputCollector.
    writer = new NewOutputCollector(attemptContext, job, umbilical, reporter);
  } else {
    // Without a reduce phase: use a collector that writes output directly.
    writer = new NewDirectOutputCollector(attemptContext, job, umbilical, reporter);
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      baseContext =
          new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
              job, getTaskID(), reader, writer, committer, reporter,
              currentSplit);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
      mapperContext =
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>()
              .getMapContext(baseContext);

  try {
    reader.initialize(currentSplit, mapperContext);
    userMapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    // Close explicitly and null out each reference right after, so the quiet
    // close in the finally block receives null for whatever was closed here.
    reader.close();
    reader = null;
    writer.close(mapperContext);
    writer = null;
  } finally {
    closeQuietly(reader);
    closeQuietly(writer, mapperContext);
  }
}

2.4 进入Mapper的run()

/**
 * Drives the mapper: setup() first, then map() for every input key-value
 * pair, and cleanup() at the end.
 *
 * @param context the context supplying the input pairs
 * @throws IOException          propagated from setup, map or cleanup
 * @throws InterruptedException propagated from setup, map or cleanup
 */
public void run(Context context) throws IOException, InterruptedException {
  // setup() runs before any record is read.
  setup(context);
  try {
    // Feed every input key-value pair to map().
    boolean hasRecord = context.nextKeyValue();
    while (hasRecord) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
      hasRecord = context.nextKeyValue();
    }
  } finally {
    // cleanup() runs even when reading or mapping throws.
    cleanup(context);
  }
}

3. 进入Reduce阶段

3.1 ReduceTask的run()方法

/**
 * Runs this reduce task.
 *
 * Registers the progress phases, starts the reporter and initializes the
 * task. Unless the task is one of the special tasks (job cleanup, job setup,
 * task cleanup), it runs the shuffle consumer plugin to obtain the merged
 * input, passes that to the reducer through the new or the old API, closes
 * the plugin and calls done().
 *
 * @param job       the job configuration
 * @param umbilical the protocol object used to communicate with the parent
 * @throws IOException            propagated from the task execution
 * @throws InterruptedException   propagated from the task execution
 * @throws ClassNotFoundException propagated from the task execution
 */
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  // Per the author's note a reduce task has roughly three phases:
  //   copy:   fetch the partition handled by this reduce task from all
  //           map tasks
  //   sort:   merge those partition files into one file that is still
  //           sorted (merge and sort)
  //   reduce: hand the merged file to the reducer
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }

  // start thread that will handle communication with parent
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewReducer();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  // Initialize the codec
  codec = initCodec();

  // A combine collector is only created when a combiner class is configured.
  Class combinerClass = conf.getCombinerClass();
  CombineOutputCollector combineCollector = null;
  if (combinerClass != null) {
    combineCollector =
        new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf);
  }

  Class<? extends ShuffleConsumerPlugin> pluginClass =
      job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class,
          ShuffleConsumerPlugin.class);
  ShuffleConsumerPlugin shufflePlugin =
      ReflectionUtils.newInstance(pluginClass, job);
  LOG.info("Using ShuffleConsumerPlugin: " + shufflePlugin);

  ShuffleConsumerPlugin.Context pluginContext =
      new ShuffleConsumerPlugin.Context(getTaskID(), job,
          FileSystem.getLocal(job), umbilical,
          super.lDirAlloc, reporter, codec,
          combinerClass, combineCollector,
          spilledRecordsCounter, reduceCombineInputCounter,
          shuffledMapsCounter,
          reduceShuffleBytes, failedShuffleCounter,
          mergedMapOutputsCounter,
          taskStatus, copyPhase, sortPhase, this,
          mapOutputFile, localMapFiles);
  shufflePlugin.init(pluginContext);

  RawKeyValueIterator mergedInput = shufflePlugin.run();

  // free up the data structures
  mapOutputFilesOnDisk.clear();

  sortPhase.complete();                         // sort is complete
  setPhase(TaskStatus.Phase.REDUCE);
  statusUpdate(umbilical);

  Class mapOutputKeyClass = job.getMapOutputKeyClass();
  Class mapOutputValueClass = job.getMapOutputValueClass();
  RawComparator groupingComparator = job.getOutputValueGroupingComparator();

  // Hand the merged input to the reducer.
  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, mergedInput, groupingComparator,
        mapOutputKeyClass, mapOutputValueClass);
  } else {
    runOldReducer(job, umbilical, reporter, mergedInput, groupingComparator,
        mapOutputKeyClass, mapOutputValueClass);
  }

  shufflePlugin.close();
  done(umbilical, reporter);
}

3.2 runNewReducer()

/**
 * Runs the reducer through the new (org.apache.hadoop.mapreduce) API.
 *
 * Wraps the raw input iterator so every call to next() also reports
 * progress, instantiates the reducer by reflection, builds the reduce
 * context and runs the reducer. The record writer is closed in all cases.
 *
 * @param job        the job configuration
 * @param umbilical  the protocol object used to communicate with the parent
 * @param reporter   the task reporter that receives progress updates
 * @param rIter      the raw key-value input of the reducer
 * @param comparator comparator passed on to the reduce context
 * @param keyClass   class of the input keys
 * @param valueClass class of the input values
 * @throws IOException            propagated from the reducer or the writer
 * @throws InterruptedException   propagated from the reducer or the writer
 * @throws ClassNotFoundException propagated from class lookups
 */
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException,InterruptedException,
                            ClassNotFoundException {
  // wrap value iterator to report progress.
  final RawKeyValueIterator delegate = rIter;
  RawKeyValueIterator progressReportingIter = new RawKeyValueIterator() {
    public boolean next() throws IOException {
      boolean advanced = delegate.next();
      reporter.setProgress(delegate.getProgress().getProgress());
      return advanced;
    }

    public DataInputBuffer getKey() throws IOException {
      return delegate.getKey();
    }

    public DataInputBuffer getValue() throws IOException {
      return delegate.getValue();
    }

    public Progress getProgress() {
      return delegate.getProgress();
    }

    public void close() throws IOException {
      delegate.close();
    }
  };

  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext attemptContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
          job, getTaskID(), reporter);

  // make a reducer
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> userReducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(attemptContext.getReducerClass(), job);

  org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackingWriter =
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, attemptContext);

  job.setBoolean("mapred.skip.on", isSkipping());
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
      createReduceContext(userReducer, job, getTaskID(),
          progressReportingIter, reduceInputKeyCounter,
          reduceInputValueCounter,
          trackingWriter,
          committer,
          reporter, comparator, keyClass,
          valueClass);

  try {
    // Per the author's note this works like Mapper.run().
    userReducer.run(reducerContext);
  } finally {
    // The writer is closed whether or not the reducer succeeded.
    trackingWriter.close(reducerContext);
  }
}

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP