Skip to content

线程池

为什么要使⽤线程池?直接new个线程不是很舒服?

如果我们在⽅法中直接new⼀个线程来处理,当这个⽅法被调⽤频繁时就会创建很多线程,不仅会消耗系统资源,还会降低系统的稳定性。

总的来说线程池有如下好处:

  • 降低资源消耗。通过重复利⽤已创建的线程,降低线程创建和销毁造成的消耗。
  • 提⾼响应速度。当任务到达时,任务可以不需要等到线程创建就能⽴即执⾏。
  • 增加线程的可管理型。线程是稀缺资源,使⽤线程池可以进⾏统⼀分配,调优和监控。

线程池的核⼼属性有哪些

corePoolSize(核⼼线程数):当线程池运⾏的线程少于 corePoolSize 时,将创建⼀个新线程来处理请求,即使其他⼯作线程处于空闲状态。

maximumPoolSize(最⼤线程数):线程池允许开启的最⼤线程数。

handler(拒绝策略):往线程池添加任务时,将在下⾯两种情况触发拒绝策略:1)线程池运⾏状态不是 RUNNING;2)线程池已经达到最⼤线程数,并且阻塞队列已满时。

keepAliveTime(保持存活时间):如果线程池当前线程数超过 corePoolSize,则多余的线程空闲时间超过 keepAliveTime 时会被终⽌。

workQueue(队列):⽤于保留任务并移交给⼯作线程的阻塞队列。

threadFactory(线程⼯⼚):⽤于创建⼯作线程的⼯⼚。

java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

线程池工作流程

image-20250920170913282

线程池源代码
java
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

线程池有⼏种状态,每个状态分别代表什么含义?

线程池⽬前有5个状态:

  • RUNNING:接受新任务并处理排队的任务。
  • SHUTDOWN:不接受新任务,但处理排队的任务。
  • STOP:不接受新任务,不处理排队的任务,并中断正在进⾏的任务。
  • TIDYING:所有任务都已终⽌,workerCount 为零,线程转换到 TIDYING 状态将运⾏terminated() 钩⼦⽅法
  • TERMINATED:terminated() 已完成。

线程池有哪些队列?

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按先进先出对元素进⾏排序。

  • LinkedBlockingQueue:基于链表结构的有界/⽆界阻塞队列,按先进先出对元素进⾏排序,吞吐通常⾼于 ArrayBlockingQueue。Executors.newFixedThreadPool 使⽤了该队列。

  • SynchronousQueue:不是⼀个真正的队列,⽽是⼀种在线程之间移交的机制。要将⼀个元素放⼊SynchronousQueue 中,必须有另⼀个线程正在等待接受这个元素。如果没有线程等待,并且线程池的当前⼤⼩⼩于最⼤值,那么线程池将创建⼀个线程,否则根据拒绝策略,这个任务将被拒绝。使⽤直接移交将更⾼效,因为任务会直接移交给执⾏它的线程,⽽不是被放在队列中,然后由⼯作线程从队列中提取任务。只有当线程池是⽆界的或者可以拒绝任务时,该队列才有实际价值。

    Executors.newCachedThreadPool使⽤了该队列。其最大线程是Integer的最大值。所以使用该队列就非常高效的给线程池执行

  • PriorityBlockingQueue:具有优先级的⽆界队列,按优先级对元素进⾏排序。元素的优先级是通过⾃然顺序或 Comparator 来定义的。

使⽤队列有什么需要注意的吗?

使⽤有界队列时,需要注意线程池满了后,被拒绝的任务如何处理。

使⽤⽆界队列时,需要注意如果任务的提交速度⼤于线程池的处理速度,可能会导致内存溢出。

线程池有哪些拒绝策略?

AbortPolicy:中⽌策略。默认的拒绝策略,直接抛出 RejectedExecutionException。调⽤者可以捕获这个异常,然后根据需求编写⾃⼰的处理代码。

DiscardPolicy:抛弃策略。什么都不做,直接抛弃被拒绝的任务。

DiscardOldestPolicy:抛弃最⽼策略。抛弃阻塞队列中最⽼的任务,相当于就是队列中下⼀个将要被执⾏的任务,然后重新提交被拒绝的任务。如果阻塞队列是⼀个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最⾼的任务,因此最好不要将该策略和优先级队列放在⼀起使⽤。

CallerRunsPolicy:调⽤者运⾏策略。在调⽤者线程中执⾏该任务。该策略实现了⼀种调节机制,该策略既不会抛弃任务,也不会抛出异常,⽽是将任务回退到调⽤者(调⽤线程池执⾏任务的主线程),由于执⾏任务需要⼀定时间,因此主线程⾄少在⼀段时间内不能提交任务,从⽽使得线程池有时间来处理完正在执⾏的任务。

非核心线程能成为核心线程吗?线程池如何阻塞的?非核心线程如何实现在 keepAliveTime 后死亡?

image-20250920174641113

线程池中无核心线程池和非核心线程池的概念,他通过内部通过获取task来决定的。若有超时获取,则结束当前线程,若没有设置时间则一直阻塞

对于只剩核心线程,调用内部take一直阻塞获取任务

对于非核心线程,调用poll方法(time)获取,若没有则推出线程

getTask源代码
java
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如何终⽌线程池?

终⽌线程池主要有两种⽅式:

shutdown:“温柔”的关闭线程池。不接受新任务,但是在关闭前会将之前提交的任务处理完毕。

shutdownNow:“粗暴”的关闭线程池,也就是直接关闭线程池,通过 Thread#interrupt() ⽅法终⽌所有线程,不会等待之前提交的任务执⾏完毕。但是会返回队列中未处理的任务。

Executors 提供了哪些创建线程池的⽅法?

newFixedThreadPool:固定线程数的线程池。corePoolSize = maximumPoolSize,keepAliveTime为0,⼯作队列使⽤⽆界的LinkedBlockingQueue。适⽤于为了满⾜资源管理的需求,⽽需要限制当前线程数量的场景,适⽤于负载⽐较重的服务器。

newSingleThreadExecutor:只有⼀个线程的线程池。corePoolSize = maximumPoolSize = 1,keepAliveTime为0, ⼯作队列使⽤⽆界的LinkedBlockingQueue。适⽤于需要保证顺序的执⾏各个任务的场景。

newCachedThreadPool: 按需要创建新线程的线程池。核⼼线程数为0,最⼤线程数为Integer.MAX_VALUE,keepAliveTime为60秒,⼯作队列使⽤同步移交 SynchronousQueue。该线程池可以⽆限扩展,当需求增加时,可以添加新的线程,⽽当需求降低时会⾃动回收空闲线程。适⽤于执⾏很多的短期异步任务,或者是负载较轻的服务器。

newScheduledThreadPool:创建⼀个以延迟或定时的⽅式来执⾏任务的线程池,⼯作队列为DelayedWorkQueue。适⽤于需要多个后台线程执⾏周期任务。

newWorkStealingPool:JDK 1.8 新增,⽤于创建⼀个可以窃取的线程池,底层使⽤ ForkJoinPool实现。

线程池⾥有个 ctl,你知道它是如何设计的吗?

ctl 是⼀个打包两个概念字段的原⼦整数。

1)workerCount:指⽰线程的有效数量;

2)runState:指⽰线程池的运⾏状态,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 等状态。

int 类型有32位,其中 ctl 的低29为⽤于表⽰ workerCount,⾼3位⽤于表⽰ runState,如下图所⽰。

image-20250920180959744

当我们的线程池运⾏状态为 RUNNING,⼯作线程个数为3,则此时 ctl 的原码为:1010 0000 0000 0000 0000 0000 0000 0011

ctl 为什么这么设计?有什么好处吗?

ctl 这么设计的主要好处是将对 runState 和 workerCount 的操作封装成了⼀个原⼦操作。runState 和 workerCount 是线程池正常运转中的2个最重要属性,线程池在某⼀时刻该做什么操作,取决于这2个属性的值。

在我们实际使用中,线程池的大小配置多少合适?

对于计算密集型,设置 线程数 = CPU数 + 1,通常能实现最优的利⽤率。

对于I/O密集型,⽹上常⻅的说法是设置 线程数 = CPU数 * 2 ,这个做法是可以的,但其实并不是最优的。

一般来说:线程数 = CPU数 * CPU利⽤率 * (任务等待时间 / 任务计算时间 + 1)

例如我们有个定时任务,部署在4核的服务器上,该任务有100ms在计算,900ms在I/O等待,则线程

数约为:4 * 1 * (1 + 900 / 100) = 40个。