这个面试问题非常有趣,虽然答案看似简单,但我们可以深入探讨一下CallerRunsPolicy拒绝策略所面临的潜在风险以及相关解决方案。

下面是详细内容。

线程池的拒绝策略分类

当当前活动线程总数达到最大限制并且任务队列已满时,ThreadPoolExecutor会采取以下几种拒绝策略:

  • AbortPolicy:抛出RejectedExecutionException来拒绝新的任务处理。
  • CallerRunsPolicy:由调用者的线程来运行被拒绝的任务,也就是说这些任务会直接在调用execute方法的线程中执行。如果执行器已经关闭,则任务将被丢弃。虽然这种策略可能导致新任务提交的速度减慢,从而影响整体性能,但如果应用程序能够接受这种延迟,并且希望确保每一个任务请求都被处理,可以选择此策略。
  • DiscardPolicy:忽略新任务,直接丢弃它们。
  • DiscardOldestPolicy:丢弃最早的未处理任务请求。

例如,当我们通过ThreadPoolTaskExecutor或直接使用ThreadPoolExecutor构造函数创建线程池时,若不指定RejectedExecutionHandler拒绝策略,则默认使用的是AbortPolicy。在这种情况下,如果队列已满,ThreadPoolExecutor会抛出RejectedExecutionException异常,意味着您将失去对该任务的处理。如果希望保留任务,则可以选择CallerRunsPolicy。与其他策略不同,CallerRunsPolicy既不会丢弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // 直接在主线程中执行,而不是在线程池中执行
            r.run();
        }
    }
}

如果不允许丢弃任务,应该选择哪个拒绝策略?

通过上述对线程池拒绝策略的分析,我们得出的结论是:CallerRunsPolicy是一个合适的选择。

让我们结合CallerRunsPolicy的源码进行分析:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 只要程序未关闭,使用调用execute方法的线程执行该任务
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

从源码来看,只要程序未关闭,任务将由执行execute方法的线程来处理。

CallerRunsPolicy拒绝策略的风险及其解决方案

如前所述,如果希望确保每个任务请求都被执行,选择CallerRunsPolicy是最合适的。

但是,如果进入CallerRunsPolicy的任务是耗时的,并且提交线程是主线程,可能会导致主线程被阻塞,从而影响程序的正常运行。

举个例子,假设线程池的最大线程数为2,阻塞队列大小为1(这意味着第四个任务将触发拒绝策略),以下是使用Hutool的ThreadUtil提供的一个简单示例:

Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);
// 创建线程池,核心线程数1,最大线程数2
// 超过核心线程数的空闲线程存活最长时间为60秒
// 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
    2,
    60,
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1),
    new ThreadPoolExecutor.CallerRunsPolicy());

// 提交第一个任务,由核心线程执行
threadPoolExecutor.execute(() -> {
    log.info("核心线程执行第一个任务");
    ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第二个任务,由于核心线程被占用,任务进入队列
threadPoolExecutor.execute(() -> {
    log.info("非核心线程处理入队的第二个任务");
    ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
threadPoolExecutor.execute(() -> {
    log.info("非核心线程处理第三个任务");
    ThreadUtil.sleep(1, TimeUnit.MINUTES);
});

// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,
// 根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
threadPoolExecutor.execute(() -> {
    log.info("主线程处理第四个任务");
    ThreadUtil.sleep(2, TimeUnit.MINUTES);
});

// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
threadPoolExecutor.execute(() -> {
    log.info("核心线程执行第五个任务");
});

输出结果为:

18:19:48.203 INFO  [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务
18:19:48.203 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务
18:19:48.203 INFO  [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务
18:20:48.212 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务
18:21:48.219 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务

从输出结果可以看出,由于CallerRunsPolicy的拒绝策略,耗时任务在主线程中执行,导致线程池被阻塞,从而影响了后续任务的及时处理,最严重的情况甚至可能导致内存溢出(OOM)。

为了解决这一问题,我们可以考虑增加阻塞队列的大小,并调整堆内存,以便能够容纳更多的任务,确保任务能够顺利执行。为了更充分地利用CPU资源,我们还可以调整线程池的maximumPoolSize(最大线程数)参数,以提高任务处理速度,并避免阻塞队列中任务数量过多导致的内存不足。

图片

调整阻塞队列大小和最大线程数

当服务器资源达到可利用的极限时,我们必须考虑在设计策略上改变线程池的调度。我们都知道,导致主线程卡死的根本原因是我们不希望任何任务被丢弃。那么,是否有方法既能保证任务不被丢弃,又能在服务器有空闲时及时处理?

我们可以采用任务持久化的思路,这种思路包括但不限于:

  1. 设计一个任务表,将任务存储到MySQL数据库中。
  2. 使用Redis缓存任务。
  3. 将任务提交到消息队列中。

以下是方案一的简单实现逻辑:

  1. 实现RejectedExecutionHandler接口,自定义拒绝策略,负责将当前线程池无法处理的任务(此时阻塞队列已满)入库(保存到MySQL中)。需要注意的是,线程池暂时无法处理的任务会先进入阻塞队列,只有在阻塞队列满时才会触发拒绝策略。
  2. 继承BlockingQueue,实现一个混合式阻塞队列,该队列结合了JDK自带的ArrayBlockingQueue。此外,混合式阻塞队列需要重写take()方法,在取任务时,优先从数据库中读取最早的任务,只有在数据库中无任务时,才从ArrayBlockingQueue中取任务。

图片

将一部分任务保存到MySQL中

整个实现逻辑相对简单,关键在于自定义拒绝策略和阻塞队列。通过这种方式,一旦线程池达到满载时,我们可以通过拒绝策略将新任务持久化到MySQL数据库中,待线程池有余力处理所有任务时,优先处理数据库中的任务,从而避免"饥饿"问题。

当然,我们也可以借鉴其他主流框架的做法,例如Netty的拒绝策略是直接创建一个线程池外的线程来处理这些任务。为了保证任务的实时处理,这种做法可能需要良好的硬件支持,而临时创建的线程则难以做到精确的监控:

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
    NewThreadRunsPolicy() {
        super();
    }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // 创建一个临时线程处理任务
            final Thread t = new Thread(r, "Temporary task executor");
            t.start();
        } catch (Throwable e) {
            throw new RejectedExecutionException(
                "Failed to start a new thread", e);
        }
    }
}

ActiveMQ则尝试在一定的时间内尽可能将任务入队,以保证最大交付:

new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
        try {
            // 限时阻塞等待,实现尽可能交付
            executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
        }
        throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
    }
};