序
为什么要用线程池?什么情况下才会用到线程池?
并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
因此,就用到了线程池;线程池中的线程可以复用,就是执行完一个任务,并不被销毁,而是继续执行下一个任务。
如下使用线程:
public class Test{ public static void main(String[] args) { long start = System.currentTimeMillis(); for(int i=0;i<10000;i++){ new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }).start(); } long end = System.currentTimeMillis(); System.out.println("运行时间:"+(end-start)+"毫秒"); } }
新建10000个线程来执行任务,测试结果得出需要时间是0.7秒左右。
使用线程池:
public class ThreadPoolTest{ private static int produceTaskSleepTime = 2; private static int produceTaskMaxNumber = 20; public static void main(String[] args) { int corePoolSize = 2; int maximumPoolSize = 4; long keepAliveTime = 3; int blockingQueueSize = 3; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(blockingQueueSize), new ThreadPoolExecutor.CallerRunsPolicy()); System.out.println(threadPool.getPoolSize()); long start = System.currentTimeMillis(); for (int i = 1; i <= produceTaskMaxNumber; i++){ try{ String task = "任务 #" + i; System.out.println("放入 " + task); threadPool.execute(new ThreadPoolTask(task)); } catch (Exception e){ e.printStackTrace(); } } System.out.println("关闭线程池"); long end = System.currentTimeMillis(); System.out.println("运行时间:"+(end-start)+"毫秒"); } }
public class ThreadPoolTask implements Runnable, Serializable { private static final long serialVersionUID = 0; private static int consumeTaskSleepTime = 2000; // 保存任务所需要的数据 private Object threadPoolTaskData; ThreadPoolTask(Object tasks) { this.threadPoolTaskData = tasks; } public void run() { System.err.println("线程 " + Thread.currentThread().getName() + " 开始做 " + threadPoolTaskData); threadPoolTaskData = null; } public Object getTask() { return this.threadPoolTaskData; } }
同样的使用线程池,只需要0.1秒左右的时间。很容易就可以发现这两者中间的差距。
正文
上面的线程池的例子中用到的ThreadPoolExecutor是JDK并发包提供的一个线程池服务,基于ThreadPoolExecutor可以很容易将一个Runnable接口的任务放入线程池中。
ThreadPoolExecutor提供了四个构造器,看源码可以发现前三个构造器都是调用的第四个构造器进行的初始化工作:
public class ThreadPoolExecutor extends AbstractExecutorService { ... public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ... }
构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime(线程池维护线程所允许的空闲时间的单位),可选参数值有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值(默认ThreadPoolExecutor.AbortPolicy):
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,线程调用运行该任务的execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。而ExecutorService又是继承了Executor接口。(以上具体代码可以自行翻阅JDK源码,因为太多就不一一粘贴)
Executor是一个顶层类,里面只声明了一个execute()方法,返回值为void,参数为Runable类型,作用就是执行传进去的任务的。
在ThreadPoolExecutor类中有几个非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
shutdown()和shutdownNow()是用来关闭线程池的;调用shutdown()方法,线程池处于SHUTDOWN状态,此时线程池不能接受新的任务,它会等待所有任务执行完毕;调用shutdownNow()方法,线程池处于STOP状态,此时线程池不能接受新的任务,并且回去尝试终止正在执行的任务,返回尚未执行的任务。
线程池的状态:
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
RUNNING :
当创建线程池后,初始时,线程池处于RUNNING状态
RUNNING -> SHUTDOWN :
调用shutdown()方法,线程池处于SHUTDOWN状态
RUNNING or SHUTDOWN -> STOP :
调用shutdownNow()方法,线程池处于STOP状态
SHUTDOWN or STOP -> TERMINATED :
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态
需要注意的是默认情况下,创建线程池之后线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法:
prestartCoreThread():初始化一个核心线程
prestartAllCoreThreads():初始化所有核心线程
线程池中比较重要的成员变量:
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务 private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁 private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集 private volatile long keepAliveTime; //线程存活时间 private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间 private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; //线程池最大能容忍的线程数(防止任务量突然过大) private volatile int poolSize; //线程池中当前的线程数 private volatile RejectedExecutionHandler handler; //任务拒绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程 private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数,跟线程池的容量没有关系 private long completedTaskCount; //用来记录已经执行完毕的任务个数
任务的执行过程:
线程池最基本的差不多说完了,下面再说下任务的执行过程。
在ThreadPoolExecutor中,最核心的任务提交方法就是execute()方法,虽然上面提到的submit()方法也可以提交任务,但是实际上submit()方法调用的还是execute()方法。
我们看下execute()方法的源码:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); } }
这段代码首先判断提交的任务command是否为null,如果是null,则抛空指针异常;
接着往下,这句代码比较有意思,
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
由于使用的是或条件运算符,所以会先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块;如果线程池中当前线程数小于核心池大小,则接着执行后半部分:
addIfUnderCorePoolSize(command)
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
addIfUnderCorePoolSize()方法首先调用mainLock加锁,再次判断当前线程数小于corePoolSize并且线程池处于RUNNING状态,则调用addThread()增加线程:
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
addThread()方法首先创建Work对象,然后调用threadFactory创建新的线程,如果创建的线程不为null,将Work对象的thread属性设置为此创建出来的线程,并将此Work对象放入workers中,然后在增加当前线程池的中线程数,增加后回到addIfUnderCorePoolSize方法 ,释放mainLock,最后启动这个新创建的线程来执行新传入的任务。
回到execute()方法,如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕;
如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:
if (runState == RUNNING && workQueue.offer(command))
如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列,如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:
addIfUnderMaximumPoolSize(command)
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
如果执行addIfUnderMaximumPoolSize()方法返回false,则执行reject()方法进行任务拒绝处理。addIfUnderMaximumPoolSize()方法和addIfUnderCorePoolSize()方法类似,不同的是addIfUnderMaximumPoolSize()方法是判断的maximumPoolSize进行比较,如果超过最大线程数则返回false,而addIfUnderCorePoolSize()方法判断的是corePoolSize。留心的同学可能会发现这两个方法都有判断当前线程数目的大小是否小于corePoolSize(或者maximumPoolSize),原因很简单,前面的判断过程中并没有加锁,因此可能在execute()方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown()或者shutdownNow()方法。
继续说if (runState == RUNNING && workQueue.offer(command))这句,如果当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:
if (runState != RUNNING || poolSize == 0)
这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:
ensureQueuedTaskHandled(command);
ensureQueuedTaskHandled()方法是进行应急处理,从名字可以看出是保证添加到任务缓存队列中的任务得到处理。
private void ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean reject = false; Thread t = null; try { int state = runState; if (state != RUNNING && workQueue.remove(command)) reject = true; else if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) t = addThread(null); } finally { mainLock.unlock(); } if (reject) reject(command); else if (t != null) t.start(); }
ensureQueuedTaskHandled()方法判断线程池运行,如果状态不为运行状态,从workQueue中删除, 并调用reject做拒绝处理。
任务缓存队列及排队策略:
任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
线程池容量调整:
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:
setCorePoolSize()//设置核心池大小
setMaximumPoolSize()//设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
注:
以上Demo及源码基于JDK1.6版本。