首页>>后端>>java->深入学习Java线程池

深入学习Java线程池

时间:2023-12-01 本站 点击:0

在之前,我们都是通过new Thread来创建一个线程,由于线程的创建和销毁都需要消耗一定的CPU资源,所以在高并发下这种创建线程的方式将严重影响代码执行效率。而线程池的作用就是让一个线程执行结束后不马上销毁,继续执行新的任务,这样就节省了不断创建线程和销毁线程的开销。

ThreadPoolExecutor

创建Java线程池最为核心的类为ThreadPoolExecutor

它提供了四种构造函数来创建线程池,其中最为核心的构造函数如下所示:

publicThreadPoolExecutor(intcorePoolSize,//核心线程数intmaximumPoolSize,//最大线程个数longkeepAliveTime,//等待时间TimeUnitunit,//等待时间单位BlockingQueue<Runnable>workQueue,//工作队列ThreadFactorythreadFactory,//线程创建工厂RejectedExecutionHandlerhandler)//拒绝策略

这7个参数的含义如下:

corePoolSize 线程池核心线程数。即线程池中保留的线程个数,即使这些线程是空闲的,也不会被销毁,除非通过ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法开启了核心线程的超时策略;

maximumPoolSize 线程池中允许的最大线程个数;

keepAliveTime 用于设置那些超出核心线程数量的线程的最大等待时间,超过这个时间还没有新任务的话,超出的线程将被销毁;

unit 超时时间单位;

workQueue 线程队列。用于保存通过execute方法提交的,等待被执行的任务;

threadFactory 线程创建工厂,即指定怎样创建线程;

handler 拒绝策略。即指定当线程提交的数量超出了maximumPoolSize后,该使用什么策略处理超出的线程。

在通过这个构造方法创建线程池的时候,这几个参数必须满足以下条件,否则将抛出IllegalArgumentException异常:

corePoolSize不能小于0;

keepAliveTime不能小于0;

maximumPoolSize 不能小于等于0;

maximumPoolSize不能小于corePoolSize;

此外,workQueue、threadFactory和handler不能为null,否则将抛出空指针异常。

下面举些例子来深入理解这几个参数的含义。

使用上面的构造方法创建一个线程池:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}

上面的代码创建了一个核心线程数量为1,允许最大线程数量为2,最大活跃时间为10秒,线程队列长度为1的线程池。

假如我们通过execute方法向线程池提交1个任务,看看结果如何:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}

ThreadPoolExecutor的execute和submit方法都可以向线程池提交任务,区别是,submit方法能够返回执行结果,返回值类型为Future。

sleep方法代码:

privatestaticvoidsleep(longvalue){try{System.out.println(Thread.currentThread().getName()+"线程执行sleep方法");TimeUnit.SECONDS.sleep(value);}catch(InterruptedExceptione){e.printStackTrace();}}

线程池核心线程数量为1,通过execute提交了一个任务后,由于核心线程是空闲的,所以任务被执行了。由于这个任务的逻辑是休眠100秒,所以在这100秒内,线程池的活跃线程数量为1。此外,因为提交的任务被核心线程执行了,所以并没有线程需要被放到线程队列里等待,线程队列长度为0。

假如我们通过execute方法向线程池提交2个任务,看看结果如何:

threadPoolExecutor.execute(()->sleep(100));threadPoolExecutor.execute(()->sleep(100));

线程池核心线程数量为1,通过execute提交了2个任务后,一开始核心线程是空闲的,Thread-0被执行。由于这个任务的逻辑是休眠100秒,所以在这100秒内,线程池的活跃线程数量为1。因为核心线程数量为1,所以另外一个任务在这100秒内不能被执行,于是被放到线程队列里等待,线程队列长度为1。

假如我们通过execute方法向线程池提交3个任务,看看结果如何:

threadPoolExecutor.execute(()->sleep(100));threadPoolExecutor.execute(()->sleep(100));threadPoolExecutor.execute(()->sleep(100));

这三个任务都是休眠100秒,所以核心线程池中第一个任务正在被执行,第二个任务被放入到了线程队列。而当第三个任务被提交进来时,线程队列满了(我们定义的长度为1),由于该线程池允许的最大线程数量为2,所以线程池还可以再创建一个线程来执行另外一个任务,于是乎之前在线程队列里的线程被取出执行(FIFO),第三个任务被放入到了线程队列。

改变第二个和第三个任务的睡眠时间,观察输出:

threadPoolExecutor.execute(()->sleep(100));threadPoolExecutor.execute(()->sleep(5));threadPoolExecutor.execute(()->sleep(5));

第二个任务提交5秒后,任务执行完毕,所以线程队列里的任务被执行,于是队列线程个数为0,活跃线程数量为2(第一个和第三个任务)。再过5秒后,第三个任务执行完毕,于是活跃线程数量为1(第一个100秒还没执行完毕)。

在第三个任务结束的瞬间,我们观察线程快照:

可以看到,线程池中有两个线程,Thread-0在执行第一个任务(休眠100秒,还没结束),Thread-1执行完第三个任务后并没有马上被销毁。过段时间后(10秒钟后)再观察线程快照:

可以看到,Thread-1这个线程被销毁了,因为我们在创建线程池的时候,指定keepAliveTime 为10秒,10秒后,超出核心线程池线程外的那些线程将被销毁。

假如一次性提交4个任务,看看会怎样:

threadPoolExecutor.execute(()->sleep(100));threadPoolExecutor.execute(()->sleep(100));threadPoolExecutor.execute(()->sleep(100));threadPoolExecutor.execute(()->sleep(100));

因为我们设置的拒绝策略为AbortPolicy,所以最后提交的那个任务直接被拒绝了。更多拒绝策略下面会介绍到。

关闭线程池

线程池包含以下几个状态:

当线程池中所有任务都处理完毕后,线程并不会自己关闭。我们可以通过调用shutdownshutdownNow方法来关闭线程池。两者的区别在于:

shutdown方法将线程池置为shutdown状态,拒绝新的任务提交,但线程池并不会马上关闭,而是等待所有正在执行的和线程队列里的任务都执行完毕后,线程池才会被关闭。所以这个方法是平滑的关闭线程池。

shutdownNow方法将线程池置为stop状态,拒绝新的任务提交,中断正在执行的那些任务,并且清除线程队列里的任务并返回。所以这个方法是比较“暴力”的。

举两个例子观察下两者的区别:

shutdown例子:

publicstaticvoidmain(String[]args){ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(2),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());threadPoolExecutor.execute(newshortTask());threadPoolExecutor.execute(newlongTask());threadPoolExecutor.execute(newlongTask());threadPoolExecutor.execute(newshortTask());threadPoolExecutor.shutdown();System.out.println("已经执行了线程池shutdown方法");}staticclassshortTaskimplementsRunnable{@Overridepublicvoidrun(){try{TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName()+"执行shortTask完毕");}catch(InterruptedExceptione){System.err.println("shortTask执行过程中被打断"+e.getMessage());}}}staticclasslongTaskimplementsRunnable{@Overridepublicvoidrun(){try{TimeUnit.SECONDS.sleep(5);System.out.println(Thread.currentThread().getName()+"执行longTask完毕");}catch(InterruptedExceptione){System.err.println("longTask执行过程中被打断"+e.getMessage());}}}

启动程序,控制台输出如下:

可以看到,虽然在任务都被提交后马上执行了shutdown方法,但是并不会马上关闭线程池,而是等待所有被提交的任务都执行完了才关闭。

shutdownNow例子:

publicstaticvoidmain(String[]args){ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(2,4,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(2),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());threadPoolExecutor.execute(newshortTask());threadPoolExecutor.execute(newlongTask());threadPoolExecutor.execute(newlongTask());threadPoolExecutor.execute(newshortTask());List<Runnable>runnables=threadPoolExecutor.shutdownNow();//马上关闭,并返回还未被执行的任务System.out.println(runnables);System.out.println("已经执行了线程池shutdownNow方法");}staticclassshortTaskimplementsRunnable{@Overridepublicvoidrun(){try{TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName()+"执行shortTask完毕");}catch(InterruptedExceptione){System.err.println("shortTask执行过程中被打断"+e.getMessage());}}}staticclasslongTaskimplementsRunnable{@Overridepublicvoidrun(){try{TimeUnit.SECONDS.sleep(5);System.out.println(Thread.currentThread().getName()+"执行longTask完毕");}catch(InterruptedExceptione){System.err.println("longTask执行过程中被打断"+e.getMessage());}}}

启动程序,控制台输出如下:

可以看到,在执行shutdownNow方法后,线程池马上就被关闭了,正在执行中的两个任务被打断,并且返回了线程队列中等待被执行的两个任务。

通过上面两个例子我们还可以看到shutdownshutdownNow方法都不是阻塞的。常与shutdown搭配的方法有awaitTermination

awaitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。该方法是阻塞的:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}0

启动程序输出如下:

4大拒绝策略

当线程池无法再接收新的任务的时候,可采取如下四种策略:

CallerRunsPolicy

CallerRunsPolicy策略:由调用线程处理该任务:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}1

上面的线程池最多只能一次性提交4个任务,第5个任务提交后会被拒绝策略处理。启动程序输出如下:

可以看到,第5个提交的任务由调用线程(即main线程)处理该任务。

AbortPolicy

AbortPolicy策略:丢弃任务,并抛出RejectedExecutionException异常。前面的例子就是使用该策略,所以不再演示。

DiscardOldestPolicy

DiscardOldestPolicy策略:丢弃最早被放入到线程队列的任务,将新提交的任务放入到线程队列末端:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}2

启动程序输出如下:

可以看到最后提交的任务被执行了,而第3个任务是第一个被放到线程队列的任务,被丢弃了。

DiscardPolicy

DiscardPolicy策略:直接丢弃新的任务,不抛异常:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}3

启动程序,输出如下:

第5个任务直接被拒绝丢弃了,而没有抛出任何异常。

线程池工厂方法

除了使用ThreadPoolExecutor的构造方法创建线程池外,我们也可以使用Executors提供的工厂方法来创建不同类型的线程池:

newFixedThreadPool

查看newFixedThreadPool方法源码:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}4

可以看到,通过newFixedThreadPool创建的是一个固定大小的线程池,大小由nThreads参数指定,它具有如下几个特点:

因为corePoolSize和maximumPoolSize的值都为nThreads,所以线程池中线程数量永远等于nThreads,不可能新建除了核心线程数的线程来处理任务,即keepAliveTime实际上在这里是无效的。

LinkedBlockingQueue是一个无界队列(最大长度为Integer.MAX_VALUE),所以这个线程池理论是可以无限的接收新的任务,这就是为什么上面没有指定拒绝策略的原因。

newCachedThreadPool

查看newCachedThreadPool方法源码:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}5

这是一个理论上无限大小的线程池:

核心线程数为0,SynchronousQueue队列是没有长度的队列,所以当有新的任务提交,如果有空闲的还未超时的(最大空闲时间60秒)线程则执行该任务,否则新增一个线程来处理该任务。

因为线程数量没有限制,理论上可以接收无限个新任务,所以这里也没有指定拒绝策略。

newSingleThreadExecutor

查看newSingleThreadExecutor源码:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}6

核心线程数和最大线程数都为1,每次只能有一个线程处理任务。

LinkedBlockingQueue队列可以接收无限个新任务。

newScheduledThreadPool

查看newScheduledThreadPool源码:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}7

所以newScheduledThreadPool理论是也是可以接收无限个任务,DelayedWorkQueue也是一个无界队列。

使用newScheduledThreadPool创建的线程池除了可以处理普通的Runnable任务外,它还具有调度的功能:

1.延迟指定时间后执行:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}8

2.按指定的速率执行:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}9

3.按指定的时延执行:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}0

乍一看,scheduleAtFixedRate和scheduleWithFixedDelay没啥区别,实际它们还是有区别的:

scheduleAtFixedRate按照固定速率执行任务,比如每5秒执行一个任务,即使上一个任务没有结束,5秒后也会开始处理新的任务;

scheduleWithFixedDelay按照固定的时延处理任务,比如每延迟5秒执行一个任务,无论上一个任务处理了1秒,1分钟还是1小时,下一个任务总是在上一个任务执行完毕后5秒钟后开始执行。

对于这些线程池工厂方法的使用,阿里巴巴编程规程指出:

因为这几个线程池理论是都可以接收无限个任务,所以这就有内存溢出的风险。实际上只要我们掌握了ThreadPoolExecutor构造函数7个参数的含义,我们就可以根据不同的业务来创建出符合需求的线程池。一般线程池的创建可以参考如下规则:

IO密集型任务:IO密集型任务线程并不是一直在执行任务,应该配置尽可能多的线程,线程池线程数量推荐设置为2 * CPU核心数;对于IO密集型任务,网络上也有另一种线程池数量计算公式:CPU核心数/(1 - 阻塞系数),阻塞系数取值0.8~0.9,至于这两种公式使用哪一个,可以根据实际环境测试比较得出;

计算密集型任务:此类型需要CPU的大量运算,所以尽可能的去压榨CPU资源,线程池线程数量推荐设置为CPU核心数 + 1。

CPU核心数可以使用Runtime获得:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}1

一些API的用法

ThreadPoolExecutor提供了几个判断线程池状态的方法:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}2

程序输出如下:

前面我们提到,线程池核心线程即使是空闲状态也不会被销毁,除非使用allowCoreThreadTimeOut设置了允许核心线程超时:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}3

程序输出如下所示:

5秒后任务执行完毕,核心线程处于空闲的状态。因为通过allowCoreThreadTimeOut方法设置了允许核心线程超时,所以3秒后(keepAliveTime设置为3秒),核心线程被销毁。核心线程被销毁后,线程池也就没有作用了,于是就自动关闭了。

值得注意的是,如果一个线程池调用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能为0。

ThreadPoolExecutor提供了一remove方法,查看其源码:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}4

可看到,它删除的是线程队列中的任务,而非正在被执行的任务。举个例子:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}5

执行程序,输出如下:

可看到任务并没有被执行,已经被删除,因为唯一一个核心线程已经在执行任务了,所以后提交的这个任务被放到了线程队列里,然后通过remove方法删除。

默认情况下,只有当往线程池里提交了任务后,线程池才会启动核心线程处理任务。我们可以通过调用prestartCoreThread方法,让核心线程即使没有任务提交,也处于等待执行任务的活跃状态:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}6

程序输出如下所示:

该方法返回boolean类型值,如果所以核心线程都启动了,返回false,反之返回true。

还有一个和它类似的prestartAllCoreThreads方法,它的作用是一次性启动所有核心线程,让其处于活跃地等待执行任务的状态。

ThreadPoolExecutor的invokeAny方法用于随机执行任务集合中的某个任务,并返回执行结果,该方法是同步方法:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}7

启动程序,输出如下:

ThreadPoolExecutor的invokeAll则是执行任务集合中的所有任务,返回Future集合:

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(1,2,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(1),(ThreadFactory)Thread::new,newThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");threadPoolExecutor.execute(()->sleep(100));intactiveCount=-1;intqueueSize=-1;while(true){if(activeCount!=threadPoolExecutor.getActiveCount()||queueSize!=threadPoolExecutor.getQueue().size()){System.out.println("活跃线程个数"+threadPoolExecutor.getActiveCount());System.out.println("核心线程个数"+threadPoolExecutor.getCorePoolSize());System.out.println("队列线程个数"+threadPoolExecutor.getQueue().size());System.out.println("最大线程数"+threadPoolExecutor.getMaximumPoolSize());System.out.println("------------------------------------");activeCount=threadPoolExecutor.getActiveCount();queueSize=threadPoolExecutor.getQueue().size();}}8

输出如下:

总结下这些方法:

方法描述allowCoreThreadTimeOut(boolean value)是否允许核心线程空闲后超时,是的话超时后核心线程将销毁,线程池自动关闭awaitTermination(long timeout, TimeUnit unit)阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。execute(Runnable command)向线程池提交任务,没有返回值submit(Runnable task)向线程池提交任务,返回FutureisShutdown()判断线程池是否为shutdown状态isTerminating()判断线程池是否正在关闭isTerminated()判断线程池是否已经关闭remove(Runnable task)移除线程队列中的指定任务prestartCoreThread()提前让一个核心线程处于活跃状态,等待执行任务prestartAllCoreThreads()提前让所有核心线程处于活跃状态,等待执行任务getActiveCount()获取线程池活跃线程数getCorePoolSize()获取线程池核心线程数threadPoolExecutor.getQueue()获取线程池线程队列getMaximumPoolSize()获取线程池最大线程数shutdown()让线程池处于shutdown状态,不再接收任务,等待所有正在运行中的任务结束后,关闭线程池。shutdownNow()让线程池处于stop状态,不再接受任务,尝试打断正在运行中的任务,并关闭线程池,返回线程队列中的任务。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/5463.html