线程创建从来不是免费的。在Java中,每个线程都会分配一个独立的调用栈(默认1MB),线程的创建和销毁需要操作系统参与,涉及系统调用和内存分配。当线程数量达到数千时,这种开销会显著影响系统性能。正是这一根本问题催生了线程池的设计——通过线程复用减少创建销毁的开销。2004年,随着JSR 166的正式发布,Java 5引入了java.util.concurrent包,ThreadPoolExecutor成为这个包中最核心的组件之一。
然而,当开发者第一次面对ThreadPoolExecutor的七个构造参数时,往往会感到困惑:为什么要有核心线程数和最大线程数的区分?为什么任务满了不直接创建新线程而是先排队?为什么默认的拒绝策略是抛异常?这些看似反直觉的设计背后,隐藏着怎样的权衡考量?
一个参数引发的生产事故
某电商系统在大促期间出现响应超时,排查发现某个异步处理线程池配置如下:
ExecutorService executor = Executors.newFixedThreadPool(200);
问题在于,这个线程池处理的是耗时约5秒的数据库批量操作,而数据库连接池只有50个连接。200个线程同时工作,却有150个在等待连接——这正是线程池配置与下游资源不匹配的典型场景。
Brian Goetz在《Java Concurrency in Practice》中给出了线程池大小的计算公式:
$$N_{threads} = N_{cpu} \times U_{cpu} \times (1 + \frac{W}{C})$$其中$W$是等待时间(Wait time),$C$是计算时间(Compute time),$\frac{W}{C}$被称为阻塞系数。对于IO密集型任务,阻塞系数可能高达10甚至100,这意味着线程数可以远超CPU核心数;而对于CPU密集型任务,阻塞系数接近0,线程数应该等于CPU核心数。
上述公式可以简化为:
$$N_{threads} = N_{cpu} \times (1 + \frac{W}{C})$$在实际应用中,还需要考虑目标CPU利用率$U_{cpu}$(取值0到1)。假设一台8核服务器,目标CPU利用率80%,IO等待时间是计算时间的9倍,那么最优线程数为:
$$N_{threads} = 8 \times 0.8 \times (1 + 9) = 64$$七个参数的设计权衡
ThreadPoolExecutor的构造函数有七个参数,每一个都有其存在的设计理由:
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
核心线程数与最大线程数的分离,本质上是一种"延迟创建"策略。核心线程在池创建后不会自动启动(除非调用prestartCoreThread()),而是在第一个任务到来时才创建。当线程数达到corePoolSize后,新任务会先进入队列排队,只有当队列满了才会创建新线程直到maximumPoolSize。这个设计的目的是:在大多数负载平稳的场景下,用少量的核心线程就能应对,避免不必要的线程创建开销。
keepAliveTime的设计则体现了资源回收的考量。当线程数超过corePoolSize时,多余的空闲线程会在keepAliveTime后被回收。可以通过allowCoreThreadTimeOut(true)让核心线程也参与回收,但这种情况较少使用——因为核心线程的存在就是为了快速响应突发流量。
workQueue的选择直接影响线程池的行为特征:
| 队列类型 | 特点 | 适用场景 |
|---|---|---|
| SynchronousQueue | 零容量,直接交接 | 高吞吐、任务处理速度快 |
| LinkedBlockingQueue | 无界队列 | 任务量可控、不允许丢失 |
| ArrayBlockingQueue | 有界队列 | 需要严格控制内存使用 |
Executors.newCachedThreadPool()使用SynchronousQueue配合Integer.MAX_VALUE作为maximumPoolSize,这意味着每个任务都会创建新线程(如果没有空闲线程)。在高负载下,这种配置会导致线程数失控,最终触发OutOfMemoryError。这也是阿里Java开发手册明确禁止使用Executors创建线程池的原因。
execute方法的执行流程
理解ThreadPoolExecutor的关键在于理解execute()方法的执行逻辑:
1. 如果当前线程数 < corePoolSize,创建新线程执行任务
2. 如果当前线程数 >= corePoolSize,将任务加入队列
3. 如果队列已满且线程数 < maximumPoolSize,创建新线程
4. 如果队列已满且线程数 >= maximumPoolSize,执行拒绝策略
这个流程看似简单,但有一个容易忽略的细节:在第2步将任务入队后,会再次检查线程池状态。如果线程池已经关闭,则从队列中移除该任务并执行拒绝策略;如果当前没有工作线程,则创建一个新线程。这是为了处理极端情况——所有工作线程都在入队后、执行前被回收或异常退出。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 线程数 < corePoolSize,创建新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 线程数 >= corePoolSize,尝试入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 入队失败,尝试创建新线程
else if (!addWorker(command, false))
reject(command); // 4. 创建失败,执行拒绝策略
}
ctl:一个 AtomicInteger 的精妙设计
ThreadPoolExecutor中有一个核心变量ctl,它将线程池运行状态和线程数打包在一个AtomicInteger中:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 运行状态存储在高3位
private static final int RUNNING = -1 << COUNT_BITS; // 111...
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000...
private static final int STOP = 1 << COUNT_BITS; // 001...
private static final int TIDYING = 2 << COUNT_BITS; // 010...
private static final int TERMINATED = 3 << COUNT_BITS; // 011...
高3位存储运行状态,低29位存储线程数。这种设计使得状态检查和线程数更新可以通过一次CAS操作原子完成,避免了额外的锁开销。
线程池的五种状态及其转换关系:
- RUNNING:接受新任务,处理队列任务
- SHUTDOWN:不接受新任务,但处理队列任务(调用shutdown()后)
- STOP:不接受新任务,不处理队列任务,中断正在执行的任务(调用shutdownNow()后)
- TIDYING:所有任务已终止,workerCount为0,即将执行terminated()钩子
- TERMINATED:terminated()执行完成
stateDiagram-v2
[*] --> RUNNING
RUNNING --> SHUTDOWN: shutdown()
RUNNING --> STOP: shutdownNow()
SHUTDOWN --> TIDYING: 队列为空且workerCount=0
STOP --> TIDYING: workerCount=0
TIDYING --> TERMINATED: terminated()完成
状态只能单向转换,不会回退。RUNNING状态的值为负数(高3位全为1),这样通过runStateOf(c) < SHUTDOWN就能快速判断线程池是否仍在运行。
Worker:为什么继承 AQS
ThreadPoolExecutor的内部类Worker继承了AbstractQueuedSynchronizer,这让很多开发者感到困惑:Worker明明只是一个工作线程,为什么需要继承AQS?
答案在于中断控制。当一个任务执行时间过长,需要中断时,线程池不能简单地调用Thread.interrupt()——因为该线程可能正在执行另一个任务,或者正在等待获取队列中的任务。Worker通过AQS实现的锁机制,确保只有在线程空闲(未持有锁)时才能被中断:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
// 在构造时禁止中断,直到runWorker开始执行
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
在runWorker方法中,每次执行任务前都会调用w.lock()获取锁,任务执行后释放锁。当线程池需要中断空闲线程时,会先尝试w.tryLock()——如果获取成功,说明线程空闲;如果失败,说明线程正在执行任务,不应被中断。
Worker的state初始值为-1,这是一个关键设计。构造函数中调用setState(-1),使得在runWorker执行第一次unlock()之前,tryAcquire()永远不会成功(因为state不为0)。这防止了线程在启动前就被意外中断。
拒绝策略:四种选择的设计考量
当线程池无法接受新任务时(队列满且线程数达到maximumPoolSize,或线程池已关闭),会调用RejectedExecutionHandler。JDK提供了四种内置策略:
AbortPolicy(默认):抛出RejectedExecutionException。这是最"安全"的策略,因为它明确告知调用者任务被拒绝,调用者可以据此进行重试或降级处理。缺点是如果调用者没有正确处理异常,可能导致任务丢失。
CallerRunsPolicy:由提交任务的线程自己执行该任务。这是一种天然的背压机制——当线程池处理不过来时,生产者被迫变成消费者,从而降低任务提交速率。但需要注意,如果任务执行时间过长,可能导致提交线程长时间阻塞,影响系统的其他功能。
DiscardPolicy:静默丢弃任务,不抛异常。适用于可以容忍任务丢失的场景,比如日志收集、监控数据上报等。但缺点是无法知道任务是否被丢弃,难以排查问题。
DiscardOldestPolicy:丢弃队列中最老的任务,然后重新提交当前任务。适用于新任务比老任务更重要的场景。但需要注意,如果老任务已经被其他任务依赖,可能导致依赖关系被破坏。
在实际生产中,更推荐自定义拒绝策略。常见的做法是将被拒绝的任务持久化到数据库或消息队列,由后台线程异步重试,确保任务不丢失。
线程回收:何时以及如何
线程回收是ThreadPoolExecutor资源管理的重要机制。核心逻辑在getTask()方法中:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int wc = workerCountOf(c);
// 是否允许回收?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 回收条件:线程数超过maximumPoolSize,或空闲超时且不需要保留最小线程数
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 允许回收则带超时poll,否则无限等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
当getTask()返回null时,Worker线程会退出。回收的条件是:
- 线程数超过maximumPoolSize(可能是因为maximumPoolSize被动态调小)
- 或者线程空闲时间超过keepAliveTime,且当前线程数超过corePoolSize(或允许核心线程超时)
这里有一个细节:如果线程因为异常而退出(completedAbruptly=true),会立即创建新线程替代;如果是正常退出(队列空闲),则会根据最小线程数决定是否替代。
线程池预热:避免冷启动延迟
默认情况下,ThreadPoolExecutor中的核心线程是"懒加载"的——只有当任务到来时才会创建。这对于冷启动敏感的场景是个问题:第一个请求可能需要等待线程创建完成后才能开始处理。
ThreadPoolExecutor提供了两个预热方法:
executor.prestartCoreThread(); // 预热一个核心线程
executor.prestartAllCoreThreads(); // 预热所有核心线程
对于延迟敏感的系统,在应用启动时调用prestartAllCoreThreads()可以消除线程创建带来的冷启动延迟。但需要注意,这会增加应用启动时间和初始内存占用。
ForkJoinPool:另一种设计思路
Java 7引入的ForkJoinPool采用了完全不同的设计思路——工作窃取(Work Stealing)。每个工作线程维护自己的任务队列,当自己的队列空时,会从其他线程的队列末尾"窃取"任务。这种设计对于分治任务(Divide and Conquer)特别有效:
// ThreadPoolExecutor: 所有线程共享一个任务队列
// ForkJoinPool: 每个线程有自己的任务队列
// ThreadPoolExecutor的任务处理
while ((task = workQueue.take()) != null) {
task.run();
}
// ForkJoinPool的任务处理
while (true) {
if (myQueue.isEmpty()) {
task = stealFromOtherQueue(); // 窃取
} else {
task = myQueue.pop(); // 从自己队列末尾取(LIFO)
}
if (task != null) task.exec();
}
ForkJoinPool适合CPU密集型的递归任务,如并行排序、矩阵运算等。对于IO密集型任务或任务之间没有依赖关系的场景,传统的ThreadPoolExecutor仍然更合适。
虚拟线程:线程池的未来?
Java 21引入的虚拟线程(Virtual Threads)重新定义了线程池的使用方式。虚拟线程由JVM调度,不直接绑定操作系统线程,创建和销毁的开销极小。官方推荐的使用方式是:
// 不需要线程池!
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
});
});
}
对于IO密集型任务,虚拟线程可以创建数十万甚至数百万个,而不会像平台线程那样耗尽系统资源。但对于CPU密集型任务,虚拟线程并没有优势——因为计算最终还是要由CPU执行,受限于CPU核心数。
虚拟线程的引入并不意味着线程池将退出历史舞台。对于需要严格控制资源使用的场景(如数据库连接池访问),或者需要任务排队和背压的场景,ThreadPoolExecutor仍然不可或缺。
监控与调优
ThreadPoolExecutor提供了多个监控指标:
executor.getPoolSize(); // 当前线程数
executor.getActiveCount(); // 正在执行任务的线程数
executor.getQueue().size(); // 队列中等待的任务数
executor.getCompletedTaskCount(); // 已完成的任务数
executor.getLargestPoolSize(); // 历史最大线程数(用于检测是否有突发流量)
这些指标可以暴露给监控系统,用于告警和容量规划。一个健康的线程池应该满足:
getActiveCount()在峰值时接近但不超过getMaximumPoolSize()getQueue().size()不会持续增长getLargestPoolSize()不会频繁接近getMaximumPoolSize()
动态调整是线程池调优的重要手段。ThreadPoolExecutor支持运行时修改corePoolSize、maximumPoolSize、keepAliveTime等参数:
executor.setCorePoolSize(newCoreSize);
executor.setMaximumPoolSize(newMaxSize);
结合监控数据,可以实现自适应的线程池调整——当队列持续积压时增大线程数,当线程大部分空闲时减小线程数。
常见陷阱
陷阱一:Executors工厂方法的隐患
// FixedThreadPool使用无界队列,可能导致OOM
Executors.newFixedThreadPool(10);
// CachedThreadPool允许无限创建线程,可能导致OOM
Executors.newCachedThreadPool();
// 正确做法:手动创建,指定有界队列
new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000));
陷阱二:任务异常被静默吞掉
ThreadPoolExecutor的任务如果抛出未捕获异常,Worker线程会退出,但异常信息不会被记录。正确的做法是:
// 方式一:覆盖afterExecute
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
log.error("Task failed", t);
}
}
// 方式二:提交时使用submit,然后处理Future.get()的异常
Future<?> future = executor.submit(task);
try {
future.get();
} catch (ExecutionException e) {
log.error("Task failed", e.getCause());
}
陷阱三:忘记关闭线程池
线程池中的线程通常是非守护线程,如果忘记调用shutdown(),JVM不会退出。推荐的做法是:
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}));
参考资料
- Doug Lea. Concurrent Programming in Java: Design Principles and Patterns. Addison-Wesley, 1999.
- Brian Goetz et al. Java Concurrency in Practice. Addison-Wesley, 2006.
- JSR 166: Concurrency Utilities. https://jcp.org/en/jsr/detail?id=166
- ThreadPoolExecutor Source Code. OpenJDK 21.
- Zalando Engineering. How to set an ideal thread pool size. 2019.
- Oracle. ThreadPoolExecutor Java Documentation. Java SE 8.