这个面试问题很有趣,虽然答案看似简单,但我们可以进一步探讨CallerRunsPolicy拒绝策略的潜在风险以及相应的解决方案。

下面是详细内容。

线程池拒绝策略的种类

当同时运行的线程数量达到最大限制时,并且队列已满,ThreadPoolExecutor提供了一些拒绝策略:

  • AbortPolicy:会抛出RejectedExecutionException,拒绝处理新的任务。
  • CallerRunsPolicy:允许调用线程直接运行被拒绝的任务。也就是说,调用execute方法的线程会执行被拒绝的任务。如果执行器已关闭,则该任务会被丢弃。这种策略可能会降低新任务的提交速度,从而影响程序的整体性能。如果应用程序能够承受这种延迟,并且希望每个任务都被执行,您可以选择此策略。
  • DiscardPolicy:直接丢弃新提交的任务,不做处理。
  • DiscardOldestPolicy:会丢弃最早的未处理任务请求。

例如,当使用ThreadPoolTaskExecutor或直接通过ThreadPoolExecutor构造线程池时,如果不指定RejectedExecutionHandler拒绝策略,默认使用AbortPolicy。在这种情况下,如队列已满,ThreadPoolExecutor将抛出RejectedExecutionException,导致任务被丢弃。为了避免这种情况,您可以选择使用CallerRunsPolicy。与其他策略不同,CallerRunsPolicy不会丢弃任务或抛出异常,而是将任务回退给调用者,由调用线程来执行这些任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {  
    public CallerRunsPolicy() { }  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        if (!e.isShutdown()) {  
            // 直接在主线程中执行,而不是在线程池中执行  
            r.run();  
        }  
    }  
}  

不允许丢弃任务的情况下,应选择哪个拒绝策略?

根据上述线程池拒绝策略的介绍,显而易见答案是:CallerRunsPolicy

结合CallerRunsPolicy的源代码,我们可以看到:

public static class CallerRunsPolicy implements RejectedExecutionHandler {  
    public CallerRunsPolicy() { }  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        // 只要程序未关闭,使用调用execute方法的线程执行该任务  
        if (!e.isShutdown()) {  
            r.run();  
        }  
    }  
}  

这段代码表明,只要程序未关闭,就会使用调用者的线程来执行被拒绝的任务。

CallerRunsPolicy拒绝策略的风险和解决方案

如前所述,选择CallerRunsPolicy拒绝策略适合需要确保所有任务请求都能被执行的情况。

然而,如果被拒绝的任务是一个耗时操作,而提交任务的线程是主线程,这可能会导致主线程被阻塞,从而影响程序的正常运行。

举个例子,假设线程池最大线程数为2,阻塞队列大小为1(这意味着第四个任务会触发拒绝策略),代码如下:

Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);  
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,  
        2,  
        60,  
        TimeUnit.SECONDS,  
        new ArrayBlockingQueue<>(1),  
        new ThreadPoolExecutor.CallerRunsPolicy());  

// 提交第一个任务,由核心线程执行  
threadPoolExecutor.execute(() -> {  
    log.info("核心线程执行第一个任务");  
    ThreadUtil.sleep(1, TimeUnit.MINUTES);  
});  

// 提交第二个任务,由于核心线程被占用,任务将进入队列等待  
threadPoolExecutor.execute(() -> {  
    log.info("非核心线程处理入队的第二个任务");  
    ThreadUtil.sleep(1, TimeUnit.MINUTES);  
});  

// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理  
threadPoolExecutor.execute(() -> {  
    log.info("非核心线程处理第三个任务");  
    ThreadUtil.sleep(1, TimeUnit.MINUTES);  
});  

// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由主线程来执行  
threadPoolExecutor.execute(() -> {  
    log.info("主线程处理第四个任务");  
    ThreadUtil.sleep(2, TimeUnit.MINUTES);  
});  

// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交  
threadPoolExecutor.execute(() -> {  
    log.info("核心线程执行第五个任务");  
});  

输出结果如下:

18:19:48.203 INFO  [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务  
18:19:48.203 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务  
18:19:48.203 INFO  [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务  
18:20:48.212 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务  
18:21:48.219 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务  

从输出结果可见,由于CallerRunsPolicy的拒绝策略,耗时任务在主线程中执行,导致线程池阻塞,从而阻碍后续任务的及时执行,严重时可能会引发内存溢出(OOM)。

为了从根本上解决这个问题,调用者希望通过CallerRunsPolicy确保所有任务都能被执行,但当内存足够时,可以考虑增加阻塞队列的大小并调整堆内存,以容纳更多任务,保证任务的有效执行。

为充分利用CPU资源,还可以调整线程池的maximumPoolSize(最大线程数)参数,以提升任务处理速度,避免在BlockingQueue中积压过多任务导致内存耗尽。

图片

调整阻塞队列大小和最大线程数

当服务器资源达到上限时,就需要重新考虑线程池的调度策略。阻塞主线程的原因是我们希望确保每个任务都不被丢弃。那么,有没有一种方法既能保证任务不被丢弃,同时在服务器有余力时及时处理呢?

一种建议是任务持久化,可以通过以下方式实现:

  1. 设计一个任务表,将任务存储到MySQL数据库中。
  2. 使用Redis缓存任务。
  3. 将任务提交到消息队列中。

以方案一为例,简单介绍实现逻辑:

  1. 实现RejectedExecutionHandler接口,定义自定义拒绝策略,该策略负责将线程池暂时无法处理的任务存储在MySQL中。需要注意的是,任务会先进入阻塞队列,只有在队列满时才会触发拒绝策略。
  2. 继承BlockingQueue,实现一个混合式阻塞队列,该队列包含JDK自带的ArrayBlockingQueue,并重写take()方法,优先从数据库读取最早的任务,数据库中无任务时再从ArrayBlockingQueue中取任务。

图片

将部分任务保存到MySQL中

整个实现逻辑相对简单,核心在于自定义拒绝策略和阻塞队列。当线程池满载时,可以通过拒绝策略将新任务持久化到MySQL中,待线程池有余力时,优先处理数据库中的任务,避免出现“饥饿”问题。

当然,我们也可以借鉴其他主流框架的做法。例如,Netty的拒绝策略是创建一个独立于线程池的线程来处理这些任务,这种做法需要良好的硬件设备,并且临时创建的线程难以进行精确监控:

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {  
    NewThreadRunsPolicy() {  
        super();  
    }  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
        try {  
            // 创建一个临时线程处理任务  
            final Thread t = new Thread(r, "Temporary task executor");  
            t.start();  
        } catch (Throwable e) {  
            throw new RejectedExecutionException(  
                    "Failed to start a new thread", e);  
        }  
    }  
}  

ActiveMQ则尝试在指定时限内将任务尽可能地入队,以确保最大化交付:

new RejectedExecutionHandler() {  
    @Override  
    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {  
        try {  
            // 限时阻塞等待,尽可能交付  
            executor.getQueue().offer(r, 60, TimeUnit.SECONDS);  
        } catch (InterruptedException e) {  
            throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");  
        }  
        throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");  
    }  
};