MENU

线程池 - ThreadPoolExecutor

August 1, 2020 • Read: 332 • Java

线程池

线程池内部结构及工作原理

拒绝策略4种

  • AbordPolicy 抛出异常,默认的,无法接收任务,交给业务处理用的最多
  • CallerRunsPolicy
  • DiscardOldestPolicy 丢弃队列中最老的任务,再次尝试提交
  • DiscardPolicy 直接丢弃

线程池状态

  • RUUNING
  • SHUTDOWN 调用shutdown,工作队列为空,线程为空 tiding。消费完后退出,提交会拒绝,获取独占锁,获取到代表空闲,
  • STOP 调用shutdownnow 线程为空 转换为 tiding ,不检查独占锁,直接中断,空闲线程不管队列任务。检查是否为最后一个转换状态
  • TIDYING 调用钩子方法 terminated方法
  • TERMINATED

ThreadPoolExecutor

maximumPoolSize 最大工作线程数,获取不到任务时超过keepAliveTime回收

corePoolSize 核心工作线程数

  • 当前线程池线程数量 < corePoolSize 创建worker
  • 当前线程池线程数量 > coolPoolSize 放入队列
  • 队列满了?工作线程数是否 > maximumPoolSize

AtomicInteger ctl

  • 高3位,当前线程池状态
  • 除去高3位的地位表示当前线程池中所拥有的的线程数量
  • RUNNING < SHUTDOWN <STOP <TIDYING< TERMINATED
  • 线程池中全局锁 ReentrantLock mainLock ,增加减少线程池工作线程数量,改变线程池运行状态
  • allowCoreThreadTimeOut true 核心线程空间超过keepAlive也会回收,false,coresize意外的线程超过keepAlive 空闲会被回收

Worker

Worker extends AbstractQueuedSynchronizer implements Runnable

采用独占模式,worker工作中时,会加锁修改state值,外部线程可以通过这个lock状态知道worker是空闲还是忙碌

execute(Runnable command) 提交任务到线程池,创建工作线程

  • ctl.get 判断当前线程数小于coresize 成立,addWorkder

    • 采用核心线程数创建worker,成功返回
    • 失败重新获取最新ctl值,存在并发现象,线程池状态发生改变,一般情况下只有RUNNING状态是addWorker会成功。SHUTDOWN状态下会成功的前提条件是 firstTask == null 而且当前 quene != null
  • 判断线程池工作状态 && 添加任务到队列 workQueue.offer(command)

    • 前置条件,corePoolSize已达到,addWorker失败(多线程工作,coresize满了)

      • 当前线程池处于RUUNING状态,并且将任务放入队列中

        • 再次获取ctl
        • task提交队列后,线程池状态改变,需要删除任务

          • 成立,非RUNNING,任务出队成功
          • 判断当前工作线程数 == 0,担保,保证RUNNING状态下有最少一个线程在工作
    • 前置条件,offer 失败,线程状态非running

      • 尝试走maximumPoolSize
      • 非RUNNING状态addWorker并且command != null ,一定失败

addWorker(Runnable firstTask,boolean core)

  • retry 自旋:判断当前线程池状态是否允许创建工作线程

    • 先获取ctl,根据clt查看当前线程池工作状态
    • 判断当前线程池状态是否允许添加工作线程
    • 内部自旋,获取令牌,cas ctl

      • 获取工作线程数,并判断当前工作线程数是否大于core | maximum
      • cas 工作线程数 +1

        • 成立,申请到一块令牌,跳出循环retry
        • 失败,说明其他线程已经修改过ctl的值了
      • 重新获取ctl值,判断当前线程池状态是否发生改变,返回到retry循环
  • 初始化 两个boolean 变量,

    • workerstarted工作线程是否启动,
    • workeradd工作线程是否添加
  • 创建worker,判断当前woker.thread != null

    • 获取全局所的赋值给mainLock,持有全局锁,会阻塞,线程池内部操作必须获取锁
    • 判断当前线程池状态,添加工作线程 workers.add(w),判断是否成功添加
    • 解锁,判断workerAdded,启动线程
    • 添加worker失败做清理工作,释放令牌,workers.remove
  • 失败的几种情况

    • 线程池状态rs > SHUTDOWN (STOP/TIDYING/TERMINATION)
    • rs == SHUTDOWN 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN且队列未空,但是firstTask不为null
    • 当前线程池已经达到指定指标(coprePoolSize 或者 maximumPoolSIze
    • threadFactory 创建的线程是null

runWorker(Worker w)

  • 调用 w.unlock ;这里为什么先调用unlock? 新建worker时候为-1,就是为了初始化worker state == 0 和 exclusiveOwnerThread ==null
  • 自旋:自旋条件,task != null ,任务队列中获取任务成功(会阻塞)

    • 为什么要设置独占锁呢?shutdown时会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作。
    • 根据线程池状态判断当前线程是否需要interrupt
    • task.run
    • task设置为空更新worker完成任务数量,释放锁
  • completedAbruptly = false; etTask()方法返回null时,说明当前线程应该执行退出逻辑了。
  • getTask()方法返回null时,processWorkerExit 说明当前线程应该执行退出逻辑了。

getTask()

  • 自旋:

    • 获取最新ctl赋值给c
    • 获取当前线程池运行状态赋值给 rs
    • 1.线程池是RUNNING状态
      2.线程池是SHUTDOWN状态 但是队列还未空,此时可以创建线程。
    • 获取线程池中工作线程数量赋值给wc
    • 判断线程是否需要被回收

      • cas ctl -1 成功返回null
      • cas 失败

        • 其他线程先你一步退出了
        • 线程池状态发生变化
    • 判断已哪种方式获取队列中任务,超时方式,还是非超时方式,runnable为空证明超时赋值timedOut

总结:什么情况下会返回null

  • rs >= STOP 成立说明:当前的状态最低也是STOP状态,一定要返回null了
  • 前置条件 状态是 SHUTDOWN ,workQueue.isEmpty()
  • 线程池中的线程数量 超过 最大限制时,会有一部分线程返回Null
  • 线程池中的线程数超过corePoolSize时,会有一部分线程 超时后,返回null。

processWorkerExit(Worker w, boolean completedAbruptly)

  • 是否异常退出 ctl -1
  • 获取全局锁的引用加锁
  • 将当前worker完成的任务数+= 全局completedTaskCount, workers.remove,并释放锁
  • 获取最新ctl,判断当前线程状态是 running,shutdown状态

    • 正常的退出的线程

      • 退出前判断任务队列是否为空,保证线程池中有工作线程
    • 异常退出线程必须创建一个空任务线程

shutdown()

  • 获取线程池全局锁
  • 设置线程池装填为shutdown
  • 中断空闲线程

    • 获取全局锁
    • 遍历workers,获取w.thread

      • 判断thread.isInterrupted && 获取到work的锁,说明当前worker处于空闲状态
      • 设置线程状态为,interrupt,释放锁
    • 释放全局锁

shutdownNow()

  • 强制设置线程池状态
  • 中断线程池中所有线程

    • 获取锁,遍历workers,调用thread.interruptStarted
    • 释放全局锁
  • 导出未处理的task