需求分析
(1)自己动手写一个线程池需要考虑哪些因素?
(2)自己动手写的线程池如何测试?
思考:
- 既然是线程池, 池, 就是需要一个放线程的地方
- 可以控制池的大小, 并不是无限的, 也就是coreSize
- coreSzie满了, 任务需要排队, 所以需要一个有限的队列, 且这个队列必须要是线程安全的, 比如阻塞队列BlockingQueue
- 如果coreSize的线程执行的很快, 那队列里面排队的线程就可以很快被执行完成, 如果队列满了, 可以再增加线程来执行队列里的任务, 也就是maxSize
- 最后当队列满了, maxSize也已经达到了, 这时候就需要一种拒接策略了, 常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。
根据上面的思考,我们定义一个线程池一共需要这么四个变量:
核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。
定义线程池
定义核心参数以及构造方法
public class MyThreadPoolExecutor implements Executor{
/**
* 线程池的核心大小
*/
private volatile int coreSize;
/**
* 线程池的最大值
*/
private volatile int maxSize;
/**
* 排队的线程队列
*/
private final BlockingQueue<Runnable> taskQueue;
/**
* 拒绝策略
*/
private volatile MyRejectedHandler handler;
/**
* 线程池线程名称前缀
*/
private String poolNamePrefix ;
private static final String defaultPoolNamePrefix = "default-my-thread-pool-";
/**
* 运行中的线程数
*/
private final AtomicInteger runningCount = new AtomicInteger(0);
public MyThreadPoolExecutor(int coreSize,int maxSize,BlockingQueue<Runnable> taskQueue){
this(coreSize,maxSize,taskQueue,defaultPoolNamePrefix);
}
/**
* 构造方法
* @param coreSize
* @param maxSize
* @param taskQueue
* @param poolNamePrefix
*/
public MyThreadPoolExecutor(int coreSize,int maxSize,BlockingQueue<Runnable> taskQueue,String poolNamePrefix){
this.coreSize = coreSize;
this.maxSize = maxSize;
this.taskQueue = taskQueue;
this.poolNamePrefix = poolNamePrefix;
this.handler = new MyRejectedHandler();
}
}
实现execute方法
@Override
public void execute(Runnable command){
if(command == null){
throw new NullPointerException();
}
// 1.判断线程数是否达到了核心大小
if(runningCount.get() < coreSize){
// 1.1 没有达到coreSize, 按照核心线程执行
if(addWorker(command,true)){
return;
}
}
// 2.加入到队列, 加入失败会返回false, 如果用add, 队列满会抛异常
if(taskQueue.offer(command)){
}else{
// 3.加入队列失败, 以非核心线程的方式执行线程任务
if(!addWorker(command,false)){
// 3.1 以非核心线程的方式执行失败, 执行拒绝策略
reject(command);
}
}
}
执行基本流程:
- 判断线程数是否达到了核心大小
- 没有达到coreSize, 按照核心线程执行
- 任务数达到coreSize后进来的任务加入到队列
- 如果队列满了, 按照maxSize线程数进行执行任务
- 如果任务数达到maxSize,后进来的任务按照拒绝策略来执行
添加到工作线程
private boolean addWorker(Runnable command,boolean coreThread){
// 自旋
for(;;){
// 运行中的线程数
int running = runningCount.get();
// 确实是取核心大小还是最大值
int wc = coreThread ? coreSize : maxSize;
if(running >= wc){
return false;
}
// 工作线程+1, 失败则自旋
if(runningCount.compareAndSet(running,running+1)){
break;
}
}
// 标识是否开始工作
boolean workerStarted = false;
String coreThreadStr = coreThread ? "core-":"max-";
String threadName = defaultPoolNamePrefix + coreThreadStr + runningCount.get();
final Thread t = new Thread(new Runnable(){
@Override
public void run(){
// 执行任务
runWorker(command);
// 执行完任务, runningCount-1
runningCount.decrementAndGet();
}
},threadName);
if (t != null) {
t.start();
workerStarted = true;
}
return workerStarted;
}
执行任务过程
private void runWorker(Runnable task){
Runnable t = task;
while(t != null || (t = getTask()) != null){
try{
log.info(Thread.currentThread().getName()+" is running");
t.run();
}catch(Exception e){
throw e;
}finally{
t = null;
}
}
}
自定义拒绝策略
class MyRejectedHandler {
public void rejectedExecution(Runnable r,MyThreadPoolExecutor executor){
log.info(r.toString()+": 执行了拒绝策略");
// 可以自定拒绝策略, 执行, 忽略, 从队列取任务, 抛异常都可以
//r.run();
}
}
从队列取任务
private Runnable getTask(){
try{
return taskQueue.take();
}catch(InterruptedException e){
return null;
}
}
测试线程池
public static void main(String[] args) throws InterruptedException{
MyThreadPoolExecutor myPool = new MyThreadPoolExecutor(2,3,new ArrayBlockingQueue(6));
AtomicInteger taskIndex = new AtomicInteger(1);
while(true){
myPool.execute(new Runnable(){
@SneakyThrows
@Override
public void run(){
System.out.println("执行任务:"+taskIndex.getAndIncrement());
// coreSize 为2时, 2秒足以, 不会出现maxSize的线程
TimeUnit.MILLISECONDS.sleep(800);
// coreSize 线程处理不过来, 会启用max线程
// TimeUnit.MILLISECONDS.sleep(3000);
// max线程处理不过来, 会启用队列, 队列满了, 会执行拒绝策略
// TimeUnit.MILLISECONDS.sleep(4000);
}
});
TimeUnit.MILLISECONDS.sleep(300);
// 每隔1s提交一个任务
// TimeUnit.MILLISECONDS.sleep(1000);
}
}
测试输出:
default-my-thread-pool-core-1 is running 执行任务:1
default-my-thread-pool-core-2 is running 执行任务:2
default-my-thread-pool-core-1 is running 执行任务:3
default-my-thread-pool-core-2 is running 执行任务:4
default-my-thread-pool-max-3 is running 执行任务:5
17:29:47.401 [main] INFO net.admol.jingling.demo.executor.MyThreadPoolExecutor - executor.TestMyThreadPool$1@46d56d67: 执行了拒绝策略