dz̸Ϊʲô°¢Àï°Í°ÍÒª½ûÓÃExecutors´´½¨Ï̳߳Ø

¤… niminba   ¤b 2021-5-26 12:25   ¤z 1547   ¤k 0

¿´°¢Àï°Í°Í¿ª·¢ÊֲᲢ·¢±à³ÌÕâ¿éÓÐÒ»Ìõ£ºÏ̳߳ز»ÔÊÐíʹÓÃExecutorsÈ¥´´½¨£¬¶øÊÇͨ¹ýThreadPoolExecutorµÄ·½Ê½£¬Í¨¹ýÔ´Âë·ÖÎö½ûÓõÄÔ­Òò

дÔÚÇ°Ãæ

Ê×ÏȸÐл´ó¼ÒÔÚ¸ÇÂ¥µÄ¼ä϶ÔĶÁ±¾ÆªÎÄÕ£¬Í¨¹ýÔĶÁ±¾ÆªÎÄÕÂÄ㽫Á˽⵽£º

  • Ï̳߳ص͍Òå
  • Executors´´½¨Ï̳߳صļ¸ÖÖ·½Ê½
  • ThreadPoolExecutor¶ÔÏó
  • Ï̳߳ØÖ´ÐÐÈÎÎñÂß¼­ºÍÏ̳߳زÎÊýµÄ¹ØÏµ
  • Executors´´½¨·µ»ØThreadPoolExecutor¶ÔÏó
  • OOMÒì³£²âÊÔ
  • ÈçºÎ¶¨ÒåÏ̳߳زÎÊý

Èç¹ûÖ»ÏëÖªµÀÔ­Òò¿ÉÒÔÖ±½ÓÀ­µ½×ܽáÄÇ

Ï̳߳ص͍Òå

¹ÜÀíÒ»×鹤×÷Ï̡߳£Í¨¹ýÏ̳߳ظ´ÓÃÏß³ÌÓÐÒÔϼ¸µãÓŵ㣺

  • ¼õÉÙ×ÊÔ´´´½¨ => ¼õÉÙÄڴ濪Ïú£¬´´½¨Ïß³ÌÕ¼ÓÃÄÚ´æ
  • ½µµÍϵͳ¿ªÏú => ´´½¨Ïß³ÌÐèҪʱ¼ä£¬»áÑÓ³Ù´¦ÀíµÄÇëÇó
  • Ìá¸ßÎȶ¨Îȶ¨ÐÔ => ±ÜÃâÎÞÏÞ´´½¨Ïß³ÌÒýÆðµÄOutOfMemoryError¡¾¼ò³ÆOOM¡¿

Executors´´½¨Ï̳߳صķ½Ê½

¸ù¾Ý·µ»ØµÄ¶ÔÏóÀàÐÍ´´½¨Ï̳߳ؿÉÒÔ·ÖΪÈýÀࣺ

  • ´´½¨·µ»ØThreadPoolExecutor¶ÔÏó
  • ´´½¨·µ»ØScheduleThreadPoolExecutor¶ÔÏó
  • ´´½¨·µ»ØForkJoinPool¶ÔÏó

±¾ÎÄÖ»ÌÖÂÛ´´½¨·µ»Ø ThreadPoolExecutor ¶ÔÏó

ThreadPoolExecutor¶ÔÏó

ÔÚ½éÉÜ Executors ´´½¨Ï̳߳ط½·¨Ç°ÏȽéÉÜһϠThreadPoolExecutor £¬ÒòΪÕâЩ´´½¨Ï̳߳صľ²Ì¬·½·¨¶¼ÊÇ·µ»Ø ThreadPoolExecutor ¶ÔÏ󣬺ÍÎÒÃÇÊÖ¶¯´´½¨ ThreadPoolExecutor ¶ÔÏóµÄÇø±ð¾ÍÊÇÎÒÃDz»ÐèÒª×Ô¼º´«¹¹Ô캯ÊýµÄ²ÎÊý¡£ ThreadPoolExecutor µÄ¹¹Ô캯Êý¹²ÓÐËĸö£¬µ«×îÖÕµ÷ÓõͼÊÇͬһ¸ö£º

public ThreadPoolExecutor(int corePoolSize,
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
       BlockingQueue<Runnable> workQueue,
       ThreadFactory threadFactory,
       RejectedExecutionHandler handler)

¹¹Ô캯Êý²ÎÊý˵Ã÷£º

  • corePoolSize => Ï̳߳غËÐÄÏß³ÌÊýÁ¿
  • maximumPoolSize => Ï̳߳Ø×î´óÊýÁ¿
  • keepAliveTime => ¿ÕÏÐÏ̴߳æ»îʱ¼ä
  • unit => ʱ¼äµ¥Î»
  • workQueue => Ï̳߳ØËùʹÓõĻº³å¶ÓÁÐ
  • threadFactory => Ï̳߳ش´½¨Ïß³ÌʹÓõŤ³§
  • handler => Ï̳߳ضԾܾøÈÎÎñµÄ´¦Àí²ßÂÔ

Ï̳߳ØÖ´ÐÐÈÎÎñÂß¼­ºÍÏ̳߳زÎÊýµÄ¹ØÏµ

Ö´ÐÐÂß¼­ËµÃ÷£º

  • ÅжϺËÐÄÏß³ÌÊýÊÇ·ñÒÑÂú£¬ºËÐÄÏß³ÌÊý´óСºÍcorePoolSize²ÎÊýÓйأ¬Î´ÂúÔò´´½¨Ïß³ÌÖ´ÐÐÈÎÎñ
  • ÈôºËÐÄÏ̳߳ØÒÑÂú£¬Åж϶ÓÁÐÊÇ·ñÂú£¬¶ÓÁÐÊÇ·ñÂúºÍworkQueue²ÎÊýÓйأ¬ÈôδÂúÔò¼ÓÈë¶ÓÁÐÖÐ
  • Èô¶ÓÁÐÒÑÂú£¬ÅжÏÏ̳߳ØÊÇ·ñÒÑÂú£¬Ï̳߳ØÊÇ·ñÒÑÂúºÍmaximumPoolSize²ÎÊýÓйأ¬ÈôδÂú´´½¨Ïß³ÌÖ´ÐÐÈÎÎñ
  • ÈôÏ̳߳ØÒÑÂú£¬Ôò²ÉÓþܾø²ßÂÔ´¦ÀíÎÞ·¨Ö´Ö´ÐеÄÈÎÎñ£¬¾Ü¾ø²ßÂÔºÍhandler²ÎÊýÓйØ

Executors´´½¨·µ»ØThreadPoolExecutor¶ÔÏó

Executors ´´½¨·µ»ØThreadPoolExecutor¶ÔÏóµÄ·½·¨¹²ÓÐÈýÖÖ£º

  • Executors#newCachedThreadPool => ´´½¨¿É»º´æµÄÏ̳߳Ø
  • Executors#newSingleThreadExecutor => ´´½¨µ¥Ï̵߳ÄÏ̳߳Ø
  • Executors#newFixedThreadPool => ´´½¨¹Ì¶¨³¤¶ÈµÄÏ̳߳Ø

Executors#newCachedThreadPool·½·¨

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
         60L, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>());
}

CachedThreadPool ÊÇÒ»¸ö¸ù¾ÝÐèÒª´´½¨ÐÂÏ̵߳ÄÏ̳߳Ø

  • corePoolSize => 0£¬ºËÐÄÏ̳߳صÄÊýÁ¿Îª0
  • maximumPoolSize => Integer.MAX_VALUE£¬Ï̳߳Ø×î´óÊýÁ¿ÎªInteger.MAX_VALUE£¬¿ÉÒÔÈÏΪ¿ÉÒÔÎÞÏÞ´´½¨Ïß³Ì
  • keepAliveTime => 60L
  • unit => Ãë
  • workQueue => SynchronousQueue

µ±Ò»¸öÈÎÎñÌύʱ£¬ corePoolSize Ϊ0²»´´½¨ºËÐÄỊ̈߳¬ SynchronousQueue ÊÇÒ»¸ö²»´æ´¢ÔªËصĶÓÁУ¬¿ÉÒÔÀí½âΪ¶ÓÀïÓÀÔ¶ÊÇÂúµÄ£¬Òò´Ë×îÖջᴴ½¨·ÇºËÐÄÏß³ÌÀ´Ö´ÐÐÈÎÎñ¡£¶ÔÓڷǺËÐÄÏ߳̿ÕÏÐ60sʱ½«±»»ØÊÕ¡£ ÒòΪ Integer.MAX_VALUE ·Ç³£´ó£¬¿ÉÒÔÈÏΪÊÇ¿ÉÒÔÎÞÏÞ´´½¨Ï̵߳ģ¬ÔÚ×ÊÔ´ÓÐÏÞµÄÇé¿öÏÂÈÝÒ×ÒýÆðOOMÒì³£

Executors#newSingleThreadExecutor·½·¨

public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
  (new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutor Êǵ¥Ïß³ÌÏ̳߳أ¬Ö»ÓÐÒ»¸öºËÐÄÏß³Ì

  • corePoolSize => 1£¬ºËÐÄÏ̳߳صÄÊýÁ¿Îª1
  • maximumPoolSize => 1£¬Ï̳߳Ø×î´óÊýÁ¿Îª1£¬¼´×î¶àÖ»¿ÉÒÔ´´½¨Ò»¸öỊ̈߳¬Î¨Ò»µÄÏ߳̾ÍÊǺËÐÄÏß³Ì
  • keepAliveTime => 0L
  • unit => ºÁÃë
  • workQueue => LinkedBlockingQueue

µ±Ò»¸öÈÎÎñÌύʱ£¬Ê×ÏȻᴴ½¨Ò»¸öºËÐÄÏß³ÌÀ´Ö´ÐÐÈÎÎñ£¬Èç¹û³¬¹ýºËÐÄÏ̵߳ÄÊýÁ¿£¬½«»á·ÅÈë¶ÓÁÐÖУ¬ ÒòΪ LinkedBlockingQueue Êdz¤¶ÈΪ Integer.MAX_VALUE µÄ¶ÓÁУ¬¿ÉÒÔÈÏΪÊÇÎÞ½ç¶ÓÁУ¬Òò´ËÍù¶ÓÁÐÖпÉÒÔ²åÈëÎÞÏÞ¶àµÄÈÎÎñ£¬ÔÚ×ÊÔ´ÓÐÏÞµÄʱºòÈÝÒ×ÒýÆð OOM Òì³£ £¬Í¬Ê±ÒòΪÎÞ½ç¶ÓÁУ¬ maximumPoolSize ºÍ keepAliveTime ²ÎÊý½«ÎÞЧ£¬Ñ¹¸ù¾Í²»»á´´½¨·ÇºËÐÄÏß³Ì

Executors#newFixedThreadPool·½·¨

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool Êǹ̶¨ºËÐÄÏ̵߳ÄÏ̳߳أ¬¹Ì¶¨ºËÐÄÏß³ÌÊýÓÉÓû§´«Èë

corePoolSize => 1£¬ºËÐÄÏ̳߳صÄÊýÁ¿Îª1 maximumPoolSize => 1£¬Ö»¿ÉÒÔ´´½¨Ò»¸ö·ÇºËÐÄÏß³Ì keepAliveTime => 0L unit => Ãë workQueue => LinkedBlockingQueue ËüºÍ SingleThreadExecutor ÀàËÆ£¬Î¨Ò»µÄÇø±ð¾ÍÊǺËÐÄÏß³ÌÊý²»Í¬£¬²¢ÇÒÓÉÓÚ Ê¹ÓõÄÊÇ LinkedBlockingQueue £¬ÔÚ×ÊÔ´ÓÐÏÞµÄʱºòÈÝÒ×ÒýÆð OOM Òì³£

×ܽ᣺

  • corePoolSize => nThreads£¬ºËÐÄÏ̳߳صÄÊýÁ¿Îª1
  • maximumPoolSize => nThreads£¬Ï̳߳Ø×î´óÊýÁ¿ÎªnThreads£¬¼´×î¶àÖ»¿ÉÒÔ´´½¨nThreads¸öÏß³Ì
  • keepAliveTime => 0L
  • unit => ºÁÃë
  • workQueue => LinkedBlockingQueue
  • ËüºÍSingleThreadExecutorÀàËÆ£¬Î¨Ò»µÄÇø±ð¾ÍÊǺËÐÄÏß³ÌÊý²»Í¬£¬²¢ÇÒÓÉÓÚʹÓõÄÊÇLinkedBlockingQueue£¬ÔÚ×ÊÔ´ÓÐÏÞµÄʱºòÈÝÒ×ÒýÆðOOMÒì³£

Õâ¾ÍÊÇΪʲô½ûֹʹÓà Executors È¥´´½¨Ï̳߳أ¬¶øÊÇÍÆ¼ö×Ô¼ºÈ¥´´½¨ ThreadPoolExecutor µÄÔ­Òò

OOMÒì³£²âÊÔ

ÀíÂÛÉÏ»á³öÏÖ OOM Òì³££¬±ØÐë²âÊÔÒ»²¨Ñé֤֮ǰµÄ˵·¨£º ²âÊÔÀࣺTaskTest.java

public class TaskTest {
 public static void main(String[] args) {
  ExecutorService es = Executors.newCachedThreadPool();
  int i = 0;
  while (true) {
   es.submit(new Task(i++));
  }
 }
}

ʹÓà Executors ´´½¨µÄ CachedThreadPool £¬ÍùÏ̳߳ØÖÐÎÞÏÞÌí¼ÓÏß³Ì ÔÚÆô¶¯²âÊÔÀà֮ǰÏȽ« JVM ÄÚ´æµ÷ÕûСһµã£¬²»È»ºÜÈÝÒ×½«µçÄÔÅܳöÎÊÌ⡾±ðÎÊÎÒΪʲô֪µÀ£¬ÊÇÌúº©º©Ìðû´íÁË£¡£¡£¡¡¿£¬ÔÚ idea À Run -> Edit Configurations

JVM

²ÎÊý˵Ã÷£º

  • -Xms10M => Java HeapÄÚ´æ³õʼ»¯Öµ
  • -Xmx10M => Java HeapÄÚ´æ×î´óÖµ

ÔËÐнá¹û£º

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
Disconnected from the target VM, address: '127.0.0.1:60416', transport: 'socket'

´´½¨µ½3w¶à¸öÏ̵߳Äʱºò¿ªÊ¼±¨ OOM ´íÎó

ÁíÍâÁ½¸öÏ̳߳ؾͲ»×ö²âÊÔÁË£¬²âÊÔ·½·¨Ò»Ö£¬Ö»ÊÇ´´½¨µÄÏ̳߳ز»Ò»Ñù

ÈçºÎ¶¨ÒåÏ̳߳زÎÊý

  •  CPUÃܼ¯ÐÍ => Ï̳߳صĴóÐ¡ÍÆ¼öΪ CPU ÊýÁ¿ + 1£¬ CPU ÊýÁ¿¿ÉÒÔ¸ù¾Ý Runtime.availableProcessors ·½·¨»ñÈ¡
  • IOÃܼ¯ÐÍ => CPU ÊýÁ¿ * CPU ÀûÓÃÂÊ * (1 + Ï̵߳ȴýʱ¼ä/Ïß³ÌCPUʱ¼ä)
  • »ìºÏÐÍ => ½«ÈÎÎñ·ÖΪ CPU Ãܼ¯ÐÍºÍ IO Ãܼ¯ÐÍ£¬È»ºó·Ö±ðʹÓò»Í¬µÄÏ̳߳ØÈ¥´¦Àí£¬´Ó¶øÊ¹Ã¿¸öÏ̳߳ؿÉÒÔ¸ù¾Ý¸÷×ԵŤ×÷¸ºÔØÀ´µ÷Õû
  • ×èÈû¶ÓÁÐ => ÍÆ¼öʹÓÃÓнç¶ÓÁУ¬Óнç¶ÓÁÐÓÐÖúÓÚ±ÜÃâ×ÊÔ´ºÄ¾¡µÄÇé¿ö·¢Éú
  • ¾Ü¾ø²ßÂÔ => ĬÈϲÉÓõÄÊÇ AbortPolicy ¾Ü¾ø²ßÂÔ£¬Ö±½ÓÔÚ³ÌÐòÖÐÅ׳ö RejectedExecutionException Òì³£¡¾ÒòΪÊÇÔËÐÐʱÒì³££¬²»Ç¿ÖÆ catch ¡¿£¬ÕâÖÖ´¦Àí·½Ê½²»¹»ÓÅÑÅ¡£´¦Àí¾Ü¾ø²ßÂÔÓÐÒÔϼ¸ÖֱȽÏÍÆ¼ö£º
    • ÔÚ³ÌÐòÖв¶»ñ RejectedExecutionException Òì³££¬ÔÚ²¶»ñÒì³£ÖжÔÈÎÎñ½øÐд¦Àí¡£Õë¶ÔĬÈϾܾø²ßÂÔ
    • ʹÓà CallerRunsPolicy ¾Ü¾ø²ßÂÔ£¬¸Ã²ßÂԻὫÈÎÎñ½»¸øµ÷ÓÃexecuteµÄÏß³ÌÖ´ÐС¾Ò»°ãΪÖ÷Ï̡߳¿£¬´ËʱÖ÷Ï߳̽«ÔÚÒ»¶Îʱ¼äÄÚ²»ÄÜÌá½»ÈκÎÈÎÎñ£¬´Ó¶øÊ¹¹¤×÷Ï̴߳¦ÀíÕýÔÚÖ´ÐеÄÈÎÎñ¡£´ËʱÌá½»µÄÏ߳̽«±»±£´æÔÚ TCP ¶ÓÁÐÖУ¬TCP¶ÓÁÐÂú½«»áÓ°Ïì¿Í»§¶Ë£¬ÕâÊÇÒ»ÖÖÆ½»ºµÄÐÔÄܽµµÍ
    • ×Ô¶¨Òå¾Ü¾ø²ßÂÔ£¬Ö»ÐèҪʵÏÖ RejectedExecutionHandler ½Ó¿Ú¼´¿É
    • Èç¹ûÈÎÎñ²»ÊÇÌØ±ðÖØÒª£¬Ê¹Óà DiscardPolicy ºÍ DiscardOldestPolicy ¾Ü¾ø²ßÂÔ½«ÈÎÎñ¶ªÆúÒ²ÊÇ¿ÉÒÔµÄ

Èç¹ûʹÓÃExecutorsµÄ¾²Ì¬·½·¨´´½¨ ThreadPoolExecutor ¶ÔÏ󣬿ÉÒÔͨ¹ýʹÓà Semaphore ¶ÔÈÎÎñµÄÖ´ÐнøÐÐÏÞÁ÷Ò²¿ÉÒÔ±ÜÃâ³öÏÖ OOM Òì³£

ÓÉÓÚÏ̳߳زÎÊý¶¨Òå¾­Ñé½ÏÉÙ£¬¶¼ÊÇÀíÂÛ֪ʶ£¬»¶Ó­Óо­ÑéµÄ´óÀв¹³ä

µ½´ËÕâÆª¹ØÓÚdz̸Ϊʲô°¢Àï°Í°ÍÒª½ûÓÃExecutors´´½¨Ï̳߳صÄÎÄÕ¾ͽéÉܵ½ÕâÁË,¸ü¶àÏà¹Ø°¢Àï°Í°Í½ûÓÃExecutors´´½¨Ï̳߳ØÄÚÈÝÇëËÑË÷ÉçÇøÒÔǰµÄÎÄÕ»ò¼ÌÐøä¯ÀÀÏÂÃæµÄÏà¹ØÎÄÕÂÏ£Íû´ó¼ÒÒÔºó¶à¶àÖ§³ÖÉçÇø£¡

·ÖÏíµ½ :
0 ÈËÊÕ²Ø
ÄúÐèÒªµÇ¼ºó²Å¿ÉÒÔ»ØÌû µÇ¼ | Á¢¼´×¢²á

±¾°æ»ý·Ö¹æÔò

»ý·Ö:1060120
Ìû×Ó:212021
¾«»ª:0
ÆÚȨÂÛ̳ ÆÚȨÂÛ̳
·¢²¼
ÄÚÈÝ

ÏÂÔØÆÚȨÂÛ̳ÊÖ»úAPP