线程池概念
说的简单明了一点,就是管理线程的一个池子,是一种基于池化思想管理线程的工具。
为解决资源分配的问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia
为什么需要线程池
Java 线程使用的是一对一线程模型来实现的,线程每次创建和销毁都会有一定的性能消耗,线程池的作用就是把创建好的线程统一管理起来,能避免重复创建和销毁带来的开销,从而达到减低资源损耗,提升程序响应速度的好处。
线程池体系结构图
Java 中的线程池核心实现类是ThreadPoolExecutor
,顶层接口是Executor
,顶层接口Executor
提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象执行命令,将任务的运行逻辑提交到执行器(Executor
)中,由Executor
框架完成线程的调配和任务的执行部分。
ExecutorService
接口增加了一些能力:
(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
(2)提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService
则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
最下层的实现类ThreadPoolExecutor
实现最复杂的运行部分,ThreadPoolExecutor
将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
ThreadPoolExecutor 线程池
线程池的运行状态
从ThreadPoolExecutor
的源码中可以看出,ThreadPoolExecutor线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
int COUNT_BITS = Integer.SIZE - 3; // 29 https://jingling.im
int CAPACITY = (1 << COUNT_BITS) - 1; // 二进制表示是:29个1 (1 1111 1111 1111 1111 1111 1111 1111)
它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。
运行状态和线程数量计算方法如以下:
private static int runStateOf(int c) { return c & ~CAPACITY; } // 返回运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 返回线程数量,workerCount是已被允许启动且未被允许停止的数量
private static int ctlOf(int rs, int wc) { return rs | wc; } // 通过运行状态和线程数量生成ctl
线程池一共有5个运行状态,源码如下:
// 用高3位来表示运行状态
// -1的二进制位全是1,然后左移29位,最后结果是0-29位是0,30位以上都是1,二进制:1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
// 0 左移29位还是0 https://jingling.im
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 1左移29位,结果就是第30位是1,二进制:0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// 2的二进制是10,左移29位,结果就是第31位是1,二进制:0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 3的二进制是11,左移29位,结果就是第30和31位是1,二进制:0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
他们的含义分别是:
- RUNNING 表示可以接收新的任务,并且可以处理阻塞队列中的任务
- SHUTDOWN 关闭状态,不能接收新提交的任务,但可以继续处理阻塞队列中的已存在的任务
- STOP 不再接收新的任务,也不处理队列中的任务,并且会中断正在处理任务的线程
- TIDYING 所有任务已经中止,且工作线程数量(workCount)为0。最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法
- TERMINATED 中止状态,已经执行完terminated()钩子方法;by: https://jingling.im
线程池的生命周期运转图:
线程池的核心参数
corePoolSize
- 核心线程数;运行中的线程数小于corePoolSize时, 就会创建一个核心线程, 否则丢队列中
maximumPoolSize
- 最大线程数;队列满了之后, 创建线程发现运行中的线程数等于corePoolSize时, 会创建一个非核心线程, 加上corePoolSize后不能大于maximumPoolSize
keepAliveTime
- 线程保持空闲时间;即当任务队列为空时,线程保持多久才会销毁,内部主要是通过阻塞队列带超时的poll(timeout, unit)方法实现的。
- 默认情况下,此两参数仅当正在运行的线程数大于核心线程数时才有效,即只针对非核心线程。
- 但是当
allowCoreThreadTimeOut
被设置成成true时,针对核心线程也有效。
timeUnit
- 线程保持空闲时间单位
workQueue
- 任务队列, 且必须是实现了BlockingQueue的阻塞队列
threadFactory
- 线程工厂
- 默认使用的是Executors工具类中的Executors.defaultThreadFactory()类,这个类有个缺点,创建的线程的名称是自动生成的,无法自定义以区分不同的线程池,且它们都是非守护线程。
- 那怎么自定义一个线程工厂呢?其实也很简单,自己实现一个ThreadFactory,然后把名称和是否是守护进程当作构造方法的参数传进来就可以了。
handler
- 拒绝策略;常用的策略有:
- AbortPolicy:抛出RejectedExecutionException异常(默认的策略)
- DiscardPolicy:Does nothing, 什么也不做
- DiscardOldestPolicy:从队列中poll一个任务, 然后再执行(要求线程池未关闭)
- CallerRunsPolicy:直接执行(要求线程池未关闭)
线程池运行机制
线程池的任务调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 首先检测线程池运行状态,如果不是
RUNNING
,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。 - 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
execute执行流程图:
线程池的阻塞队列
阻塞队列是线程池的核心。线程池中是以生产者消费者模式,然后通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空。
- 当队列为满时,存储元素的线程会等待队列变为非满可用。
不同的阻塞队列可以实现不一样的任务存取策略。
常见的阻塞队列有:
- ArrayBlockingQueue 基于数组实现的有界阻塞队列;按照FIFO的原则对元素进行排序;支持公平锁和非公平锁;
- LinkedBlockingQueue 基于链表实现的有界阻塞队列;按照FIFO的原则对元素进行排序;默认长度是Integer.MAX_VALUE,使用需谨慎;
- LinkedBlockingDueue 基于链表实现的双向阻塞队列;队头和队尾都可以添加和移除元素;默认长度是Integer.MAX_VALUE,使用需谨慎;
- LinkedTransferQueue 基于链表实现的无界阻塞队列;与其他队列相比多了transfer方法。
- PriorityBlockingQueue 基于数组实现的无界阻塞队列;支持线程优先级排序,默认自然排序并支持自定义实现compareTo()方法指定排序规则;不能保证同优先级元素的顺序;
- DelayQueue 基于PriorityQueue实现的无界阻塞队列;可以实现延迟获取,指定时间后才可以从队列中获取到元素;
- SynchronousQueue 一个不存储元素的阻塞队列;每一次put操作必须等待take操作,否则不能再put元素;支持公平锁和非公平锁。isEmpty()始终返回true;
Executors 线程池
Executors 是JDK1.5开始提供的一个线程池工具类,支持直接调用方法创建6种不同的线程池,他们分别是:
- newCachedThreadPool()
缓存线程以便重复使用,如果限制 60 秒没被使用,则会被移除缓存;用的是
SynchronousQueue
队列;队列长度为 Integer.MAX_VALUE; - newFixedThreadPool(int nThreads)
创建一个数量固定的线程池;用的是
LinkedBlockingQueue
队列;队列长度为 Integer.MAX_VALUE; - newSingleThreadExecutor()
创建一个单线程线程池。用的是
LinkedBlockingQueue
队列;队列长度为 Integer.MAX_VALUE; - newScheduledThreadPool(int corePoolSize)
创建一个数量固定的线程池,支持执行定时性或周期性任务;用的是
DelayedWorkQueue
队列; - newSingleThreadScheduledExecutor() 单线程的 newScheduledThreadPool;
- newWorkStealingPool()
Java 8 新增创建线程池的方法,此线程池会并行处理任务,不能保证执行顺序。用的是
ForkJoinPool
;
不建议通过Executors 线程池去创建线程池,尤其是FixedThreadPool、SingleThreadPool和CachedThreadPool,因为他们允许的队列长度都是Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。