阻塞队列和线程池


阻塞队列

概念

阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者从容器里拿元素。

其在数据结构中所起作用大致如下图所示:

线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞

为什么需要BlockingQueue

在多线程领域:所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒

使用阻塞队列的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都帮你一手包办了

concurrent 包发布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度

BlockingQueue阻塞队列是属于一个接口,底下有七个实现类

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列
    • 有界,但是界限非常大,相当于无界,可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
    • 生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

这里需要掌握的是:ArrayBlockQueueLinkedBlockingQueueSynchronousQueue

BlockingQueue核心方法

方法类型 抛出异常 特殊值 阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
  • 抛出异常:当阻塞队列满时:在往队列中 add 插入元素会抛出 IllegalStateException: Queue full;而当阻塞队列空时:再往队列中 remove 移除元素,会抛出 NoSuchElementException

  • 特殊值:插入方法,成功true,失败false;移除方法:成功返回出队列元素,队列没有就返回空

  • 阻塞:当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出;当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用

  • 超时退出:当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者程会退出

线程池

顾名思义,线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务

使用线程池的优点:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

线程池的创建

方式一:通过 Executor 框架的工具类 Executors 来创建

  • Executors.newFixedThreadPool(int i) :该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变
    • 当有一个新的任务提交时,线程池中若有空闲线程,则立即执行
    • 若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务
  • Executors.newSingleThreadExecutor():该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务
  • Executors.newCacheThreadPool():该方法返回一个可根据实际情况调整线程数量的线程池
    • 线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程
    • 若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务
    • 所有线程在当前任务执行完毕后,将返回线程池进行复用
  • Executors.newScheduledThreadPool(int corePoolSize):该返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池
// 无界队列 LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) { 

    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

}

// 无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() { 

    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));

}

// 同步队列 SynchronousQueue,没有容量,最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() { 

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

}

// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

方式二:通过 ThreadPoolExecutor 构造函数来创建(推荐)

线程池的重要参数

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize, //线程池的核心线程数量
                          int maximumPoolSize, //线程池的最大线程数
                          long keepAliveTime, //当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                          TimeUnit unit, //时间单位
                          BlockingQueue<Runnable> workQueue, //任务队列,用来储存等待执行任务的队列
                          ThreadFactory threadFactory, //线程工厂,用来创建线程,一般默认即可
                          RejectedExecutionHandler handler //拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                         ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor 3 个最重要的参数:

  • **corePoolSize **: 任务队列未达到队列容量时,最大可以同时运行的线程数量

  • **maximumPoolSize **: 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数

  • **workQueue**: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中

  • LinkedBlockingQueue:链表阻塞队列

  • SynchronousBlockingQueue:同步阻塞队列

ThreadPoolExecutor其他常见参数 :

  • **keepAliveTime**:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁

  • unitkeepAliveTime 参数的时间单位

  • threadFactory :表示生成线程池中工作线程的线程工厂,用于创建线程池 一般用默认即可

  • handler :拒绝策略,表示当队列满了并且工作线程大于线程池的最大线程数时,如何来拒绝请求执行的Runnable的策略

拒绝策略

以下所有拒绝策略都实现了 RejectedExecutionHandler 接口

  • AbortPolicy:默认,直接抛出 RejectedExcutionException 异常,阻止系统正常运行
  • CallerRunsPolicy:该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者
  • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果允许任务丢失,这是一种好方案
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

线程池处理流程

  • 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务

  • 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行

  • 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务

  • 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用RejectedExecutionHandler.rejectedExecution()方法

为什么不用默认创建的线程池

根据阿里巴巴手册:并发控制这章

  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
    • 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题,如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题
  • 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
    • Executors 返回的线程池对象弊端如下:
      • FixedThreadPoolSingleThreadPool
        • 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
      • CacheThreadPoolScheduledThreadPool
        • 运行的请求队列长度为:Integer.MAX_VALUE,线程数上限太大导致OOM

手写一个线程池

如何给线程池命名

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。

默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

给线程池里的线程命名通常有下面两种方式:

①利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();

②自己实现 ThreadFactor

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class MyThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public MyThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

如何设定线程池的大小

这个是根据具体业务来配置的,分为CPU密集型和IO密集型

CPU密集型

CPU 密集的意思是该任务需要大量的运算,而没有阻塞,CPU 一直全速运行。CPU 密集任务只有在真正的多核 CPU 上才可能得到加速(通过多线程),而在单核CPU 上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些

CPU 密集型任务配置尽可能少的线程数量:

一般公式:CPU 核数 + 1个线程数

IO密集型

IO 密集型,即该任务需要大量的 IO 操作,即大量的阻塞。在单线程上运行 IO 密集型的任务会导致浪费大量的 CPU 运算能力花费在等待上,所以 IO 密集型任务中使用多线程可以大大的加速程序的运行,即使在单核 CPU 上,这种加速主要就是利用被浪费掉的阻塞时间

IO 密集时,大部分线程都被阻塞,故需要多配置线程数:

参考公式一:CPU 核数 * 2

参考公式二:CPU 核数 / (1 - 阻塞系数), 阻塞系数在0.8 ~ 0.9左右

手写线程池

下面我们创建了一个 核心线程数为2,最大线程数为5,并且阻塞队列数为3的线程池

final Integer corePoolSize = 2;
final Integer maximumPoolSize = 5;
final Long keepAliveTime = 1L;

// 自定义线程池,只改变了LinkBlockingQueue的队列大小
ExecutorService executorService = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(3),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy());

参考

  • 尚硅谷 Java 大厂面试题第 2 季
  • JavaGuide(Java面试+学习指南)

文章作者: 不才叶某
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 不才叶某 !
评论
  目录