线程创建从来不是免费的。在Java中,每个线程都会分配一个独立的调用栈(默认1MB),线程的创建和销毁需要操作系统参与,涉及系统调用和内存分配。当线程数量达到数千时,这种开销会显著影响系统性能。正是这一根本问题催生了线程池的设计——通过线程复用减少创建销毁的开销。2004年,随着JSR 166的正式发布,Java 5引入了java.util.concurrent包,ThreadPoolExecutor成为这个包中最核心的组件之一。

然而,当开发者第一次面对ThreadPoolExecutor的七个构造参数时,往往会感到困惑:为什么要有核心线程数和最大线程数的区分?为什么任务满了不直接创建新线程而是先排队?为什么默认的拒绝策略是抛异常?这些看似反直觉的设计背后,隐藏着怎样的权衡考量?

一个参数引发的生产事故

某电商系统在大促期间出现响应超时,排查发现某个异步处理线程池配置如下:

ExecutorService executor = Executors.newFixedThreadPool(200);

问题在于,这个线程池处理的是耗时约5秒的数据库批量操作,而数据库连接池只有50个连接。200个线程同时工作,却有150个在等待连接——这正是线程池配置与下游资源不匹配的典型场景。

Brian Goetz在《Java Concurrency in Practice》中给出了线程池大小的计算公式:

$$N_{threads} = N_{cpu} \times U_{cpu} \times (1 + \frac{W}{C})$$

其中$W$是等待时间(Wait time),$C$是计算时间(Compute time),$\frac{W}{C}$被称为阻塞系数。对于IO密集型任务,阻塞系数可能高达10甚至100,这意味着线程数可以远超CPU核心数;而对于CPU密集型任务,阻塞系数接近0,线程数应该等于CPU核心数。

上述公式可以简化为:

$$N_{threads} = N_{cpu} \times (1 + \frac{W}{C})$$

在实际应用中,还需要考虑目标CPU利用率$U_{cpu}$(取值0到1)。假设一台8核服务器,目标CPU利用率80%,IO等待时间是计算时间的9倍,那么最优线程数为:

$$N_{threads} = 8 \times 0.8 \times (1 + 9) = 64$$

七个参数的设计权衡

ThreadPoolExecutor的构造函数有七个参数,每一个都有其存在的设计理由:

public ThreadPoolExecutor(
    int corePoolSize,           // 核心线程数
    int maximumPoolSize,        // 最大线程数
    long keepAliveTime,         // 空闲线程存活时间
    TimeUnit unit,              // 时间单位
    BlockingQueue<Runnable> workQueue,  // 任务队列
    ThreadFactory threadFactory, // 线程工厂
    RejectedExecutionHandler handler    // 拒绝策略
)

核心线程数与最大线程数的分离,本质上是一种"延迟创建"策略。核心线程在池创建后不会自动启动(除非调用prestartCoreThread()),而是在第一个任务到来时才创建。当线程数达到corePoolSize后,新任务会先进入队列排队,只有当队列满了才会创建新线程直到maximumPoolSize。这个设计的目的是:在大多数负载平稳的场景下,用少量的核心线程就能应对,避免不必要的线程创建开销

keepAliveTime的设计则体现了资源回收的考量。当线程数超过corePoolSize时,多余的空闲线程会在keepAliveTime后被回收。可以通过allowCoreThreadTimeOut(true)让核心线程也参与回收,但这种情况较少使用——因为核心线程的存在就是为了快速响应突发流量。

workQueue的选择直接影响线程池的行为特征:

队列类型 特点 适用场景
SynchronousQueue 零容量,直接交接 高吞吐、任务处理速度快
LinkedBlockingQueue 无界队列 任务量可控、不允许丢失
ArrayBlockingQueue 有界队列 需要严格控制内存使用

Executors.newCachedThreadPool()使用SynchronousQueue配合Integer.MAX_VALUE作为maximumPoolSize,这意味着每个任务都会创建新线程(如果没有空闲线程)。在高负载下,这种配置会导致线程数失控,最终触发OutOfMemoryError。这也是阿里Java开发手册明确禁止使用Executors创建线程池的原因。

execute方法的执行流程

理解ThreadPoolExecutor的关键在于理解execute()方法的执行逻辑:

1. 如果当前线程数 < corePoolSize,创建新线程执行任务
2. 如果当前线程数 >= corePoolSize,将任务加入队列
3. 如果队列已满且线程数 < maximumPoolSize,创建新线程
4. 如果队列已满且线程数 >= maximumPoolSize,执行拒绝策略

这个流程看似简单,但有一个容易忽略的细节:在第2步将任务入队后,会再次检查线程池状态。如果线程池已经关闭,则从队列中移除该任务并执行拒绝策略;如果当前没有工作线程,则创建一个新线程。这是为了处理极端情况——所有工作线程都在入队后、执行前被回收或异常退出。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    
    // 1. 线程数 < corePoolSize,创建新线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    // 2. 线程数 >= corePoolSize,尝试入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 入队失败,尝试创建新线程
    else if (!addWorker(command, false))
        reject(command);  // 4. 创建失败,执行拒绝策略
}

ctl:一个 AtomicInteger 的精妙设计

ThreadPoolExecutor中有一个核心变量ctl,它将线程池运行状态和线程数打包在一个AtomicInteger中:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;  // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 运行状态存储在高3位
private static final int RUNNING    = -1 << COUNT_BITS;  // 111...
private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 000...
private static final int STOP       =   1 << COUNT_BITS;  // 001...
private static final int TIDYING    =  2 << COUNT_BITS;  // 010...
private static final int TERMINATED =   3 << COUNT_BITS;  // 011...

高3位存储运行状态,低29位存储线程数。这种设计使得状态检查和线程数更新可以通过一次CAS操作原子完成,避免了额外的锁开销。

线程池的五种状态及其转换关系:

  • RUNNING:接受新任务,处理队列任务
  • SHUTDOWN:不接受新任务,但处理队列任务(调用shutdown()后)
  • STOP:不接受新任务,不处理队列任务,中断正在执行的任务(调用shutdownNow()后)
  • TIDYING:所有任务已终止,workerCount为0,即将执行terminated()钩子
  • TERMINATED:terminated()执行完成
stateDiagram-v2
    [*] --> RUNNING
    RUNNING --> SHUTDOWN: shutdown()
    RUNNING --> STOP: shutdownNow()
    SHUTDOWN --> TIDYING: 队列为空且workerCount=0
    STOP --> TIDYING: workerCount=0
    TIDYING --> TERMINATED: terminated()完成

状态只能单向转换,不会回退。RUNNING状态的值为负数(高3位全为1),这样通过runStateOf(c) < SHUTDOWN就能快速判断线程池是否仍在运行。

Worker:为什么继承 AQS

ThreadPoolExecutor的内部类Worker继承了AbstractQueuedSynchronizer,这让很多开发者感到困惑:Worker明明只是一个工作线程,为什么需要继承AQS?

答案在于中断控制。当一个任务执行时间过长,需要中断时,线程池不能简单地调用Thread.interrupt()——因为该线程可能正在执行另一个任务,或者正在等待获取队列中的任务。Worker通过AQS实现的锁机制,确保只有在线程空闲(未持有锁)时才能被中断:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    
    final Thread thread;
    Runnable firstTask;
    
    Worker(Runnable firstTask) {
        // 在构造时禁止中断,直到runWorker开始执行
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
        runWorker(this);
    }
    
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

在runWorker方法中,每次执行任务前都会调用w.lock()获取锁,任务执行后释放锁。当线程池需要中断空闲线程时,会先尝试w.tryLock()——如果获取成功,说明线程空闲;如果失败,说明线程正在执行任务,不应被中断。

Worker的state初始值为-1,这是一个关键设计。构造函数中调用setState(-1),使得在runWorker执行第一次unlock()之前,tryAcquire()永远不会成功(因为state不为0)。这防止了线程在启动前就被意外中断。

拒绝策略:四种选择的设计考量

当线程池无法接受新任务时(队列满且线程数达到maximumPoolSize,或线程池已关闭),会调用RejectedExecutionHandler。JDK提供了四种内置策略:

AbortPolicy(默认):抛出RejectedExecutionException。这是最"安全"的策略,因为它明确告知调用者任务被拒绝,调用者可以据此进行重试或降级处理。缺点是如果调用者没有正确处理异常,可能导致任务丢失。

CallerRunsPolicy:由提交任务的线程自己执行该任务。这是一种天然的背压机制——当线程池处理不过来时,生产者被迫变成消费者,从而降低任务提交速率。但需要注意,如果任务执行时间过长,可能导致提交线程长时间阻塞,影响系统的其他功能。

DiscardPolicy:静默丢弃任务,不抛异常。适用于可以容忍任务丢失的场景,比如日志收集、监控数据上报等。但缺点是无法知道任务是否被丢弃,难以排查问题。

DiscardOldestPolicy:丢弃队列中最老的任务,然后重新提交当前任务。适用于新任务比老任务更重要的场景。但需要注意,如果老任务已经被其他任务依赖,可能导致依赖关系被破坏。

在实际生产中,更推荐自定义拒绝策略。常见的做法是将被拒绝的任务持久化到数据库或消息队列,由后台线程异步重试,确保任务不丢失。

线程回收:何时以及如何

线程回收是ThreadPoolExecutor资源管理的重要机制。核心逻辑在getTask()方法中:

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);
        
        // 是否允许回收?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 回收条件:线程数超过maximumPoolSize,或空闲超时且不需要保留最小线程数
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        
        try {
            // 允许回收则带超时poll,否则无限等待
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

当getTask()返回null时,Worker线程会退出。回收的条件是:

  1. 线程数超过maximumPoolSize(可能是因为maximumPoolSize被动态调小)
  2. 或者线程空闲时间超过keepAliveTime,且当前线程数超过corePoolSize(或允许核心线程超时)

这里有一个细节:如果线程因为异常而退出(completedAbruptly=true),会立即创建新线程替代;如果是正常退出(队列空闲),则会根据最小线程数决定是否替代。

线程池预热:避免冷启动延迟

默认情况下,ThreadPoolExecutor中的核心线程是"懒加载"的——只有当任务到来时才会创建。这对于冷启动敏感的场景是个问题:第一个请求可能需要等待线程创建完成后才能开始处理。

ThreadPoolExecutor提供了两个预热方法:

executor.prestartCoreThread();      // 预热一个核心线程
executor.prestartAllCoreThreads();  // 预热所有核心线程

对于延迟敏感的系统,在应用启动时调用prestartAllCoreThreads()可以消除线程创建带来的冷启动延迟。但需要注意,这会增加应用启动时间和初始内存占用。

ForkJoinPool:另一种设计思路

Java 7引入的ForkJoinPool采用了完全不同的设计思路——工作窃取(Work Stealing)。每个工作线程维护自己的任务队列,当自己的队列空时,会从其他线程的队列末尾"窃取"任务。这种设计对于分治任务(Divide and Conquer)特别有效:

// ThreadPoolExecutor: 所有线程共享一个任务队列
// ForkJoinPool: 每个线程有自己的任务队列

// ThreadPoolExecutor的任务处理
while ((task = workQueue.take()) != null) {
    task.run();
}

// ForkJoinPool的任务处理
while (true) {
    if (myQueue.isEmpty()) {
        task = stealFromOtherQueue();  // 窃取
    } else {
        task = myQueue.pop();  // 从自己队列末尾取(LIFO)
    }
    if (task != null) task.exec();
}

ForkJoinPool适合CPU密集型的递归任务,如并行排序、矩阵运算等。对于IO密集型任务或任务之间没有依赖关系的场景,传统的ThreadPoolExecutor仍然更合适。

虚拟线程:线程池的未来?

Java 21引入的虚拟线程(Virtual Threads)重新定义了线程池的使用方式。虚拟线程由JVM调度,不直接绑定操作系统线程,创建和销毁的开销极小。官方推荐的使用方式是:

// 不需要线程池!
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    IntStream.range(0, 10000).forEach(i -> {
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1));
            return i;
        });
    });
}

对于IO密集型任务,虚拟线程可以创建数十万甚至数百万个,而不会像平台线程那样耗尽系统资源。但对于CPU密集型任务,虚拟线程并没有优势——因为计算最终还是要由CPU执行,受限于CPU核心数。

虚拟线程的引入并不意味着线程池将退出历史舞台。对于需要严格控制资源使用的场景(如数据库连接池访问),或者需要任务排队和背压的场景,ThreadPoolExecutor仍然不可或缺。

监控与调优

ThreadPoolExecutor提供了多个监控指标:

executor.getPoolSize();         // 当前线程数
executor.getActiveCount();     // 正在执行任务的线程数
executor.getQueue().size();    // 队列中等待的任务数
executor.getCompletedTaskCount(); // 已完成的任务数
executor.getLargestPoolSize(); // 历史最大线程数(用于检测是否有突发流量)

这些指标可以暴露给监控系统,用于告警和容量规划。一个健康的线程池应该满足:

  • getActiveCount()在峰值时接近但不超过getMaximumPoolSize()
  • getQueue().size()不会持续增长
  • getLargestPoolSize()不会频繁接近getMaximumPoolSize()

动态调整是线程池调优的重要手段。ThreadPoolExecutor支持运行时修改corePoolSizemaximumPoolSizekeepAliveTime等参数:

executor.setCorePoolSize(newCoreSize);
executor.setMaximumPoolSize(newMaxSize);

结合监控数据,可以实现自适应的线程池调整——当队列持续积压时增大线程数,当线程大部分空闲时减小线程数。

常见陷阱

陷阱一:Executors工厂方法的隐患

// FixedThreadPool使用无界队列,可能导致OOM
Executors.newFixedThreadPool(10);

// CachedThreadPool允许无限创建线程,可能导致OOM
Executors.newCachedThreadPool();

// 正确做法:手动创建,指定有界队列
new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000));

陷阱二:任务异常被静默吞掉

ThreadPoolExecutor的任务如果抛出未捕获异常,Worker线程会退出,但异常信息不会被记录。正确的做法是:

// 方式一:覆盖afterExecute
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (t != null) {
        log.error("Task failed", t);
    }
}

// 方式二:提交时使用submit,然后处理Future.get()的异常
Future<?> future = executor.submit(task);
try {
    future.get();
} catch (ExecutionException e) {
    log.error("Task failed", e.getCause());
}

陷阱三:忘记关闭线程池

线程池中的线程通常是非守护线程,如果忘记调用shutdown(),JVM不会退出。推荐的做法是:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    executor.shutdown();
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
    }
}));

参考资料

  1. Doug Lea. Concurrent Programming in Java: Design Principles and Patterns. Addison-Wesley, 1999.
  2. Brian Goetz et al. Java Concurrency in Practice. Addison-Wesley, 2006.
  3. JSR 166: Concurrency Utilities. https://jcp.org/en/jsr/detail?id=166
  4. ThreadPoolExecutor Source Code. OpenJDK 21.
  5. Zalando Engineering. How to set an ideal thread pool size. 2019.
  6. Oracle. ThreadPoolExecutor Java Documentation. Java SE 8.