Java线程之CompletionService

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 17:27   2169   0
当我们提交了批量任务,并且需要获取其结果时,可以使用以下方式:
package com.zzj.concurrent.batchtask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchSubmitTest {
 
 public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService executor = Executors.newCachedThreadPool();
  
  List<Future<String>> futures = new ArrayList<Future<String>>();
  for (int i = 0; i < 10; i++) {
   Future<String> future = executor.submit(new Task((10 - i) * 1000));
   futures.add(future);
  }
  
  for (Future<String> future : futures) {
   System.out.println(future.get());
  }
  
  executor.shutdown();
 }
}
或者:
package com.zzj.concurrent.batchtask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllTest {
 public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService executor = Executors.newCachedThreadPool();
  
  List<Callable<String>> tasks = new ArrayList<Callable<String>>();
  for (int i = 0; i < 10; i++) {
   tasks.add(new Task((10 - i) * 1000));
  }
  
  List<Future<String>> futures = executor.invokeAll(tasks);
  for (Future<String> future : futures) {
   System.out.println(future.get());
  }
  
  executor.shutdown();
 }
}

任务Task:

package com.zzj.concurrent.batchtask;

import java.util.concurrent.Callable;

public class Task implements Callable<String>{
 private final long sleep;
 
 public Task(long sleep) {
  this.sleep = sleep;
 }

 @Override
 public String call() throws Exception {
  Thread.sleep(sleep);
  return String.valueOf(sleep);
 }

}
输出结果:

10000
9000
8000
7000
6000
5000
4000
3000
2000
1000
总共创建了10个任务,第一个任务的执行时间为10秒,第二个为9秒,以此类推,最后一个任务执行时间为1秒。

从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住。

如果系统需要设计成每个任务完成后就能根据其结果继续做后面的事,这样对于处于list后面但是先完成的任务就会增加了额外的等待时间。

那么有没有办法让每个任务完成后就能根据其结果继续做后面的事?这时就用到了CompletionService。示例如下:

package com.zzj.concurrent.batchtask;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceTest {
 
 public static void main(String[] args) throws InterruptedException, ExecutionException {
  ExecutorService executor = Executors.newCachedThreadPool();
  CompletionService<String> completion = new ExecutorCompletionService<String>(executor);
  for (int i = 0; i < 10; i++) {
   completion.submit(new Task((10 - i) * 1000));
  }
  for (int i = 0; i < 10; i++) {
   Future<String> future = completion.take();
   System.out.println(future.get());
  }
  executor.shutdown();
 }
}
输出结果:
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。

所以,先完成的必定先被取出,这样就减少了不必要的等待时间。






分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP