在面试中遇到了一个颇具挑战性的问题,未能给出满意的答案,最终导致面试未果。这个问题并不罕见,其实在考察求职者对线程池及阻塞队列的理解与应用能力。这并不意味着面试者必须设计出一个完美的优先级任务线程池,而是需要展示出对相关概念的扎实掌握。

在我们深入探讨之前,首先回顾一下线程池处理任务的基本流程。

图片

线程池实现原理图解

  1. 当当前运行的线程数量少于核心线程数时,线程池会新建线程以执行任务。
  2. 如果当前运行的线程数达到或超过核心线程数,但低于最大线程数,该任务将被添加到任务队列中,等待执行。
  3. 在任务队列满的情况下,如果当前运行的线程数仍低于最大线程数,将会新建线程执行任务。
  4. 如果当前线程数已达到最大线程数,新任务将会被拒绝,饱和策略会调用 RejectedExecutionHandler.rejectedExecution() 方法来处理。

从中可以看出,新的任务到达时,会首先检查当前的线程数量是否已经达到核心线程数,是则将新任务存入队列中。

线程池与阻塞队列的选择

不同类型的线程池会使用不同的阻塞队列,我们可以通过内置线程池来分析其特点:

  • LinkedBlockingQueue(无界队列,容量为 Integer.MAX_VALUE):用于 FixedThreadPoolSingleThreadExecutor。由于队列不会满,因此 FixedThreadPool 最多只能创建核心线程数的线程,而 SingleThreadExecutor 只能创建一个线程。
  • SynchronousQueue(同步队列):用于 CachedThreadPool。无容量的 SynchronousQueue 不存储元素,目的是确保提交任务时,如果有空闲线程,则使用之,否则会新建线程处理任务。可以理解为 CachedThreadPool 的最大线程数是 Integer.MAX_VALUE,这意味着线程数可扩展,可能创建大量线程,导致 OOM(内存溢出)。
  • DelayedWorkQueue(延迟阻塞队列):用于 ScheduledThreadPoolSingleThreadScheduledExecutor。该队列中的任务并不是按照放入的时间排序,而是根据延迟时间排序,内部使用堆数据结构,确保每次出队的任务都是当前执行时间最靠前的。DelayedWorkQueue 会在元素数量达到上限后自动扩容,因此不会导致阻塞,最大扩容可达 Integer.MAX_VALUE

要实现一个优先级任务线程池,考虑使用 PriorityBlockingQueue(优先级阻塞队列)作为任务队列,这可以通过 ThreadPoolExecutor 的构造函数中的 workQueue 参数来实现。

图片

ThreadPoolExecutor 构造函数详解

PriorityBlockingQueue 是无界阻塞队列,且支持优先级,底层基于小顶堆实现,允许优先级高的元素优先出队。需要注意的是,PriorityQueue 并不支持阻塞操作。

为了使 PriorityBlockingQueue 能够按优先级对任务进行排序,提交的任务必须具备排序能力,这可以通过以下两种方式实现:

  1. 提交到线程池的任务实现 Comparable 接口,并重写 compareTo 方法,以指定任务间的优先级比较规则。
  2. 创建 PriorityBlockingQueue 时传入一个 Comparator 对象,以指定任务间的排序规则(推荐)。

然而,这种设计存在一些潜在的问题和风险:

  • 由于 PriorityBlockingQueue 是无界的,可能会导致请求堆积,从而引发 OOM。
  • 低优先级任务可能会因为长时间得不到执行而导致饥饿问题。
  • 为了保证线程安全和对元素进行排序操作,可能会影响性能。

针对 OOM 问题,可以通过继承 PriorityBlockingQueue 并重写 offer 方法,在插入元素数量超过阈值时返回 false 处理。

为了解决饥饿问题,可以通过优化设计,例如将等待时间过长的任务移除并重新添加到队列中,同时提升其优先级。

对于性能问题,由于需要排序操作,这是无法避免的;不过在大多数业务场景中,这种性能影响是可以接受的。