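JobTracker acts much like a router here: JobClient submits job information to JobTracker, and JobTracker dispatches that information to TaskTracker for processing.
In the previous post, "LTS Internals: How JobClient Submits Jobs (Part 2)", we covered job submission on the JobClient side. Next, let's look at what JobTracker does with the job information it receives.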
I. Example
1. JobTracker service registration
(1) ZooKeeper registry address
(2) Externally exposed service port 30005
(3) Database-related configuration
<bean id="jobTracker" class="com.github.ltsopensource.spring.JobTrackerFactoryBean" init-method="start">
<property name="clusterName" value="test_cluster"/>
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
<property name="listenPort" value="30005"/>
<property name="configs">
<props>
<prop key="job.logger">mysql</prop>
<prop key="job.queue">mysql</prop>
<prop key="jdbc.url">jdbc:mysql://127.0.0.1:3306/lts</prop>
<prop key="jdbc.username">root</prop>
<prop key="jdbc.password">root</prop>
</props>
</property>
</bean>
2. Main function
public class Main {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("/lts-jobtracker.xml");
}
}
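When JobTrackerFactoryBean initializes, it registers the service with ZooKeeper, exposes service port 30005, and connects to the database.
II. Receiving jobs
JobTracker provides RemotingDispatcher, a dispatcher that receives requests and routes them to processors. When JobClient submits a job, JobSubmitProcessor handles the request and adds the received job to a job queue.
LTS wraps Netty, Mina, and its own LTS communication framework, but whichever one is used, message handling ends up in RemotingDispatcher's processRequest method. Let's look at what it does.
The actual business logic is handled in doBiz.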
@Override
public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
// Handle each kind of message differently
// Heartbeat
if (request.getCode() == JobProtos.RequestCode.HEART_BEAT.code()) {
offerHandler(channel, request);
return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.HEART_BEAT_SUCCESS.code(), "");
}
// Request rate limiting is enabled
if (reqLimitEnable) {
return doBizWithReqLimit(channel, request);
} else {
// Handle the actual business logic
return doBiz(channel, request);
}
}
doBiz selects a RemotingProcessor according to the RequestCode. When JobClient submits a job, the request is handled by JobSubmitProcessor, an implementation of RemotingProcessor.
private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
// Other request codes
RequestCode code = RequestCode.valueOf(request.getCode());
// Pick the processor that matches the request code
RemotingProcessor processor = processors.get(code);
if (processor == null) {
return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
}
offerHandler(channel, request);
return processor.processRequest(channel, request);
}
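The lookup-then-delegate pattern in doBiz can be illustrated in isolation. The following is a minimal sketch, not the LTS implementation: DispatchSketch, Processor, and the integer codes are hypothetical stand-ins, and request bodies are reduced to plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a code-based request dispatcher.
final class DispatchSketch {
    // Assumed processor contract: takes a request body, returns a reply.
    interface Processor {
        String process(String body);
    }

    private final Map<Integer, Processor> processors = new HashMap<>();

    // Associate a request code with the processor that handles it.
    void register(int requestCode, Processor processor) {
        processors.put(requestCode, processor);
    }

    // Look up the processor by request code; unknown codes get an error reply.
    String dispatch(int requestCode, String body) {
        Processor processor = processors.get(requestCode);
        if (processor == null) {
            return "request code not supported!";
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register(1, body -> "job submit success: " + body);
        System.out.println(dispatcher.dispatch(1, "job-A"));
        System.out.println(dispatcher.dispatch(99, "job-B"));
    }
}
```

Keeping the table keyed by request code means a new request type only needs a new registered processor; the dispatch method itself does not change.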
JobSubmitProcessor's processRequest adds the job information to the job queue, where it is ultimately persisted to the database, and then builds the response.
@Override
public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
JobSubmitRequest jobSubmitRequest = request.getBody();
JobSubmitResponse jobSubmitResponse = appContext.getCommandBodyWrapper().wrapper(new JobSubmitResponse());
RemotingCommand response;
try {
// Add the jobs to the job queue
appContext.getJobReceiver().receive(jobSubmitRequest);
// Report that the job submission succeeded
response = RemotingCommand.createResponseCommand(
JobProtos.ResponseCode.JOB_RECEIVE_SUCCESS.code(), "job submit success!", jobSubmitResponse);
} catch (JobReceiveException e) {
LOGGER.error("Receive job failed , jobs = " + jobSubmitRequest.getJobs(), e);
jobSubmitResponse.setSuccess(false);
jobSubmitResponse.setMsg(e.getMessage());
jobSubmitResponse.setFailedJobs(e.getJobs());
response = RemotingCommand.createResponseCommand(
JobProtos.ResponseCode.JOB_RECEIVE_FAILED.code(), e.getMessage(), jobSubmitResponse);
}
return response;
}
JobReceiver's receive method accepts the submitted jobs and ultimately persists them to the database.
public void receive(JobSubmitRequest request) throws JobReceiveException {
List<Job> jobs = request.getJobs();
if (CollectionUtils.isEmpty(jobs)) {
return;
}
JobReceiveException exception = null;
for (Job job : jobs) {
try {
// Add the job to the queue
addToQueue(job, request);
} catch (Exception e) {
if (exception == null) {
exception = new JobReceiveException(e);
}
exception.addJob(job);
}
}
if (exception != null) {
throw exception;
}
}
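The receive method keeps going after a failure and reports every failed job in a single exception at the end. A minimal sketch of that collect-then-throw pattern follows; BatchReceiveSketch, BatchException, and the blank-item rule are assumptions made for illustration and are not LTS classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: process every item, remember the failures, throw once.
final class BatchReceiveSketch {
    // Assumed exception type that records which items failed.
    static final class BatchException extends Exception {
        private final List<String> failed = new ArrayList<>();

        BatchException(Throwable cause) {
            super(cause);
        }

        void add(String item) {
            failed.add(item);
        }

        List<String> getFailed() {
            return failed;
        }
    }

    private final List<String> stored = new ArrayList<>();

    // Stand-in for addToQueue: rejects blank items, stores the rest.
    private void store(String item) {
        if (item.trim().isEmpty()) {
            throw new IllegalArgumentException("blank job");
        }
        stored.add(item);
    }

    void receive(List<String> items) throws BatchException {
        BatchException exception = null;
        for (String item : items) {
            try {
                store(item);
            } catch (RuntimeException e) {
                // Create the exception lazily, from the first failure only.
                if (exception == null) {
                    exception = new BatchException(e);
                }
                exception.add(item);
            }
        }
        if (exception != null) {
            throw exception;
        }
    }

    List<String> getStored() {
        return stored;
    }
}
```

One consequence of this design is that a failed job does not block the jobs after it in the same request; the caller learns which ones failed from the exception.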
addToQueue adds the job and records a business log entry.
private JobPo addToQueue(Job job, JobSubmitRequest request) {
JobPo jobPo = null;
boolean success = false;
BizLogCode code = null;
try {
jobPo = JobDomainConverter.convert(job);
if (jobPo == null) {
LOGGER.warn("Job can not be null。{}", job);
return null;
}
if (StringUtils.isEmpty(jobPo.getSubmitNodeGroup())) {
jobPo.setSubmitNodeGroup(request.getNodeGroup());
}
// Set the jobId
jobPo.setJobId(JobUtils.generateJobId());
// Add the job
addJob(job, jobPo);
success = true;
code = BizLogCode.SUCCESS;
} catch (DupEntryException e) {
// The job already exists
if (job.isReplaceOnExist()) {
Assert.notNull(jobPo);
success = replaceOnExist(job, jobPo);
code = success ? BizLogCode.DUP_REPLACE : BizLogCode.DUP_FAILED;
} else {
code = BizLogCode.DUP_IGNORE;
LOGGER.info("Job already exist And ignore. nodeGroup={}, {}", request.getNodeGroup(), job);
}
} finally {
if (success) {
stat.incReceiveJobNum();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Receive Job success. {}", job);
}
}
}
// Record the business log
jobBizLog(jobPo, code);
return jobPo;
}
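The duplicate handling in addToQueue comes down to three outcomes: a fresh insert, a replacement when the job asks for it, or an ignored duplicate. The sketch below models this with an in-memory map instead of a database. DupHandlingSketch and its Outcome enum are hypothetical, and the DUP_FAILED case (a replacement that itself fails) is left out for brevity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of insert-or-replace-on-duplicate.
final class DupHandlingSketch {
    enum Outcome { SUCCESS, DUP_REPLACE, DUP_IGNORE }

    private final Map<String, String> store = new ConcurrentHashMap<>();

    Outcome add(String taskId, String payload, boolean replaceOnExist) {
        // putIfAbsent returns null only when no entry existed for the key.
        if (store.putIfAbsent(taskId, payload) == null) {
            return Outcome.SUCCESS;
        }
        if (replaceOnExist) {
            store.put(taskId, payload);
            return Outcome.DUP_REPLACE;
        }
        return Outcome.DUP_IGNORE;
    }

    String get(String taskId) {
        return store.get(taskId);
    }
}
```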
The job is then written to the database according to its type (cron, repeating, or one-off trigger-time job).
/**
* Add the job
*/
private void addJob(Job job, JobPo jobPo) throws DupEntryException {
if (job.isCron()) {
addCronJob(jobPo);
} else if (job.isRepeatable()) {
addRepeatJob(jobPo);
} else {
addTriggerTimeJob(jobPo);
}
}
III. Sending jobs
TaskTracker obtains jobs from JobTracker by pulling. Let's look at what JobTracker does when it sends jobs.
In RemotingDispatcher, doBiz selects the RemotingProcessor according to the request code.
private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
// Other request codes
RequestCode code = RequestCode.valueOf(request.getCode());
// Select the processor by request code
RemotingProcessor processor = processors.get(code);
if (processor == null) {
return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
}
offerHandler(channel, request);
return processor.processRequest(channel, request);
}
JobPullProcessor's processRequest reads the pull request and hands it to JobPusher, which dispatches jobs to the corresponding TaskTracker.
@Override
public RemotingCommand processRequest(final Channel ctx, final RemotingCommand request) throws RemotingCommandException {
// Get the request body
JobPullRequest requestBody = request.getBody();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , availableThreads:{}", requestBody.getNodeGroup(), requestBody.getIdentity(), requestBody.getAvailableThreads());
}
// Hand the request to the job pusher
jobPusher.push(requestBody);
return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_PULL_SUCCESS.code(), "");
}
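JobPusher's push method submits the TaskTracker's pull request to an executor service, so it is handled on a separate thread rather than on the request thread.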
public void push(final JobPullRequest request) {
this.executorService.submit(new Runnable() {
@Override
public void run() {
try {
// Handle the request on a separate thread
push0(request);
} catch (Exception e) {
LOGGER.error("Job push failed!", e);
}
}
});
}
push0 pushes as many jobs as the TaskTracker has available worker threads, split into batches.
private void push0(final JobPullRequest request) {
// Get the node group and identity
String nodeGroup = request.getNodeGroup();
String identity = request.getIdentity();
// Update the TaskTracker's available thread count
appContext.getTaskTrackerManager().updateTaskTrackerAvailableThreads(nodeGroup,
identity, request.getAvailableThreads(), request.getTimestamp());
// Get the TaskTracker node information
final TaskTrackerNode taskTrackerNode = appContext.getTaskTrackerManager().
getTaskTrackerNode(nodeGroup, identity);
if (taskTrackerNode == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , didn't have node.", nodeGroup, identity);
}
return;
}
int availableThread = taskTrackerNode.getAvailableThread().get();
if (availableThread <= 0) {
return;
}
AtomicBoolean pushingFlag = getPushingFlag(taskTrackerNode);
if (pushingFlag.compareAndSet(false, true)) {
try {
final int batchSize = jobPushBatchSize;
int it = availableThread % batchSize == 0 ? availableThread / batchSize : availableThread / batchSize + 1;
final CountDownLatch latch = new CountDownLatch(it);
for (int i = 1; i <= it; i++) {
int size = batchSize;
if (i == it) {
size = availableThread - batchSize * (it - 1);
}
final int finalSize = size;
// Push each batch on its own pool thread
pushExecutorService.execute(new Runnable() {
@Override
public void run() {
try {
// Push the jobs
send(remotingServer, finalSize, taskTrackerNode);
} catch (Throwable t) {
LOGGER.error("Error on Push Job to {}", taskTrackerNode, t);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
DotLogUtils.dot("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , pushing finished. batchTimes:{}, size:{}", nodeGroup, identity, it, availableThread);
} finally {
pushingFlag.compareAndSet(true, false);
}
}
}
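The batch arithmetic in push0 is easier to follow with concrete numbers. The sketch below repeats the same calculation in a standalone helper; BatchSplitSketch and batchSizes are hypothetical names, and the guard against non-positive input is an addition that the original handles earlier by returning when availableThread <= 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the batch-size arithmetic of push0.
final class BatchSplitSketch {
    static List<Integer> batchSizes(int availableThreads, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        if (availableThreads <= 0 || batchSize <= 0) {
            return sizes;
        }
        // Ceiling division: one extra batch when there is a remainder.
        int batches = availableThreads % batchSize == 0
                ? availableThreads / batchSize
                : availableThreads / batchSize + 1;
        for (int i = 1; i <= batches; i++) {
            int size = batchSize;
            if (i == batches) {
                // The last batch takes whatever is left over.
                size = availableThreads - batchSize * (batches - 1);
            }
            sizes.add(size);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(10, 3)); // prints [3, 3, 3, 1]
        System.out.println(batchSizes(6, 3));  // prints [3, 3]
        System.out.println(batchSizes(2, 10)); // prints [2]
    }
}
```

With 10 available threads and a batch size of 3, the CountDownLatch in push0 would be created with a count of 4, and the four send calls would request 3, 3, 3, and 1 jobs.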
The send method looks up pending jobs and pushes them to the TaskTracker.
private JobPushResult send(final RemotingServerDelegate remotingServer, int size, final TaskTrackerNode taskTrackerNode) {
// Node group and identity of the target TaskTracker
final String nodeGroup = taskTrackerNode.getNodeGroup();
final String identity = taskTrackerNode.getIdentity();
// Push the jobs
JobSender.SendResult sendResult = appContext.getJobSender().send(nodeGroup, identity, size, new JobSender.SendInvoker() {
@Override
public JobSender.SendResult invoke(final List<JobPo> jobPos) {
// Send to the TaskTracker for execution
JobPushRequest body = appContext.getCommandBodyWrapper().wrapper(new JobPushRequest());
body.setJobMetaList(JobDomainConverter.convert(jobPos));
RemotingCommand commandRequest = RemotingCommand.createRequestCommand(JobProtos.RequestCode.PUSH_JOB.code(), body);
// Whether the push was dispatched successfully
final Holder<Boolean> pushSuccess = new Holder<Boolean>(false);
final CountDownLatch latch = new CountDownLatch(1);
try {
// Send the request asynchronously over the TaskTracker's channel
remotingServer.invokeAsync(taskTrackerNode.getChannel().getChannel(), commandRequest, new AsyncCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
try {
RemotingCommand responseCommand = responseFuture.getResponseCommand();
if (responseCommand == null) {
LOGGER.warn("Job push failed! response command is null!");
return;
}
if (responseCommand.getCode() == JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Job push success! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobList=" + JSON.toJSONString(jobPos));
}
pushSuccess.set(true);
stat.incPushJobNum(jobPos.size());
} else if (responseCommand.getCode() == JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code()) {
JobPushResponse jobPushResponse = responseCommand.getBody();
if (jobPushResponse != null && CollectionUtils.isNotEmpty(jobPushResponse.getFailedJobIds())) {
// Resume the jobs that the TaskTracker could not accept
for (String jobId : jobPushResponse.getFailedJobIds()) {
for (JobPo jobPo : jobPos) {
if (jobId.equals(jobPo.getJobId())) {
resumeJob(jobPo);
break;
}
}
}
stat.incPushJobNum(jobPos.size() - jobPushResponse.getFailedJobIds().size());
} else {
stat.incPushJobNum(jobPos.size());
}
pushSuccess.set(true);
}
} finally {
latch.countDown();
}
}
});
} catch (RemotingSendException e) {
LOGGER.error("Remoting send error, jobPos={}", JSON.toJSONObject(jobPos), e);
return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
}
try {
latch.await(Constants.LATCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RequestTimeoutException(e);
}
if (!pushSuccess.get()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobs=" + JSON.toJSONObject(jobPos));
}
for (JobPo jobPo : jobPos) {
resumeJob(jobPo);
}
return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
}
return new JobSender.SendResult(true, JobPushResult.SUCCESS);
}
});
return (JobPushResult) sendResult.getReturnValue();
}
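The core of the invoke callback is an asynchronous send combined with a bounded wait: the caller blocks on a CountDownLatch until the reply callback fires or the timeout expires, and treats a missing reply as failure. Below is a minimal sketch of that pattern using only java.util.concurrent. AsyncPushSketch, pushAndWait, and the simulated reply delay are assumptions for illustration; no LTS remoting API is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of "send asynchronously, wait for the callback with a timeout".
final class AsyncPushSketch {
    // Returns true only if the simulated reply arrived before the timeout.
    static boolean pushAndWait(ExecutorService io, long replyDelayMillis, long timeoutMillis) {
        final AtomicBoolean pushSuccess = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        io.execute(() -> {
            try {
                Thread.sleep(replyDelayMillis); // simulated network round trip
                pushSuccess.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release the waiter, whether or not the reply succeeded.
                latch.countDown();
            }
        });
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // False here means either a failed reply or no reply within the timeout.
        return pushSuccess.get();
    }
}
```

In the send method above, the equivalent of a false result leads to resumeJob being called for every job in the batch, so that jobs whose push was not confirmed can be picked up again.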
JobSender's send method fetches the jobs, invokes the callback, and completes the push.
public SendResult send(String taskTrackerNodeGroup, String taskTrackerIdentity, int size, SendInvoker invoker) {
// Fetch jobs by node group
List<JobPo> jobPos = fetchJob(taskTrackerNodeGroup, taskTrackerIdentity, size);
if (jobPos.size() == 0) {
return new SendResult(false, JobPushResult.NO_JOB);
}
// Invoke the callback to push the jobs
SendResult sendResult = invoker.invoke(jobPos);
// Record logs
if (sendResult.isSuccess()) {
List<JobLogPo> jobLogPos = new ArrayList<JobLogPo>(jobPos.size());
for (JobPo jobPo : jobPos) {
// Record a log entry
JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
jobLogPo.setSuccess(true);
jobLogPo.setLogType(LogType.SENT);
jobLogPo.setLogTime(SystemClock.now());
jobLogPo.setLevel(Level.INFO);
jobLogPos.add(jobLogPo);
}
appContext.getJobLogger().log(jobLogPos);
}
return sendResult;
}
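JobSender.send follows a small template: fetch, return early if there is nothing to send, delegate the actual transport to a callback, and log only on success. A stripped-down sketch of that shape follows. SendTemplateSketch is hypothetical, jobs are plain strings, and the string results merely mirror the JobPushResult names that appear in the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the fetch / invoke / log-on-success template.
final class SendTemplateSketch {
    // Stand-in for the job logger: one entry per job that was sent.
    final List<String> log = new ArrayList<>();

    // invoker plays the role of SendInvoker: it returns true if the push succeeded.
    String send(List<String> fetchedJobs, Predicate<List<String>> invoker) {
        if (fetchedJobs.isEmpty()) {
            return "NO_JOB";
        }
        boolean success = invoker.test(fetchedJobs);
        if (success) {
            for (String job : fetchedJobs) {
                log.add("SENT " + job);
            }
        }
        return success ? "SUCCESS" : "SENT_ERROR";
    }
}
```

Passing the transport in as a callback keeps the fetching and logging logic independent of how the jobs actually reach the TaskTracker.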
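Summary:
JobTracker has two main responsibilities:
(1) Receive jobs from JobClient and persist them.
(2) Receive pull requests from TaskTracker and, when jobs submitted by JobClient are pending, push them to that TaskTracker.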