写在前面
多线程的软件设计,主要是为了最大程度上利用CPU的使用率最大化生产环境的吞吐量和性能。
如果对线程管理不当,非常容易造成系统崩溃,线程相对进程而言虽然轻量,无止境的使用更会造成内存泄漏,所以线程使用的 度 需要很好的把控好。
做过数据库链接工作的朋友们,对数据库连接池肯定不陌生,用连接池来维护一些激活的数据库链接,需要的时候从连接池取,不需要的时候交换给连接池而不是真正的销毁。
RoadMap
对于线程池的学习,主要分文2个篇幅,线程池的使用和线程池的实现。
使用篇:
开箱即用,JDK对线程池的支持。
创建线程池
线程池的使用
线程池的生命周期
Futre模式与Callable接口
实现篇:
扒一扒ThreadPoolExecutor
找个地方放任务请求:任务队列
超负荷了怎么办: reject handler
开箱即用,JDK对线程池的支持
在JDK5.0 引入 Current包之后 提供了对线程池的支持(Executor 框架). 看一下它家的族谱:
Executor 和ExecutorService 是接口,AbstractExecutorService 是抽象类,实现了公共方法, 它的子类分别指向了不同类型的Executor, 今天我们要讲的是这个 ThreadPoolExecutor,线程池。
Executors 是一个工厂类(别和Executor 接口搞混了哈, 虽然只差了 一个s),它是用来创建各式各样的Executor,其中包括了线程池, 定时调度的任务池等等。
创建线程池
用Executors 创建线程池 非常简单,调用对应的方法即可,传递的参数 是 线程数量 或者 线程创建工厂,需要自行实现创建线程的方法。
newCachedThreadPool() newCachedThreadPool(ThreadFactory factory) | 缓存线程 的线程池 | 当任务提交过来,如果没有空闲的线程,则创建新线程,来执行任务。注意: 如果提交任务的速度大于 完成任务的速度,那么会不断的有新线程创建,直到内存耗尽。 所以在使用这类线程池的时候要加倍注意。 |
newSingleThreadPool(ThreadFactory factory) | 单线程的线程池 | |
newFixedThreadPool(int nThreads)newFixedThreadPool(int nThreads, ThreadFactory factory) | 固定线程数量的线程池 | 当n =1 的时候 与SingleThreadPool 作用类似 |
线程池的使用
void shutdown() | 不再接受新的任务, 待现有的任务完成后关闭线程 |
List<Runnable> shutDownNow() | 尝试停止 正在执行的线程任务,并关闭线程池 返回的是提交给线程池,还没执行的线程组。 |
boolean isShutdown() | 判断线程池是否关闭 |
boolean isTerminated() | 判断线程池是否终止Shutdown 和 terminated 的区别下文【线程池的生命周期】提到,这与ExecutorService 的生命周期有关。 |
boolean awaitTermination(long timeout, TimeUnit unit) | 接收人timeout和TimeUnit两个参数,阻塞当前线程一段时间来检测线程池的终止状态。 如果终止返回 true。 以下三种情况会结束这种阻塞。 1. 超过设定的等待时间。 2. 等待的这段时间,线程池中的任务全部完成进入 终止状态。 3. 或者当前线程遇到interrupttion |
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); void execute(Runnable command); | 提交任务到当前Executor 的任务队列,等待调度完成。这几个方法的区别在下文【Future模式与Callable】会提到关与名词Future和新接口Callable |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 提交这个集合里面所有的任务,等集合任务所有任务完成 才算完成,这个timeout 是只整个集合的超时时间,而不是单个 |
<T> T invokeAny(Collection<? extends Callable<T>> tasks) <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) | 提交这个集合里面所有的任务,等集合任务任意一个任务完成 才算完成,这个timeout 是只整个的超时时间 |
线程池的生命周期
线程池,简单可以分为三个阶段, 运行阶段, 关闭阶段,终止阶段。
运行阶段没什么好说的, 主要是区分一下关闭阶段和终止阶段。
打个比方可能会好理解一点,把线程池比做超市,游客比做线程,超市一般是晚上10点停止营业,10点以后就是关闭状态,试下一下,如果10以后你还在超市里面逛,超市并不会把杀死对吗, 只是催着你赶紧去结账对吗, 另外超市晚上10点以后就不能进游客了。 当游客都走了,帐结算完 超市晚上才正式歇业(终止).
类比回线程池,那么线程池关闭,不能提交任务,会将正在处理(不包含队列中)的任务执行完。 线程全部完成之后 进入终止状态。
Future模式与Callable
Callable 是在java.util.concurrent 包中新接口,如果要实现线程, 实现Runnable 和Callable 接口都可以,
区别在于, runnable 没有返回值,不抛出异常,one way的方式,所以使用runnable的时候,异常基本上都是线程内部处理,不能够交给主线程来处理,
甚至有实现,这个线程挂了或有没有执行完都不知道。
Callable 摆脱了前面说的这种情况,它加了范性的返回值,同时允许抛出异常,这样对多线程的调度和处理更加灵活一些。
Future是JDK内置的并发设计模式中的Future模式,有兴趣的同学可以作为扩展阅读深入了解一下,这个经典模式。
这里简单的介绍一下:
一般情况,如果要拿到方法的返回值 是不是要等方法运行完,如果这个方法要运行很久,那岂不是要等很久。
那开子线程不是不用等了吗?非也,线程在计算完之前,可能这个结果是个null,对你后面的逻辑也是有影响的。
所以就引入了FutureData的概念,子线程计算的结果 对我来说是一个FutureData, 我只是需要知道 它计算完毕之后我去取一下数据就可以了。
或者你可以这么认为,这个Future对象是一个中介,它持有对线程计算结果的引用同时也有线程计算完成的状态,你之后问一下中间人,计算完了吗?
完了 那么把结果给我。
扒一扒ThreadPoolExecutor
用Executors创建线程池的时候 通过IDE点进那个方法进去 你会发现,哇塞 都是调用了同一个类的构造方法。That is ThreadPoolExecutor.
ThreadPoolExecutor 的构造方法是被重载的,总体概括起来,需要设定这么几个参数
corePoolSize : 指定线程池 常驻线程的数量,
maximumPoolSize: 指定线程池 最大的线程数量
keepAliveTime: 允许线程最大空闲时间, 如果当线程数大于corePoolSize的时候 会释放空闲的线程来节约内存。
TimeUnit: 空闲时间的单位
BlockingQueue<Runnable> 任务队列, 提交到线程池的任务就是放在这的。
ThreadFactory 线程创建工厂用来创建线程
RefectedExecutionHandler : 主要用于 当需要处理 任务队列满了之后 拒绝提交情况下的处理。
逻辑图:
找个地方放任务请求:任务队列
这里需要具体说说 这个任务队列,构造函数里面的声明的是,BlockingQueue, 这是个接口 根据功能不同可以运用好几种不同的队列。
- 直接提交队列:该功能由SynchronousQueue 实现, 是一个特殊的BlockingQueue,因为SynchronousQueue 没有容量,没插入一个操作都需要等待对应的删除操作,反之每一个删除操作都需要等待对应的插入,所以提交的任务是不被保存的,每次都提交给线程,如果没有线程则创建新的线程,如果到达线程上线则需要执行拒绝策略,通常来说要使用这种Queue 则需要设定很大的MaximumPoolSize,否则可能会拒绝掉很多请求。
- 有界任务队列: 该功能由ArrayBlockingQueue实现, 其内部是数组实现所以 初始化的时候需要定义容量,当线程数小于corePoolSize时,会优先创建线程处理队列的中的任务,如果线程数大于corePoolSize,则存放在队列,存满的时候,如果线程数没有超过maximunPoolSize,则创建线程,否则就需要执行决绝策略。
- 无界任务队列: 该功能由LinkedBlockingQueue实现, 因为是链表结构 所以可以实现无界的任务队列,当线程数小于corePoolSize时,会优先创建线程处理队列的中的任务,如果线程数等于corePoolSize,不会再创建新的线程,后面的任务会一直添加到任务队列中,因为没有界,没有必要准备拒绝策略来执行,要注意的是,如果添加任务的速度大于处理任务的速度,会出现无限扩容 直到内存耗尽的情况。
- 优先任务队列:该功能由PriorityBlockingQueue实现,它是一个特殊的无界队列,常规的队列都是先进先出,PriorityBlockingQueue可以处理优先级高的任务。
超负荷了怎么办: 决绝策略。
前文提到过很多次决绝策略, 这到底是什么鬼呢。 其实也很好理解 就是任务队列满了之后 需要处理下后续请求,说是说拒绝策略,也未必真的就把人家个拒绝了。
JDK内置了4种策略
- AbortPolicy: 中断策略, 该策略是抛出异常,中断系统运行
- CallerRunsPolicy: 不通过线程池的线程来执行任务,容易造成系统资源紧张。有点想
- DiscardOldestPolicy: 丢弃老请求策略: 如果执行程序尚未关闭,该策略是会把最老的请求放弃 ,即将要处理的请求,就是把队列头的任务放弃掉,新的任务入队列。
- DiscardPolicy:丢弃请求策略: 该策略是会放弃无法处理的请求,其实就是默默丢弃任务的做法。和AbortPolicy类似只是不抛异常
回看Executors建立的几个线程池:
Executors在建线程池的时候,就是在这个几个参数的上做选择,以实现不同类型的线程池
WorkQueue | corePoolSize | MaximumPoolSize | aliveTime | Reject Handler | |
singleThreadPool | LinkedBlockingQueue | 1 | 1 | 0 second | AbortPolicy |
FixedThreadPool | LinkedBlockingQueue | 定义的线程数 | 定义的线程数 | 0 second | AbortPolicy |
cachedThreadPool | SynchronousQueue | 0 | Integer.MAX_VALUE | 60 second | AbortPolicy |
JDK 默认的Policy 是AbortPolicy ,尽管没有传递这个参数,JDK 默认采用终止策略,
有意思的是,cachedThreadPool 用的是SynchronousQueue,即来了请求就开线程,线程空闲60秒就销毁。
欢迎关注微信公众号