The concurrency section of the Alibaba development manual has this rule: thread pools must not be created with Executors, but through ThreadPoolExecutor. A source-code analysis of the reason for the ban
Preface
Ê×ÏȸÐл´ó¼ÒÔÚ¸ÇÂ¥µÄ¼ä϶ÔĶÁ±¾ÆªÎÄÕ£¬Í¨¹ýÔĶÁ±¾ÆªÎÄÕÂÄ㽫Á˽⵽£º
Ï̳߳ص͍Òå
Executors´´½¨Ï̳߳صļ¸ÖÖ·½Ê½
ThreadPoolExecutor¶ÔÏó
Ï̳߳ØÖ´ÐÐÈÎÎñÂß¼ºÍÏ̳߳زÎÊýµÄ¹ØÏµ
Executors´´½¨·µ»ØThreadPoolExecutor¶ÔÏó
OOMÒì³£²âÊÔ
ÈçºÎ¶¨ÒåÏ̳߳زÎÊý
Èç¹ûÖ»ÏëÖªµÀÔÒò¿ÉÒÔÖ±½ÓÀµ½×ܽáÄÇ
Definition of a thread pool
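A thread pool manages a group of worker threads. Reusing threads through a pool has the following advantages:
Less resource creation => lower memory overhead, since every created thread occupies memory
Lower system overhead => creating a thread takes time and delays the handling of requests
Better stability => avoids the OutOfMemoryError (OOM for short) caused by creating threads without limit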
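The ways Executors creates thread pools
By the type of object returned, the thread pools that Executors creates fall into three categories:
Those that return a ThreadPoolExecutor object
Those that return a ScheduledThreadPoolExecutor object
Those that return a ForkJoinPool object
This article only discusses the methods that return a ThreadPoolExecutor object.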
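The ThreadPoolExecutor object
Before introducing the Executors methods that create thread pools, let's look at ThreadPoolExecutor, because those static methods all return a ThreadPoolExecutor object. The only difference from creating a ThreadPoolExecutor by hand is that we do not have to pass the constructor arguments ourselves. ThreadPoolExecutor has four constructors, but they all end up calling the same one: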
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
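Constructor parameters:
corePoolSize => number of core threads in the pool
maximumPoolSize => maximum number of threads in the pool
keepAliveTime => how long an idle thread is kept alive
unit => time unit of keepAliveTime
workQueue => buffer queue used by the pool
threadFactory => factory the pool uses to create threads
handler => policy the pool applies to rejected tasks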
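To make the seven parameters concrete, here is a minimal sketch of creating a pool by hand. The class name ManualPoolExample, the thread name prefix "demo-worker-" and the values (2, 4, 30 seconds, capacity 100) are assumptions made up for illustration, not recommendations from the manual.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ManualPoolExample {

    // Builds a pool by passing all seven constructor arguments explicitly.
    // The concrete values are illustrative assumptions only.
    static ThreadPoolExecutor newPool() {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical naming scheme so the threads are easy to spot in a thread dump.
        ThreadFactory namedFactory =
                r -> new Thread(r, "demo-worker-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<Runnable>(100),  // workQueue (bounded)
                namedFactory,                           // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Every limit is visible at the call site here, which is the practical difference from the Executors factory methods.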
How the task execution logic relates to the thread pool parameters
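Execution logic:
Check whether the core threads are all in use; the number of core threads is set by the corePoolSize parameter. If they are not, create a thread to run the task.
If the core threads are full, check whether the queue is full; that depends on the workQueue parameter. If it is not full, add the task to the queue.
If the queue is full, check whether the pool is full; that depends on the maximumPoolSize parameter. If it is not full, create a thread to run the task.
If the pool is full, handle the task that cannot be executed with the rejection policy, which is set by the handler parameter.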
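The four steps above can be watched in a small experiment. This is only a sketch: the class name ExecutionFlowDemo and the sizes (one core thread, at most two threads, a queue of capacity one) are assumptions picked so that four tasks hit each step once.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutionFlowDemo {

    // Submits four blocking tasks and returns {pool size, queued tasks, rejected tasks}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until the measurement is taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                // Task 1: new core thread. Task 2: queued.
                // Task 3: queue full, new non-core thread. Task 4: rejected.
                pool.execute(blocking);
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy throws for the task that does not fit
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let the workers finish
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 1, 1]
    }
}
```

Two threads, one queued task and one rejection: the first task takes the core thread, the second waits in the queue, the third forces a non-core thread, and the fourth has nowhere to go.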
Executors methods that return a ThreadPoolExecutor object
Executors has three methods that return a ThreadPoolExecutor object:
Executors#newCachedThreadPool => creates a cached thread pool
Executors#newSingleThreadExecutor => creates a single-threaded pool
Executors#newFixedThreadPool => creates a fixed-size thread pool
The Executors#newCachedThreadPool method
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
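CachedThreadPool is a thread pool that creates new threads as needed.
corePoolSize => 0, so there are no core threads
maximumPoolSize => Integer.MAX_VALUE, so in practice the pool can create threads without limit
keepAliveTime => 60L
unit => seconds
workQueue => SynchronousQueue
When a task is submitted, no core thread is created because corePoolSize is 0. SynchronousQueue stores no elements, so it can be thought of as a queue that is always full. Unless an idle thread is already waiting to take the task, the pool therefore creates a non-core thread to run it. A non-core thread that stays idle for 60 s is reclaimed. Because Integer.MAX_VALUE is so large, thread creation is effectively unbounded, which easily leads to an OOM error when resources are limited.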
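A small sketch shows the thread growth without going anywhere near an OOM. It assumes, as the source above shows, that newCachedThreadPool returns a plain ThreadPoolExecutor, so the cast is safe; the class name CachedPoolGrowthDemo and the task count are made up for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class CachedPoolGrowthDemo {

    // Submits `tasks` blocking tasks and returns the largest number of threads the pool reached.
    static int largestPoolSize(int tasks) {
        // Cast relies on the implementation shown above (assumption for other JDKs).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // no worker becomes idle, so no thread can be reused
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        return largest;
    }

    public static void main(String[] args) {
        // One thread per concurrently running task.
        System.out.println(largestPoolSize(50));
    }
}
```

With 50 tasks that never finish at the same time, the pool ends up with 50 threads. Nothing in the pool's configuration stops that number from growing with the load.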
The Executors#newSingleThreadExecutor method
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
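SingleThreadExecutor is a single-threaded pool with exactly one core thread. The ThreadPoolExecutor is wrapped in a FinalizableDelegatedExecutorService.
corePoolSize => 1, so there is one core thread
maximumPoolSize => 1, so at most one thread can be created, and that only thread is the core thread
keepAliveTime => 0L
unit => milliseconds
workQueue => LinkedBlockingQueue
When a task is submitted, a core thread is first created to run it. Tasks beyond the number of core threads are put into the queue. A LinkedBlockingQueue created without an explicit capacity has a capacity of Integer.MAX_VALUE and can be regarded as unbounded, so an unlimited number of tasks can be inserted, which easily leads to an OOM error when resources are limited. Because the queue is unbounded, the maximumPoolSize and keepAliveTime parameters also have no effect: a non-core thread is never created.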
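The claim that maximumPoolSize stops mattering with an unbounded queue can be checked with a sketch like the following. Note the deliberate difference from newSingleThreadExecutor: the pool here is built by hand with maximumPoolSize 5 (an assumption chosen for the demo) to show that the extra four threads are never created. The class name UnboundedQueueDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    // Capacity of a LinkedBlockingQueue created with the no-argument constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<Runnable>().remainingCapacity();
    }

    // Submits `tasks` blocking tasks and returns {pool size, queued tasks}.
    static int[] run(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // the single worker stays busy with the first task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run(1000);
        System.out.println("threads=" + r[0] + ", queued=" + r[1]);
        System.out.println("default capacity=" + defaultCapacity());
    }
}
```

The queue never reports itself as full, so the pool never reaches the step where it would add a non-core thread. Every extra task simply sits in memory.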
The Executors#newFixedThreadPool method
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool is a thread pool with a fixed number of core threads; the number is passed in by the caller.
corePoolSize => nThreads, so there are nThreads core threads
maximumPoolSize => nThreads, so at most nThreads threads can be created
keepAliveTime => 0L
unit => milliseconds
workQueue => LinkedBlockingQueue
It is similar to SingleThreadExecutor; the only difference is the number of core threads. Because it also uses a LinkedBlockingQueue, it easily leads to an OOM error when resources are limited.
Summary:
CachedThreadPool can create an effectively unlimited number of threads, while SingleThreadExecutor and FixedThreadPool can queue an effectively unlimited number of tasks. This is why creating thread pools with Executors is forbidden and creating the ThreadPoolExecutor yourself is recommended.
OOM test
In theory an OOM error should occur, so let's run a test to verify the claims above. Test class: TaskTest.java
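import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskTest {
    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        int i = 0;
        // Submit tasks forever. The Task class is not shown in this article.
        while (true) {
            es.submit(new Task(i++));
        }
    }
}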
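The Task class used by the test is not shown in this article. To run the test you need something in its place; the following is a purely hypothetical stand-in, assuming the task only has to report its number and then keep its thread occupied so the pool cannot reuse it.

```java
// Hypothetical stand-in for the Task class that the test refers to but does not show.
class Task implements Runnable {

    private final int id;

    Task(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }

    @Override
    public void run() {
        System.out.println("task " + id + " on " + Thread.currentThread().getName());
        try {
            // Assumed behavior: hold the worker thread for a while so every
            // submitted task forces the cached pool to create another thread.
            Thread.sleep(60_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early if the pool is shut down
        }
    }
}
```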
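The test uses the CachedThreadPool created by Executors and keeps submitting tasks, so the pool keeps creating threads without limit. Before starting the test class, lower the JVM memory, otherwise the machine can easily run into trouble (don't ask how I know; yes, I really was that silly!!!). In IDEA: Run -> Edit Configurations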
[Screenshot: JVM options]
Parameter description:
-Xms10M => initial Java heap size
-Xmx10M => maximum Java heap size
Run result:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
Disconnected from the target VM, address: '127.0.0.1:60416', transport: 'socket'
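The OOM error starts to appear once more than 30,000 threads have been created.
The other two thread pools are not tested here; the test method is the same, only the pool being created differs.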
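How to define thread pool parameters
CPU-intensive => the recommended pool size is the number of CPUs + 1; the number of CPUs can be obtained with Runtime.getRuntime().availableProcessors()
IO-intensive => number of CPUs * CPU utilization * (1 + thread wait time / thread CPU time)
Mixed => split the tasks into CPU-intensive and IO-intensive ones and handle them with separate thread pools, so that each pool can be tuned to its own workload
Blocking queue => a bounded queue is recommended; it helps avoid resource exhaustion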
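Rejection policy => the default is AbortPolicy, which throws a RejectedExecutionException directly in the program (it is a runtime exception, so catching it is not enforced). This is not a very graceful way to handle rejection. The following approaches are recommended:
Catch RejectedExecutionException in the program and deal with the task in the catch block. This applies to the default rejection policy.
Use CallerRunsPolicy, which hands the task to the thread that called execute (usually the main thread). While it runs the task, that thread cannot submit any new tasks, which gives the worker threads time to work through the tasks in progress. In a server, requests that arrive during that time wait in the TCP queue, and once the TCP queue fills up the client is affected, so performance degrades gradually.
Implement a custom rejection policy; it only needs to implement the RejectedExecutionHandler interface.
If the tasks are not particularly important, discarding them with DiscardPolicy or DiscardOldestPolicy is also acceptable.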
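Putting the sizing formulas, the bounded queue and CallerRunsPolicy together might look like the sketch below. The class name PoolSizingSketch, the rounding choice and the sample numbers are assumptions for illustration; the utilization and wait/compute times would have to be measured for a real workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    // CPU-intensive: number of CPUs + 1.
    static int cpuBoundSize(int cpus) {
        return cpus + 1;
    }

    // IO-intensive: cpus * utilization * (1 + waitTime / computeTime), rounded, at least 1.
    // utilization is a target between 0 and 1; the two times must use the same unit.
    static int ioBoundSize(int cpus, double utilization, double waitTime, double computeTime) {
        return Math.max(1, (int) Math.round(cpus * utilization * (1 + waitTime / computeTime)));
    }

    // Fixed-size pool with a bounded queue; overflow tasks run in the submitting thread.
    static ThreadPoolExecutor boundedPool(int size, int queueCapacity) {
        return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Fills a 1-thread, 1-slot pool and returns the name of the thread that ran the overflow task.
    static String overflowRunner() {
        ThreadPoolExecutor pool = boundedPool(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocking); // occupies the only worker
        pool.execute(blocking); // fills the queue
        String[] runner = new String[1];
        pool.execute(() -> {    // does not fit, so CallerRunsPolicy runs it right here
            runner[0] = Thread.currentThread().getName();
        });
        release.countDown();
        pool.shutdown();
        return runner[0];
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound size: " + cpuBoundSize(cpus));
        System.out.println("IO-bound size:  " + ioBoundSize(cpus, 0.8, 90, 10));
        System.out.println("overflow ran on: " + overflowRunner());
    }
}
```

For example, with 8 CPUs, a target utilization of 0.8 and tasks that wait 90 ms for every 10 ms of computation, the IO-intensive formula gives 8 * 0.8 * (1 + 9) = 64 threads.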
If you do create the ThreadPoolExecutor through the static methods of Executors, you can throttle task submission with a Semaphore, which also avoids the OOM error.
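One way to do the Semaphore throttling is a thin wrapper that takes a permit before handing a task to the pool and gives it back when the task finishes. This is a sketch under assumptions: the class name ThrottledSubmitter, the limit of 10 and the pool of 4 threads are all made up for the demo.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottledSubmitter {

    private final ExecutorService executor;
    private final Semaphore permits;
    private final int maxInFlight;

    ThrottledSubmitter(ExecutorService executor, int maxInFlight) {
        this.executor = executor;
        this.permits = new Semaphore(maxInFlight);
        this.maxInFlight = maxInFlight;
    }

    // Blocks the caller until a permit is free, so at most maxInFlight tasks
    // are queued or running at any time.
    void submit(Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // free the slot when the task is done
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release(); // the task never started, give the permit back
            throw e;
        }
    }

    // Number of tasks currently queued or running.
    int inFlight() {
        return maxInFlight - permits.availablePermits();
    }

    // Demo: pushes 200 tasks through a limit of 10; returns {completed, highest in-flight seen}.
    static int[] demo() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        ThrottledSubmitter submitter = new ThrottledSubmitter(es, 10);
        AtomicInteger completed = new AtomicInteger();
        int highest = 0;
        try {
            for (int i = 0; i < 200; i++) {
                submitter.submit(completed::incrementAndGet);
                highest = Math.max(highest, submitter.inFlight());
            }
            es.shutdown();
            es.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            es.shutdownNow();
        }
        return new int[] {completed.get(), highest};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("completed=" + r[0] + ", highest in flight=" + r[1]);
    }
}
```

The queue inside the pool is still unbounded, but the submitting thread is blocked before it can pile up more than the permitted number of tasks.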
I have little hands-on experience with defining thread pool parameters, so all of this is theory; additions from more experienced readers are welcome.