java线程中CompletionService

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 17:27   2125   0

通常情况下,使用执行器来执行并发任务时,将Runnable或Callable任务发送给执行器,Future对象来控制任务。此外,还会碰到如下情形,需要在一个对象里发送任务给执行器,然后在另一个对象里处理结果。对于这种情况java提供了CompletionService类。

CompletionService类有一个方法用来发送任务给执行器,还有一个任务为下一个已经执行结束的任务获取Future对象。从内部实行机制来看,CompletionService类使用了Executor对象来执行任务。这个行为的优势是共享CompletionService对象,并发送任务到执行器,然后其他的对象可以处理任务的结果。第二个方法有一个不足之处,它只能为已执行结束的任务获取Future对象,因此这个Future对象只能被用来获取任务的结果。

CompletionService:里面有一个阻塞队列,当callable执行完毕时,会调用done()方法,就会把future添加进去。

例子:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SCompletionService implements Runnable {

    private CompletionService<String> completionService;

    public SCompletionService(CompletionService<String> completionService) {
        super();
        this.completionService = completionService;
    }

    @Override
    public void run() {
        while (true) {
            try {
                //等到获取task,如果获取不到一直阻塞在这里
                Future<String> take = completionService.take();
                System.out.println(take.get());
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        CompletionService<String> completionService = new ExecutorCompletionService<>(newCachedThreadPool);
        newCachedThreadPool.execute(new SCompletionService(completionService));
        for (int i = 1; i < 6; i++) {
            completionService.submit(new SCCallable(i));
        }
        newCachedThreadPool.shutdown();
    }

}

class SCCallable implements Callable<String> {

    private int i;

    public SCCallable(int i) {
        super();
        this.i = i;
    }

    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(i);
        return "这是call:" + i;
    }

}

输出结果:
这是call:1
这是call:2
这是call:3
这是call:4
这是call:5


分析:

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion
     */
   //这个类继承了FutureTask 并实现了done方法
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
       //当执行完毕时就会把task 添加到completionQueue这个阻塞队列中
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    //获取队列的值
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    //获取队列的值
    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP