线程池

种类

newFixedThreadPool

  • 创建线程数量固定的线程池,线程池的corePoolSizemaximumPoolSize大小一样,并且keepAliveTime为0,传入的队列LinkedBlockingQueue为无界队列(maximumPoolSize参数不起作用)。
  • 执行长期的任务

newSingleThreadExecutor

  • corePoolSizemaximumPoolSize都是1,keepAliveTime是0L, 传入的队列是无界队列。线程池中永远只有一个线程在工作。

newCachedThreadPool

  • corePoolSize为0,maximumPoolSize为int最大值,线程池传入的队列是SynchronousQueue,同步队列,该队列没有任何容量,每次插入新数据,必须等待消费完成。当有新任务到达时,线程池没有线程则创建线程处理,处理完成后该线程缓存60秒,过期后回收,线程过期前有新任务到达时,则使用缓存的线程来处理。

newScheduledThreadPool

  • 创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行。

参数

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。

拒绝策略

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

execute( )

  1. 判断核心线程数量
  2. 判断工作队列是否满
  3. adWorker( ),失败则拒绝策略

addWorker

WorkerThreadPoolExecutor中的内部类,继承自AQS且实现了Runnable接口。是一个可执行的Runnable对象。在Worker的构造函数中,使用线程工厂创建了一个线程,当thread启动的时候,会以worker.run()为入口启动线程,这里会直接调用到runWorker()中。

submit( )

需要自己实现Callable接口的call()方法,submit() 提交任务到线程池,获取任务返回结果,返回结果是通过FutureTask来实现的。提交任务还是执行execute()方法,只是task被包装成了FutureTask ,也就是在excute()中启动线程后会执行FutureTask.run()方法。

流程

当调用submit()方法提交任务到线程池后,会先调用newTaskFor()方法将任务封装成一个FutureTask对象,然后调用execute()方法来执行任务。在execute()方法中会先启动Worker线程,当线程启动后,会调用线程的runWorker()方法。在runWorker()方法中最终会调用到task.run()方法,也就是FutureTask的run()方法。

在run()方法中,最终会调用callable属性的call()方法。当任务正常执行完后,会调用FutureTask的set()方法来更新任务的状态以及保存任务的返回值,最后唤醒获取任务结果的处于等待中的线程。如果出现异常,将会调用setException()方法来更新任务状态,保存异常,唤醒等待中的线程。

线程复用

while (task != null || (task = getTask()) != null) {
	task.run();//执行task中的run方法
}

getTask 函数不断从 workQueue 中得到任务,通过 task.run( ) 来执行。没有任务时阻塞。

线程在执行任务的时候,本质上是调用了任务自身的run/call方法。

有界队列

  1. 当提交的任务数超过了corePoolSize,会将当前的任务提交到一个block queue中
  2. 有界队列满了之后,如果 poolSize < maximumPoolsize,会尝试新建一个Thread进行处理
  3. 若失败则拒绝

无界队列

  1. 执行 execute 方法时,如果发现核心线程数已满,先执行 workQueue.offer(command) 来入列。
  2. 队列也满了后,才会去创建新的非核心线程 。(LinkedBlockingQueue如果用无参构造函数初始化,默认的容量是Integer.MAX_VALUE)

所以使用无界队列,会直接导致最大线程数失效