|
通常情况下,使用执行器来执行并发任务时,将Runnable或Callable任务发送给执行器,并使用Future对象来控制任务。此外,还会碰到如下情形:需要在一个对象里发送任务给执行器,然后在另一个对象里处理结果。对于这种情况,Java提供了CompletionService接口(常用实现类为ExecutorCompletionService)。
CompletionService有一个方法用来发送任务给执行器,还有一个方法用来为下一个已经执行结束的任务获取Future对象。从内部实现机制来看,其实现类ExecutorCompletionService使用了Executor对象来执行任务。这个行为的优势是可以共享CompletionService对象:一个对象发送任务到执行器,其他的对象处理任务的结果。第二个方法有一个不足之处,它只能为已执行结束的任务获取Future对象,因此这个Future对象只能被用来获取任务的结果。
ExecutorCompletionService:内部有一个阻塞队列,当Callable执行完毕时,会调用done()方法,把对应的Future添加到该队列中。
例子:
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * Consumer side of a {@link CompletionService} demo: takes finished tasks from the service
 * and prints their results in completion order.
 *
 * <p>The consumer stops when its thread is interrupted, or, when an expected result count was
 * given, after that many completed tasks have been taken. Without one of these exits the
 * consumer would block in {@code take()} forever and keep its (non-daemon) pool thread alive.
 */
public class SCompletionService implements Runnable {

    /** Number of demo tasks submitted by {@link #main(String[])}. */
    private static final int TASK_COUNT = 5;

    /** Marker for "no limit": keep consuming until the thread is interrupted. */
    private static final int UNBOUNDED = -1;

    private final CompletionService<String> completionService;

    /** How many completed tasks to take before stopping; negative means unbounded. */
    private final int expectedResults;

    /**
     * Creates a consumer that keeps taking results until its thread is interrupted.
     *
     * @param completionService the service to take completed tasks from
     */
    public SCompletionService(CompletionService<String> completionService) {
        this(completionService, UNBOUNDED);
    }

    /**
     * Creates a consumer that stops after a fixed number of completed tasks.
     *
     * @param completionService the service to take completed tasks from
     * @param expectedResults number of completed tasks to take before {@link #run()} returns;
     *     a negative value means "until interrupted"
     */
    public SCompletionService(CompletionService<String> completionService, int expectedResults) {
        this.completionService = completionService;
        this.expectedResults = expectedResults;
    }

    /**
     * Takes completed tasks one by one and prints each result.
     *
     * <p>A task that failed is reported on standard error and still counts as consumed, so one
     * failing task cannot make the consumer wait for a result that will never arrive.
     */
    @Override
    public void run() {
        int consumed = 0;
        while (expectedResults < 0 || consumed < expectedResults) {
            try {
                // Waits here until some submitted task has finished.
                Future<String> completed = completionService.take();
                consumed++;
                System.out.println(completed.get());
            } catch (ExecutionException e) {
                // The task itself threw; report it and move on to the next completed task.
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of looping back into take().
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Runs the demo: one consumer plus {@code TASK_COUNT} tasks on a cached thread pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        CompletionService<String> completionService =
                new ExecutorCompletionService<>(newCachedThreadPool);
        // The consumer knows how many results to expect, so it finishes on its own and the
        // pool can terminate after shutdown().
        newCachedThreadPool.execute(new SCompletionService(completionService, TASK_COUNT));
        for (int i = 1; i <= TASK_COUNT; i++) {
            completionService.submit(new SCCallable(i));
        }
        newCachedThreadPool.shutdown();
    }
}
/**
 * Demo task that sleeps for a given number of seconds and then returns a label containing
 * that number.
 */
class SCCallable implements Callable<String> {

    /** Sleep duration in seconds; also appended to the returned label. */
    private final int seconds;

    /**
     * @param i number of seconds to sleep before returning; also used in the result text
     */
    public SCCallable(int i) {
        seconds = i;
    }

    /**
     * Sleeps for the configured number of seconds and returns the label for this task.
     *
     * @return the fixed prefix followed by the configured number
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(seconds);
        String suffix = Integer.toString(seconds);
        return "这是call:".concat(suffix);
    }
}
输出结果: 这是call:1 这是call:2 这是call:3 这是call:4 这是call:5
分析:
/**
 * A {@link CompletionService} that hands submitted tasks to a supplied {@link Executor} and
 * places each task on an internal completion queue, from which {@code take} and {@code poll}
 * retrieve it.
 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
// Executor that every submitted task is passed to via execute().
private final Executor executor;
// The same executor cast to AbstractExecutorService, or null when it is not one;
// consulted by the newTaskFor helpers below.
private final AbstractExecutorService aes;
// Queue that QueueingFuture.done() adds tasks to and that take()/poll() read from.
private final BlockingQueue<Future<V>> completionQueue;
/**
* FutureTask extension to enqueue upon completion
*/
// This class extends FutureTask and overrides the done() method.
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// Once execution has finished, the wrapped task is added to the completionQueue blocking queue.
protected void done() { completionQueue.add(task); }
// The wrapped task; this (not the QueueingFuture itself) is what gets queued.
private final Future<V> task;
}
// Wraps a Callable in a RunnableFuture: a plain FutureTask when aes is null,
// otherwise whatever aes.newTaskFor(task) returns.
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
// Same as above for a Runnable paired with a fixed result value.
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and a
* {@link LinkedBlockingQueue} as a completion queue.
*
* @param executor the executor to use
* @throws NullPointerException if executor is {@code null}
*/
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and the supplied queue as its
* completion queue.
*
* @param executor the executor to use
* @param completionQueue the queue to use as the completion queue
* normally one dedicated for use by this service. This
* queue is treated as unbounded -- failed attempted
* {@code Queue.add} operations for completed tasks cause
* them not to be retrievable.
* @throws NullPointerException if executor or completionQueue are {@code null}
*/
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
/**
* Wraps the task in a RunnableFuture, passes a QueueingFuture around it to the executor,
* and returns the RunnableFuture to the caller.
*
* @param task the task to submit
* @return the Future created for the task (the inner one, not the QueueingFuture wrapper)
* @throws NullPointerException if task is {@code null}
*/
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
/**
* Same as {@code submit(Callable)} for a Runnable paired with a fixed result value.
*
* @param task the task to submit
* @param result the value passed along with the task to newTaskFor
* @return the Future created for the task (the inner one, not the QueueingFuture wrapper)
* @throws NullPointerException if task is {@code null}
*/
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
// Gets a value from the queue by delegating to completionQueue.take().
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
// Gets a value from the queue by delegating to completionQueue.poll().
public Future<V> poll() {
return completionQueue.poll();
}
// Gets a value from the queue by delegating to completionQueue.poll(timeout, unit).
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
|