理解美团二面中线程池拒绝策略的最佳选择:如何选择CallerRunsPolicy以防止任务丢失与避免阻塞的风险
这个面试问题非常有趣,虽然答案看似简单,但我们可以深入探讨一下CallerRunsPolicy拒绝策略所面临的潜在风险以及相关解决方案。
下面是详细内容。
线程池的拒绝策略分类
当当前活动线程总数达到最大限制并且任务队列已满时,ThreadPoolExecutor
会采取以下几种拒绝策略:
- AbortPolicy:抛出
RejectedExecutionException
来拒绝新的任务处理。 - CallerRunsPolicy:由调用者的线程来运行被拒绝的任务,也就是说这些任务会直接在调用
execute
方法的线程中执行。如果执行器已经关闭,则任务将被丢弃。虽然这种策略可能导致新任务提交的速度减慢,从而影响整体性能,但如果应用程序能够接受这种延迟,并且希望确保每一个任务请求都被处理,可以选择此策略。 - DiscardPolicy:忽略新任务,直接丢弃它们。
- DiscardOldestPolicy:丢弃最早的未处理任务请求。
例如,当我们通过ThreadPoolTaskExecutor
或直接使用ThreadPoolExecutor
构造函数创建线程池时,若不指定RejectedExecutionHandler
拒绝策略,则默认使用的是AbortPolicy
。在这种情况下,如果队列已满,ThreadPoolExecutor
会抛出RejectedExecutionException
异常,意味着您将失去对该任务的处理。如果希望保留任务,则可以选择CallerRunsPolicy
。与其他策略不同,CallerRunsPolicy
既不会丢弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 直接在主线程中执行,而不是在线程池中执行
r.run();
}
}
}
如果不允许丢弃任务,应该选择哪个拒绝策略?
通过上述对线程池拒绝策略的分析,我们得出的结论是:CallerRunsPolicy
是一个合适的选择。
让我们结合CallerRunsPolicy
的源码进行分析:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 只要程序未关闭,使用调用execute方法的线程执行该任务
if (!e.isShutdown()) {
r.run();
}
}
}
从源码来看,只要程序未关闭,任务将由执行execute
方法的线程来处理。
CallerRunsPolicy拒绝策略的风险及其解决方案
如前所述,如果希望确保每个任务请求都被执行,选择CallerRunsPolicy
是最合适的。
但是,如果进入CallerRunsPolicy
的任务是耗时的,并且提交线程是主线程,可能会导致主线程被阻塞,从而影响程序的正常运行。
举个例子,假设线程池的最大线程数为2,阻塞队列大小为1(这意味着第四个任务将触发拒绝策略),以下是使用Hutool的ThreadUtil
提供的一个简单示例:
Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);
// 创建线程池,核心线程数1,最大线程数2
// 超过核心线程数的空闲线程存活最长时间为60秒
// 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
2,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
// 提交第一个任务,由核心线程执行
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第一个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第二个任务,由于核心线程被占用,任务进入队列
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理入队的第二个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
threadPoolExecutor.execute(() -> {
log.info("非核心线程处理第三个任务");
ThreadUtil.sleep(1, TimeUnit.MINUTES);
});
// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,
// 根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
threadPoolExecutor.execute(() -> {
log.info("主线程处理第四个任务");
ThreadUtil.sleep(2, TimeUnit.MINUTES);
});
// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
threadPoolExecutor.execute(() -> {
log.info("核心线程执行第五个任务");
});
输出结果为:
18:19:48.203 INFO [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务
18:19:48.203 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务
18:19:48.203 INFO [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务
18:20:48.212 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务
18:21:48.219 INFO [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务
从输出结果可以看出,由于CallerRunsPolicy
的拒绝策略,耗时任务在主线程中执行,导致线程池被阻塞,从而影响了后续任务的及时处理,最严重的情况甚至可能导致内存溢出(OOM)。
为了解决这一问题,我们可以考虑增加阻塞队列的大小,并调整堆内存,以便能够容纳更多的任务,确保任务能够顺利执行。为了更充分地利用CPU资源,我们还可以调整线程池的maximumPoolSize
(最大线程数)参数,以提高任务处理速度,并避免阻塞队列中任务数量过多导致的内存不足。
调整阻塞队列大小和最大线程数
当服务器资源达到可利用的极限时,我们必须考虑在设计策略上改变线程池的调度。我们都知道,导致主线程卡死的根本原因是我们不希望任何任务被丢弃。那么,是否有方法既能保证任务不被丢弃,又能在服务器有空闲时及时处理?
我们可以采用任务持久化的思路,这种思路包括但不限于:
- 设计一个任务表,将任务存储到MySQL数据库中。
- 使用Redis缓存任务。
- 将任务提交到消息队列中。
以下是方案一的简单实现逻辑:
- 实现
RejectedExecutionHandler
接口,自定义拒绝策略,负责将当前线程池无法处理的任务(此时阻塞队列已满)入库(保存到MySQL中)。需要注意的是,线程池暂时无法处理的任务会先进入阻塞队列,只有在阻塞队列满时才会触发拒绝策略。 - 继承
BlockingQueue
,实现一个混合式阻塞队列,该队列结合了JDK自带的ArrayBlockingQueue
。此外,混合式阻塞队列需要重写take()
方法,在取任务时,优先从数据库中读取最早的任务,只有在数据库中无任务时,才从ArrayBlockingQueue
中取任务。
将一部分任务保存到MySQL中
整个实现逻辑相对简单,关键在于自定义拒绝策略和阻塞队列。通过这种方式,一旦线程池达到满载时,我们可以通过拒绝策略将新任务持久化到MySQL数据库中,待线程池有余力处理所有任务时,优先处理数据库中的任务,从而避免"饥饿"问题。
当然,我们也可以借鉴其他主流框架的做法,例如Netty的拒绝策略是直接创建一个线程池外的线程来处理这些任务。为了保证任务的实时处理,这种做法可能需要良好的硬件支持,而临时创建的线程则难以做到精确的监控:
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 创建一个临时线程处理任务
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
ActiveMQ则尝试在一定的时间内尽可能将任务入队,以保证最大交付:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
// 限时阻塞等待,实现尽可能交付
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
};