FutureTask解析

论坛 期权论坛 脚本     
匿名技术用户   2021-1-3 02:36   21   0
package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the <tt>get</tt>
* method will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled.
*
FutureTask是一个可取消的异步计算任务,并提供了基于Future接口实现的开始和取消
计算任务,查看计算任务状态和在计算任务结束后获取结果的方法。计算任务的结果,只有
在计算任务完成时,才能取得,如果计算任务还没完成,将会阻塞。只要计算任务完成,
计算任务就不能被取消或重新启动。
* <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
* {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
* implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
* submitted to an {@link Executor} for execution.
*
FutureTask可用于包装Callable和Runnable对象。由于FutureTask实现了Runnable接口,
所有可以被调到执行器,执行。
* <p>In addition to serving as a standalone class, this class provides
* <tt>protected</tt> functionality that may be useful when creating
* customized task classes.
*
当我们创建任务线程类时为单独的类(独立任务),FutureTask的protect功能方法也许有用。

* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's <tt>get</tt> method
*/
public class FutureTask<V> implements RunnableFuture<V> {
/** Synchronization control for FutureTask 用于控制FutureTask的同步器*/
private final Sync sync;
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Callable</tt>.
*
创建一个FutureTask,在执行时,将会执行参数Callable
* @param callable the callable task
* @throws NullPointerException if callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
* given result on successful completion.
*
直接通过Executors执行任务,并将结果保存到result中
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if runnable is null
*/
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
}

FutureTask内部关联着一个同步器Sync,主要用于控制cancel,get等操作。
我们来看一下内部同步器Sync:
  /**
* Synchronization control for FutureTask. Note that this must be
* a non-static inner class in order to invoke the protected
* <tt>done</tt> method. For clarity, all inner class support
* methods are same as outer, prefixed with "inner".
*
控制FutureTask的同步器,由于需要调用protected的done方法,所以类必须定义为
非静态内部类。为了清晰起见,所有内部类支持的方法,与外部类FutureTask一样,只不过
添加了inner最为前缀。
* Uses AQS sync state to represent run status
*/
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;

/** State value representing that task is ready to run 准备就绪*/
private static final int READY = 0;
/** State value representing that task is running 正在运行*/
private static final int RUNNING = 1;
/** State value representing that task ran 运行完*/
private static final int RAN = 2;
/** State value representing that task was cancelled 取消*/
private static final int CANCELLED = 4;

/** The underlying callable 执行线程callable*/
private final Callable<V> callable;
/** The result to return from get() 结果*/
private V result;
/** The exception to throw from get() get方法抛出的异常*/
private Throwable exception;

/**
* The thread running task. When nulled after set/cancel, this
* indicates that the results are accessible. Must be
* volatile, to ensure visibility upon completion.
线程用于执行任务。在set/cancel操作后为null,预示着任务结果可用,
变量必须volatile,以保证在任务执行完时,结果的可见性。
*/
private volatile Thread runner;
//构造任务同步器
Sync(Callable<V> callable) {
this.callable = callable;
}
//返回任务执行的状态,是运行完还是取消
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}
//是否执行结束,如果任务运行完或取消,且运行任务线程为null,即任务结束
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
/**
* Implements AQS base acquire to succeed if ran or cancelled
任务运行完或取消,则尝试获取共享锁成功。
*/
protected int tryAcquireShared(int ignore) {
return innerIsDone() ? 1 : -1;
}
/**
* Implements AQS base release to always signal after setting
* final done status by nulling runner thread.
在通过设置运行任务线程为null,设置任务线程状态为结束时,释放共享锁
*/
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
//判断任务状态是否为取消
boolean innerIsCancelled() {
return getState() == CANCELLED;
}
//获取任务执行结果
V innerGet() throws InterruptedException, ExecutionException {
//这个我们在AQS篇章中有讲,这里不再说
//如果任务线程执行结束,如果状态为取消,则抛出CancellationException
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
//否则任务线程运行完,返回结果
return result;
}
//超时获取任务执行结果,这个与get方法不同是,超时等待任务线程执行结束
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
}

Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,
正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因
,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。
回到FutureTask
 public boolean isCancelled() {
return sync.innerIsCancelled();
}
public boolean isDone() {
return sync.innerIsDone();
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}

从上面可以看出FutureTask的isCancelled,isDone,get和超时get方法是直接委托给
内部同步器Sync的相应方法。
再看其他方法先看取消
//FutureTask
   public boolean cancel(boolean mayInterruptIfRunning) {
//委托给内部同步器
return sync.innerCancel(mayInterruptIfRunning);
}

//Sync
 boolean innerCancel(boolean mayInterruptIfRunning) {
//自旋设置任务线程运行状态为CANCELLED
for (;;) {
int s = getState();
if (ranOrCancelled(s))
//如果任务已经执行结束,则返回false,不可取消
return false;
//AQS设置任务线程运行状态为CANCELLED
if (compareAndSetState(s, CANCELLED))
break;
}
//如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
//释放锁
releaseShared(0);
//做一些任务执行结束工作
done();
return true;
}


//FutureTask

  /**
* Protected method invoked when this task transitions to state
* <tt>isDone</tt> (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
无论任务线程取消还是正常运行结束,只要线程的isDone状态为true,则调用
此方法。默认实现不做任务事情,留给子类扩展。子类可以重写此方法,用于
在执行完成时,调用回调接口或者执行记录工作。同时可以在 此方法中确认
任务线程是否被取消。
*/
protected void done() { }

从上来看取消操作,首先自旋设置任务线程运行状态为CANCELLED,
如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,
释放锁,做一些任务执行结束工作(默认为空)。

再来看run
    // The following (duplicated) doc comment can be removed once
//
// 6270645: Javadoc comments should be inherited from most derived
// superinterface or superclass
// is fixed.
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
public void run() {
//委托给内部同步器
sync.innerRun();
}

//Sync
 void innerRun() {
//如果任务线程处理就绪状态,则设置为运行状态,否则返回
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
//执行callable
result = callable.call();
} catch (Throwable ex) {
//设置执行异常
setException(ex);
return;
}
//设置结果
set(result);
} else {
releaseShared(0); // cancel
}
}

分别来看设置执行异常和设置结果

//设置结果
set(result);
//FutureTask
    /**
* Sets the result of this Future to the given value unless
* this future has already been set or has been cancelled.
* This method is invoked internally by the <tt>run</tt> method
* upon successful completion of the computation.
如果结果已经被设值或任务线程被取消,则设置失败。此方在run方法成功
完成任务时,调用。
* @param v the value
*/
protected void set(V v) {
sync.innerSet(v);
}

//Sync
void innerSet(V v) {
for (;;) {
int s = getState();
//如果任务运行完,则返回
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
//设置任务运行状态为RAN,并设值
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}


//设置执行异常
setException(ex);
//FutureTask

    /**
* Causes this future to report an <tt>ExecutionException</tt>
* with the given throwable as its cause, unless this Future has
* already been set or has been cancelled.
* This method is invoked internally by the <tt>run</tt> method
* upon failure of the computation.
* @param t the cause of failure
设置执行异常
*/
protected void setException(Throwable t) {
sync.innerSetException(t);
}

//Sync 这一步与innerSet相似,不在说
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
releaseShared(0);
done();
return;
}
}
}

再看runAndReset
 /**
* Executes the computation without setting its result, and then
* resets this Future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
此方的功能如果任务线程正在运行,并且没有设置结果,可以重新设置任务线程为
就绪状态,如任务线程运行异常或取消,则重置失败。这个用于任务需要多次执行的场景。
* @return true if successfully run and reset
*/
protected boolean runAndReset() {
//委托给内部同步器
return sync.innerRunAndReset();
}

//Sync
 boolean innerRunAndReset() {
//如果任务线程处于从READY切换到RUNNING失败,则返回false,即任务线程不处于就绪状态
if (!compareAndSetState(READY, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
//如果任务线程正在运行,调用callable
callable.call(); // don't set result
runner = null;//重置任务线程为null
return compareAndSetState(RUNNING, READY);//重置任务线程从RUNNING到READY
} catch (Throwable ex) {
setException(ex);
return false;
}
}


总结:
[color=blue]FutureTask内部关联一个同步器Sync,Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。取消操作,首先自旋设置任务线程运行状态为CANCELLED,如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,释放锁,做一些任务执行结束工作(默认为空)。FutureTask的相关操作主要通过Sync来完成。[/color]
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the <tt>run</tt> method causes completion of the <tt>Future</tt>
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's <tt>get</tt> method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP