線程的運行機制
1. 開啟線程過多,會消耗cpu2. 單核cpu,同一時刻只能處理一個線程,多核cpu同一時刻可以處理多個線程3. 操作系統為每個運行線程安排一定的CPU時間----`時間片`,系統通過一種循環的方式為線程提供時間片,線程在自己的時間內運行,因為時間相當短,多個線程頻繁地發生切換,因此給用戶的感覺就是好像多個線程同時運行一樣,但是如果計算機有多個CPU,線程就能真正意義上的同時運行了.
線程池的作用
1. 線程池是預先創建線程的一種技術。線程池在還沒有任務到來之前,創建一定數量的線程,放入空閒隊列中,然後對這些資源進行復用。`2. 減少頻繁的創建和銷毀對象。`3.頻繁創建和銷毀線程耗資源,耗時間4.因為有的線程執行時間比創建和銷毀一個線程的時間還長`
線程池涉及的類
* Executor:Java裡面線程池的頂級接口* ExecutorService:真正的線程池接口* ScheduledExecutorService:能和Timer/TimerTask類似,解決那些需要任務重復執行的問題* ThreadPoolExecutor(重點):ExecutorService的默認實現。* ScheduledThreadPoolExecutor:繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,周期性任務調度的類實現。
Executors
> Executors:jdk1.5之後的一個新類,提供了一些靜態工廠,生成一些常用的線程池,ThreadPoolExecutor是Executors類的底層實現
創建線程池有4種方法:
1.newSingleThreadExecutor
創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行>所有任務。如果這個唯一的線程因為異常結束,那麼會有一個新的線程來替代它。此線程池>保證所有任務的執行順序按照任務的提交順序執行。
2.newFixedThreadPool
創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那麼線程池會補充一個新線程。
3.newCachedThreadPool
創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,
那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。
4.newScheduledThreadPool
創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。
ThreadPoolExecutor介紹:
//構造方法
public ThreadPoolExecutor(int corePoolSize,//核心池的大小
int maximumPoolSize,//線程池最大線程數
long keepAliveTime,//保持時間
TimeUnit unit,//時間單位
BlockingQueue
workQueue,//任務隊列
ThreadFactory threadFactory,//線程工廠
RejectedExecutionHandler handler) //異常的捕捉器
構造相關參數解釋
* corePoolSize:`核心池的大小`,這個參數跟後面講述的線程池的實現原理有非常大的關系。在創建了線程池後,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
* maximumPoolSize:`線程池最大線程數`,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
* keepAliveTime:`表示線程沒有任務執行時最多保持多久時間會終止`。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
* unit:參數keepAliveTime的`時間單位`,有7種取值
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
* workQueue : `任務隊列`,是一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響
如果BlockingQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒,同樣,如果BlockingQueue是滿的,任何試圖往裡存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有空間時才會被喚醒繼續操作。
1. 基礎API介紹
* __往隊列中加元素的方法__
* add(E) : 非阻塞方法, 把元素加到BlockingQueue裡,如果BlockingQueue可以容納,則返回true,否則拋出異常。
* offer(E) : 非阻塞, 表示如果可能的話,將元素加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false。
* __put(E)__:阻塞方法, 把元素加到BlockingQueue裡,如果BlockingQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裡有空間再繼續。
* __從隊列中取元素的方法__
* __poll(time)__: 阻塞方法,取走BlockingQueue裡排在首位的元素,若不能立即取出,則可以等time參數規定的時間,取不到時返回null。
* __take()__:取走BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入為止。
2. 子類介紹
* `ArrayBlockingQueue(有界隊列)`: FIFO 隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小
* `LinkedBlockingQueue(無界隊列)`:FIFO 隊列,大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。
* `PriorityBlockingQueue`:優先級隊列, 類似於LinkedBlockingQueue,但隊列中元素非 FIFO, 依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序
* `SynchronousQueue(直接提交策略)`: 交替隊列,隊列中操作時必須是先放進去,接著取出來,交替著去處理元素的添加和移除
* threadFactory : `線程工廠`,如何去創建線程的
* handler : 任務隊列添加`異常的捕捉器`,參考 RejectedExecutionHandler
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
基礎API的介紹
* isShutdown() : 判斷線程池是否關閉
* isTerminated() : 判斷線程池中任務是否執行完成
* shutdown() : 調用後不再接收新任務,如果裡面有任務,就執行完
* shutdownNow() : 調用後不再接受新任務,如果有等待任務,移出隊列;有正在執行的,嘗試停止之
* submit() : 提交執行任務
* execute() : 執行任務
任務提交給線程池之後的處理策略
1. 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建執行這個任務;
2. 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中
1. 若添加成功,則該任務會等待空閒線程將其取出去執行;![](img/task2.png)
2. 若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
3. 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
4. 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。
任務提交給線程池之後的處理策略_比喻
>假如有一個工廠,工廠裡面有10(`corePoolSize`)個工人,每個工人同時只能做一件任務。
>因此只要當10個工人中有工人是空閒的,`來了任務就分配`給空閒的工人做;
>當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待(`任務隊列`);
>如果說新任務數目增長的速度遠遠大於工人做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時工人(`創建新線程`)進來;然後就將任務也分配給這4個臨時工人做;
>如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了(`拒絕執行`)。
>當這14個工人當中有人空閒時,而且空閒超過一定時間(`空閒時間`),新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的
----------------------------------------------------------------------------------------------------------------------------------------------------
最近發現幾起對ThreadPoolExecutor的誤用,發現都是因為沒有仔細看注釋和內部運轉機制,想當然的揣測參數導致,先看一下新建一個ThreadPoolExecutor的構建參數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
看這個參數很容易讓人以為是線程池裡保持corePoolSize個線程,如果不夠用,就加線程入池直至maximumPoolSize大小,如果還不夠就往workQueue裡加,如果workQueue也不夠就用RejectedExecutionHandler來做拒絕處理。
但實際情況不是這樣,具體流程如下:
1)當池子大小小於corePoolSize就新建線程,並處理請求
2)當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒線程就去從workQueue中取任務並處理
3)當workQueue放不下新入的任務時,新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理
4)另外,當池子的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀
內部結構如下所示:
從中可以發現ThreadPoolExecutor就是依靠BlockingQueue的阻塞機制來維持線程池,當池子裡的線程無事可干的時候就通過workQueue.take()阻塞住。
其實可以通過Executes來學學幾種特殊的ThreadPoolExecutor是如何構建的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
newFixedThreadPool就是一個固定大小的ThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
newCachedThreadPool比較適合沒有固定大小並且比較快速就能完成的小任務,沒必要維持一個Pool,這比直接new Thread來處理的好處是能在60秒內重用已創建的線程。
其他類型的ThreadPool看看構建參數再結合上面所說的特性就大致知道它的特性
線程池工具類:
ThreadPoolFactory 線程池工廠類
ThreadPoolProxy 線程池代理類
ThreadPoolUtils 線程池工具類
ThreadPoolFactory :
public class ThreadPoolFactory {
public static ThreadPoolProxy normalThreadPool;
public static final int NORMAL_COREPOOLSIZE = 5;
public static final int NORMAL_MAXIMUMPOOLSIZE = 5;
public static final long NORMAL_KEEPALIVETIME = 60;
public static ThreadPoolProxy getNormalThreadPool() {
//雙重檢測機制
if (normalThreadPool == null) {
synchronized (ThreadPoolFactory.class) {
if (normalThreadPool == null) {
normalThreadPool = new ThreadPoolProxy(NORMAL_COREPOOLSIZE,NORMAL_MAXIMUMPOOLSIZE,NORMAL_KEEPALIVETIME);}
}
}
return normalThreadPool;
}
}
ThreadPoolProxy:
public class ThreadPoolProxy {
ThreadPoolExecutor executor;
int corePoolSize;
int maximumPoolSize;
long keepAliveTime;
public ThreadPoolProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
}
public void initThreadPoolProxy() {
//雙重檢測機制
if (executor == null) {
synchronized (ThreadPoolExecutor.class) {
if (executor == null) {
BlockingQueue workQueue = new LinkedBlockingDeque<>();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, threadFactory, handler);
}
}
}
}
public void exector(Runnable runnable) {
initThreadPoolProxy();
executor.execute(runnable);
}
public void remove(Runnable runnable) {
initThreadPoolProxy();
executor.remove(runnable);
}
}
ThreadPoolUtils:
public class ThreadPoolUtils {
//在UI中執行
static Handler handler = new Handler();
public static void runTaskOnMainThread(Runnable runnable) {
handler.post(runnable);
}
//非UI執行
public static void runTaskOnThread(Runnable runnable) {
ThreadPoolFactory.getNormalThreadPool().exector(runnable);
}
}