这个面试问题很有趣,虽然答案看似简单,但我们可以进一步探讨CallerRunsPolicy
拒绝策略的潜在风险以及相应的解决方案。
线程池拒绝策略的种类
当同时运行的线程数量达到最大限制时,并且队列已满,ThreadPoolExecutor
提供了一些拒绝策略:
- AbortPolicy:会抛出
RejectedExecutionException
,拒绝处理新的任务。 - CallerRunsPolicy:允许调用线程直接运行被拒绝的任务。也就是说,调用
execute
方法的线程会执行被拒绝的任务。如果执行器已关闭,则该任务会被丢弃。这种策略可能会降低新任务的提交速度,从而影响程序的整体性能。如果应用程序能够承受这种延迟,并且希望每个任务都被执行,您可以选择此策略。 - DiscardPolicy:直接丢弃新提交的任务,不做处理。
- DiscardOldestPolicy:会丢弃最早的未处理任务请求。
例如,当使用ThreadPoolTaskExecutor
或直接通过ThreadPoolExecutor
构造线程池时,如果不指定RejectedExecutionHandler
拒绝策略,默认使用AbortPolicy
。在这种情况下,如队列已满,ThreadPoolExecutor
将抛出RejectedExecutionException
,导致任务被丢弃。为了避免这种情况,您可以选择使用CallerRunsPolicy
。与其他策略不同,CallerRunsPolicy
不会丢弃任务或抛出异常,而是将任务回退给调用者,由调用线程来执行这些任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 直接在主线程中执行,而不是在线程池中执行
r.run();
}
}
}
不允许丢弃任务的情况下,应选择哪个拒绝策略?
根据上述线程池拒绝策略的介绍,显而易见答案是:CallerRunsPolicy
。
结合CallerRunsPolicy
的源代码,我们可以看到:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 只要程序未关闭,使用调用execute方法的线程执行该任务
if (!e.isShutdown()) {
r.run();
}
}
}
这段代码表明,只要程序未关闭,就会使用调用者的线程来执行被拒绝的任务。
CallerRunsPolicy拒绝策略的风险和解决方案
如前所述,选择CallerRunsPolicy
拒绝策略适合需要确保所有任务请求都能被执行的情况。
然而,如果被拒绝的任务是一个耗时操作,而提交任务的线程是主线程,这可能会导致主线程被阻塞,从而影响程序的正常运行。
举个例子,假设线程池最大线程数为2,阻塞队列大小为1(这意味着第四个任务会触发拒绝策略),代码如下:
Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
// 提交第一个任务,由核心线程执行
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第一个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第二个任务,由于核心线程被占用,任务将进入队列等待
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理入队的第二个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理第三个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由主线程来执行
threadPoolExecutor.execute(() -> {
log.info("主线程处理第四个任务");
ThreadUtil.sleep(2, TimeUnit.MINUTES);
});
// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第五个任务");
});
输出结果如下:
18:19:48.203 INFO [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务
18:19:48.203 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务
18:19:48.203 INFO [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务
18:20:48.212 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务
18:21:48.219 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务
从输出结果可见,由于CallerRunsPolicy
的拒绝策略,耗时任务在主线程中执行,导致线程池阻塞,从而阻碍后续任务的及时执行,严重时可能会引发内存溢出(OOM)。
为了从根本上解决这个问题,调用者希望通过CallerRunsPolicy
确保所有任务都能被执行,但当内存足够时,可以考虑增加阻塞队列的大小并调整堆内存,以容纳更多任务,保证任务的有效执行。
为充分利用CPU资源,还可以调整线程池的maximumPoolSize
(最大线程数)参数,以提升任务处理速度,避免在BlockingQueue
中积压过多任务导致内存耗尽。
调整阻塞队列大小和最大线程数
当服务器资源达到上限时,就需要重新考虑线程池的调度策略。阻塞主线程的原因是我们希望确保每个任务都不被丢弃。那么,有没有一种方法既能保证任务不被丢弃,同时在服务器有余力时及时处理呢?
一种建议是任务持久化,可以通过以下方式实现:
- 设计一个任务表,将任务存储到MySQL数据库中。
- 使用Redis缓存任务。
- 将任务提交到消息队列中。
以方案一为例,简单介绍实现逻辑:
- 实现
RejectedExecutionHandler
接口,定义自定义拒绝策略,该策略负责将线程池暂时无法处理的任务存储在MySQL中。需要注意的是,任务会先进入阻塞队列,只有在队列满时才会触发拒绝策略。 - 继承
BlockingQueue
,实现一个混合式阻塞队列,该队列包含JDK
自带的ArrayBlockingQueue
,并重写take()
方法,优先从数据库读取最早的任务,数据库中无任务时再从ArrayBlockingQueue
中取任务。
将部分任务保存到MySQL中
整个实现逻辑相对简单,核心在于自定义拒绝策略和阻塞队列。当线程池满载时,可以通过拒绝策略将新任务持久化到MySQL中,待线程池有余力时,优先处理数据库中的任务,避免出现“饥饿”问题。
当然,我们也可以借鉴其他主流框架的做法。例如,Netty的拒绝策略是创建一个独立于线程池的线程来处理这些任务,这种做法需要良好的硬件设备,并且临时创建的线程难以进行精确监控:
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 创建一个临时线程处理任务
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
ActiveMQ则尝试在指定时限内将任务尽可能地入队,以确保最大化交付:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
// 限时阻塞等待,尽可能交付
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
};