編輯:關於android開發
——基於內核2.6.21RCU實現(lvyilong316)
RCU(Read-Copy Update),顧名思義就是讀-拷貝修改,它是基於其原理命名的。對於被RCU保護的共享數據結構,讀者不需要獲得任何鎖就可以訪問它,但寫者在訪問它時首先拷貝一個副本,然後對副本進行修改,最後使用一個回調(callback)機制在適當的時機把指向原來數據的指針重新指向新的被修改的數據。那麼這個“適當的時機”是怎麼確定的呢?這是由內核確定的,也是我們後面討論的重點。
RCU實際上是一種改進的rwlock,讀者幾乎沒有什麼同步開銷,它不需要鎖,不使用原子指令,而且在除alpha的所有架構上也不需要內存柵(Memory Barrier),因此不會導致鎖競爭,內存延遲以及流水線停滯。不需要鎖也使得使用更容易,因為死鎖問題就不需要考慮了。寫者的同步開銷比較大,它需要延遲數據結構的釋放,復制被修改的數據結構,它也必須使用某種鎖機制同步並行的其它寫者的修改操作。讀者必須提供一個信號給寫者以便寫者能夠確定數據可以被安全地釋放或修改的時機。有一個專門的垃圾收集器來探測讀者的信號,一旦所有的讀者都已經發送信號告知它們都不在使用被RCU保護的數據結構,垃圾收集器就調用回調函數完成最後的數據釋放或修改操作。 RCU與rwlock的不同之處是:它既允許多個讀者同時訪問被保護的數據,又允許多個讀者和多個寫者同時訪問被保護的數據(注意:是否可以有多個寫者並行訪問取決於寫者之間使用的同步機制),讀者沒有任何同步開銷,而寫者的同步開銷則取決於使用的寫者間同步機制。但RCU不能替代rwlock,因為如果寫比較多時,對讀者的性能提高不能彌補寫者導致的損失。
讀者在訪問被RCU保護的共享數據期間不能被阻塞,這是RCU機制得以實現的一個基本前提,也就說當讀者在引用被RCU保護的共享數據期間,讀者所在的CPU不能發生上下文切換,spinlock和rwlock都需要這樣的前提。寫者在訪問被RCU保護的共享數據時不需要和讀者競爭任何鎖,只有在有多於一個寫者的情況下需要獲得某種鎖以與其他寫者同步。寫者修改數據前首先拷貝一個被修改元素的副本,然後在副本上進行修改,修改完畢後它向垃圾回收器注冊一個回調函數以便在適當的時機執行真正的修改操作。等待適當時機的這一時期稱為grace period,而CPU發生了上下文切換稱為經歷一個quiescent state,grace period就是所有CPU都經歷一次quiescent state所需要的等待的時間。垃圾收集器就是在grace period之後調用寫者注冊的回調函數來完成真正的數據修改或數據釋放操作的。
要想使用好RCU,就要知道RCU的實現原理。我們拿linux 2.6.21 kernel的實現開始分析,為什麼選擇這個版本的實現呢?因為這個版本的實現相對較為單純,也比較簡單。當然之後內核做了不少改進,如搶占RCU、可睡眠RCU、分層RCU。但是基本思想都是類似的。所以先從簡單入手。
首先,上一節我們提到,寫者在訪問它時首先拷貝一個副本,然後對副本進行修改,最後使用一個回調(callback)機制在適當的時機把指向原來數據的指針重新指向新的被修改的數據。而這個“適當的時機”就是所有CPU經歷了一次進程切換(也就是一個grace period)。為什麼這麼設計?因為RCU讀者的實現就是關搶占執行讀取,讀完了當然就可以進程切換了,也就等於是寫者可以操作臨界區了。那麼就自然可以想到,內核會設計兩個元素,來分別表示寫者被掛起的起始點,以及每cpu變量,來表示該cpu是否經過了一次進程切換(quies state)。就是說,當寫者被掛起後,
1)重置每cpu變量,值為0。
2)當某個cpu經歷一次進程切換後,就將自己的變量設為1。
3)當所有的cpu變量都為1後,就可以喚醒寫者了。
下面我們來分別看linux裡是如何完成這三步的。
我們從一個例子入手,這個例子來源於linux kernel文檔中的whatisRCU.txt。這個例子使用RCU的核心API來保護一個指向動態分配內存的全局指針。
struct foo {
int a;
char b;
long c;
};
DEFINE_SPINLOCK(foo_mutex);
struct foo *gbl_foo;
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
synchronize_rcu();
kfree(old_fp);
}
int foo_get_a(void)
{
int retval;
rcu_read_lock();
retval = rcu_dereference(gbl_foo)->a;
rcu_read_unlock();
return retval;
}
如上代碼所示,RCU被用來保護全局指針struct foo *gbl_foo。foo_get_a()用來從RCU保護的結構中取得gbl_foo的值。而foo_update_a()用來更新被RCU保護的gbl_foo的值(更新其a成員)。
首先,我們思考一下,為什麼要在foo_update_a()中使用自旋鎖foo_mutex呢?假設中間沒有使用自旋鎖.那foo_update_a()的代碼如下:
void foo_update_a(int new_a)
{
struct foo *new_fp;
struct foo *old_fp;
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
old_fp = gbl_foo;
1:-------------------------
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
synchronize_rcu();
kfree(old_fp);
}
假設A進程在上圖----標識處被B進程搶點.B進程也執行了goo_ipdate_a().等B執行完後,再切換回A進程.此時,A進程所持的old_fd實際上已經被B進程給釋放掉了.此後A進程對old_fd的操作都是非法的。所以在此我們得到一個重要結論:RCU允許多個讀者同時訪問被保護的數據,也允許多個讀者在有寫者時訪問被保護的數據(但是注意:是否可以有多個寫者並行訪問取決於寫者之間使用的同步機制)。
說明:本文中說的進程不是用戶態的進程,而是內核的調用路徑,也可能是內核線程或軟中斷等。
另外,我們在上面也看到了幾個有關RCU的核心API。它們為別是:
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()
其中,rcu_read_lock()和rcu_read_unlock()用來保持一個讀者的RCU臨界區.在該臨界區內不允許發生上下文切換。為什麼不能發生切換呢?因為內核要根據“是否發生過切換”來判斷讀者是否已結束讀操作,我們後面再分析。
rcu_dereference():讀者調用它來獲得一個被RCU保護的指針。
rcu_assign_pointer():寫者使用該函數來為被RCU保護的指針分配一個新的值。
synchronize_rcu():這是RCU的核心所在,它掛起寫者,等待讀者都退出後釋放老的數據。
lrcu_read_lock()和rcu_read_unlock()
rcu_read_lock()和rcu_read_unlock()的實現如下:
#define rcu_read_lock() __rcu_read_lock()
#define rcu_read_unlock() __rcu_read_unlock()
#define __rcu_read_lock()
do {
preempt_disable();
__acquire(RCU);
rcu_read_acquire();
} while (0)
#define __rcu_read_unlock()
do {
rcu_read_release();
__release(RCU);
preempt_enable();
} while (0)
其中__acquire(),rcu_read_acquire(),rcu_read_release(),__release()都是一些選擇編譯函數,可以忽略不可看。因此可以得知:rcu_read_lock(),rcu_read_unlock()只是禁止和啟用搶占.因為在讀者臨界區,不允許發生上下文切換。
lrcu_dereference()和rcu_assign_pointer()
rcu_dereference()和rcu_assign_pointer()的實現如下:
#define rcu_dereference(p) ({
typeof(p) _________p1 = ACCESS_ONCE(p);
smp_read_barrier_depends();
(_________p1);
})
#define rcu_assign_pointer(p, v)
({
if (!__builtin_constant_p(v) ||((v) != NULL))
smp_wmb();
(p) = (v);
})
它們的實現也很簡單.因為它們本身都是原子操作。只是為了cache一致性,插上了內存屏障。可以讓其它的讀者/寫者可以看到保護指針的最新值.
lsynchronize_rcu()
synchronize_rcu()在RCU中是一個最核心的函數,它用來等待之前的讀者全部退出.我們後面的大部份分析也是圍繞著它而進行.實現如下:
void synchronize_rcu(void)
{
struct rcu_synchronize rcu;
init_completion(&rcu.completion);
/* Will wake me after RCU finished */
call_rcu(&rcu.head, wakeme_after_rcu);
/* Wait for it */
wait_for_completion(&rcu.completion);
}
我們可以看到,它初始化了一個本地變量,它的類型為struct rcu_synchronize.調用call_rcu()之後.一直等待條件變量rcu.competion的滿足。
在這裡看到了RCU的另一個核心API,它就是call_run()。它的定義如下:
void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *rcu))
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = &__get_cpu_var(rcu_data);
*rdp->nxttail = head;
rdp->nxttail = &head->next;
if (unlikely(++rdp->qlen > qhimark)) {
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_ctrlblk);
}
local_irq_restore(flags);
}
該函數也很簡單,就是將參數傳入的回調函數fun賦值給一個struct rcu_head變量,再將這個struct rcu_head加在了per_cpu變量rcu_data的nxttail 鏈表上。
rcu_data定義如下,是個每cpu變量:
DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
接著我們看下call_rcu注冊的函數,我們也可以看到,在synchronize_rcu()中,傳入call_rcu的函數為wakeme_after_rcu(),其實現如下:
static void wakeme_after_rcu(struct rcu_head *head)
{
struct rcu_synchronize *rcu;
rcu = container_of(head, struct rcu_synchronize, head);
complete(&rcu->completion);
}
我們可以看到,該函數將條件變量置真,然後喚醒了在條件變量上等待的進程。
由此,我們可以得知,每一個CPU都有一個rcu_data.每個調用call_rcu()/synchronize_rcu()進程的進程都會將一個rcu_head都會掛到rcu_data的nxttail鏈表上(這個rcu_head其實就相當於這個進程在RCU機制中的體現),然後掛起。當讀者都完成讀操作後(經過一個grace period後)就會觸發這個rcu_head上的回調函數來喚醒寫者。整個過程如下圖所示:
看到這裡,也就到了問題的關鍵,內核是如何判斷當前讀者都已經完成讀操作了呢(經過了一個grace period)?又是由誰來觸發這個回調函數wakeme_after_rcu呢?下一小節再來分析。
那究竟怎麼去判斷當前的讀者已經操作完了呢?我們在之前看到,不是讀者在調用rcu_read_lock()的時候要禁止搶占麼?因此,我們只需要判斷所有的CPU都進過了一次上下文切換,就說明所有讀者已經退出了。要徹底弄清楚這個問題,我們得從RCU的初始化說起。
RCU的初始化開始於start_kernel()-->rcu_init()。而其主要是對每個cpu調用了rcu_online_cpu函數。
lrcu_online_cpu
static void __cpuinit rcu_online_cpu(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}
這個函數主要完成兩個操作:初始化兩個per cpu變量;注冊RCU_SOFTIRQ軟中斷處理函數rcu_process_callbacks。我們從這裡又看到了另一個percpu變量:rcu_bh_data.有關bh的部份之後再來分析.在這裡略過這些部分。下面看下rcu_init_percpu_data()的實現。
lrcu_init_percpu_data
static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,struct rcu_data *rdp)
{
memset(rdp, 0, sizeof(*rdp));
rdp->curtail = &rdp->curlist;
rdp->nxttail = &rdp->nxtlist;
rdp->donetail = &rdp->donelist;
rdp->quiescbatch = rcp->completed;
rdp->qs_pending = 0;
rdp->cpu = cpu;
rdp->blimit = blimit;
}
調用這個函數的第二個參數是一個全局變量rcu_ctlblk。在繼續向下分析之前我們先看下這些函數用到的一些重要數據結構。
說明:下列數據結構只列出了主要成員。
structrcu_ctrlblk{
longcur;
longcompleted;
cpumask_tcpumask;
};
structrcu_ctrlblk的主要作用就是定義上節提到的全局變量rcu_ctlblk,這個變量在系統中是唯一的。另外說明一下,為了記錄方便,內核將從啟動開始的每個grace period對應一個數字表示。
這裡的cpumask是為了標識當前系統中的所有cpu,以便標記哪些cpu發生過上下文切換(經歷過一個quiescent state)。而cur,completed,則用來同步。我們可以這樣理解,cur和completed是系統級別的記錄信息,也即系統實時經歷的grace編號,一般情況下,新開一個graceperiod等待周期的話,cur會加1,當graceperiod結束後,會將completed置為cur,所以通常情況下,都是completed追著cur跑。那麼我們可能會猜測,是不是如果complete= curr -1 的時候,就表示系統中graceperiod還沒有結束?當completed= curr的時候,就表示系統中不存在graceperiod等待周期了?我們姑且這麼理解,實際上有些許差別,但設計思想都是一樣的。
structrcu_data{
longquiescbatch;
intpassed_quiesc;
longbatch;
structrcu_head*nxtlist;
structrcu_head**nxttail;
structrcu_head*curlist;
structrcu_head**curtail;
structrcu_head*donelist;
structrcu_head**donetail;
};
上面的結構,要達到的作用是,跟蹤單個CPU上的RCU事務。
(1)passed_quiesc:這是一個flag標志,表示當前cpu是否已經發生過搶占了(經歷過一個quiescent state),0表示為未發送過切換,1表示發生過切換;
說明:我們一直強調發生過一次cpu搶占就是經歷過一個quiescent state,其實這是不嚴格的說法。為什麼呢?因為自系統啟動,各種進程頻繁調度,肯定每個cpu都會發生過搶占。所以我們這裡說的“發生過搶占”是指從某個特殊的時間點開始發生過搶占。那麼這個特殊的時間點是什麼時候呢?就是我們調用synchronize_rcu將rcu_head掛在每cpu變量上並掛起進程時,我們後面分析就會證實這一點。
(2) batch:表示一個graceperiodid,表示本次被阻塞的寫者,需要在哪個graceperiod之後被激活;
(3) quiescbatch:表示一個graceperiodid,用來比較當前cpu是否處於等待進程切換完成。
下面我們介紹剩下的三組指針,但首先注意這三組鏈表上的元素都是structrcu_head類型。這三個鏈表的作用還要結合整個RCU寫者從阻塞到喚醒的過程來看。
rcu寫者的整體流程,假設系統中出現rcu寫者阻塞,那麼流程如下:
(1) 調用synchronize_rcu,將代表寫者的rcu_head添加到CPU[n]每cpu變量rcu_data的nxtlist,這個鏈表代表有需要提交給rcu處理的回調(但還沒有提交);
(2) CPU[n]每次時鐘中斷時檢測自己的nxtlist是否為null,若不為空,因此則喚醒rcu軟中斷;
(3) RCU的軟中斷處理函數rcu_process_callbacks會挨個檢查本CPU的三個鏈表。
下面分析RCU的核心處理函數,也就是軟中斷處理函數rcu_process_callbacks。
在“RCU API實現分析”一節我們知道調用synchronize_rcu,將代表寫者的rcu_head添加到了CPU[n]每cpu變量rcu_data的nxtlist。
而另一方面,在每次時鐘中斷中,都會調用update_process_times函數。
lupdate_process_times
void update_process_times(int user_tick)
{
//......
if (rcu_pending(cpu))
rcu_check_callbacks(cpu, user_tick);
//......
}
在每一次時鐘中斷,都會檢查rcu_pending(cpu)是否為真,如果是,就會為其調用rcu_check_callbacks()。
lrcu_pending
rcu_pending()的代碼如下:
int rcu_pending(int cpu)
{
return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
__rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
}
同上面一樣,忽略bh的部份,我們看__rcu_pending。
l__rcu_pending
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
/* This cpu has pending rcu entries and the grace period
* for them has completed.
*/
if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
return 1;
/* This cpu has no pending entries, but there are new entries */
if (!rdp->curlist && rdp->nxtlist)
return 1;
/* This cpu has finished callbacks to invoke */
if (rdp->donelist)
return 1;
/* The rcu core waits for a quiescent state from the cpu */
if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
return 1;
/* nothing to do */
return 0;
}
可以上面有四種情況會返回1,分別對應:
A.該CPU上有等待處理的回調函數,且已經經過了一個batch(grace period).rdp->datch表示rdp在等待的batch序號;
B.上一個等待已經處理完了,又有了新注冊的回調函數;
C.等待已經完成,但尚末調用該次等待的回調函數;
D.在等待quiescent state.
如果rcu_pending返回1,就會進入到rcu_check_callbacks(),rcu_check_callbacks()中主要工作就是調用raise_softirq(RCU_SOFTIRQ),觸發RCU軟中斷。而RCU軟中斷的處理函數為rcu_process_callbacks,其中分別針對每cpu變量rcu_bh_data和rcu_data調用__rcu_process_callbacks。我們主要分析針對rcu_data的調用。
l__rcu_process_callbacks
1) 先看nxtlist裡有沒有待處理的回調(rcu_head),如果有的話,說明有寫者待處理,那麼還要分兩種情況:
1.1)如果系統是第一次出現寫者阻塞,也即之前的寫者都已經處理完畢,那麼此時curlist鏈表一定為空(curlist專門存放已被rcu檢測到的寫者請求),於是就把nxtlist裡的所有成員都移動到curlist指向,並把當前CPU需要等待的graceperiodid:rdp->batch設置為當前系統處理的graceperiod的下一個grace周期,即rcp->cur+ 1。由於這算是一個新的graceperiod,即start_rcu_batch,所以還接著需要增加系統的graceperiod計數,即rcp->cur++,同時,將全局的cpusmask設置為全f,代表新的graceperiod開始,需要檢測所有的cpu是否都經過了一次進程切換。代碼如下:
void__rcu_process_callbacks(structrcu_ctrlblk*rcp,
structrcu_data*rdp)
{
if(rdp->nxtlist&&!rdp->curlist){
move_local_cpu_nxtlist_to_curlist();
rdp->batch=rcp->cur+1;
if(!rcp->next_pending){
rcp->next_pending=1;
rcp->cur++;
cpus_andnot(rcp->cpumask,cpu_online_map,nohz_cpu_mask);
}
}
}
接著跳轉至1.2。
1.2) 如果系統之前已經有寫者在被rcu監控著,但還沒來得及經過一個graceperiod,這個時候curlist不為空,nxtlist也不為空,寫者會被加入nxtlist中。由於curlist不為空,說明上個rcu周期的寫者還沒有處理完,於是不會將本次阻塞的寫者加入curlist,一直到上次的curlist裡的rcu_head被處理完(都移動到了donelist),才會將後來的寫者納入RCU考慮(移動到curlist)(假如這個期間又來了多個寫者,則多個寫者的rcu_head共享下一個graceperiod,也就是下一個graceperiod結束後這多個寫者都會被喚醒)。進入下一步。
2) rcu_process_callbacks調用每CPU函數rcu_check_quiescent_state開始監控,檢測所有的CPU是否會經歷一個進程切換。這個函數是如何得知需要開始監控的? 答案在於quiescbatch與全局的rcp->cur比較。 初始化時rdp->quiescbatch =rcp->completed = rcp->cur。 由於1.1有新graceperiod開啟,所以rcp->cur已經加1了,於是rdp->quiescbatch和rcp->curr不等,進而將此cpu的rdp->passed_quiesc設為0,表示這個周期開始,我要等待這個cpu經歷一個進程切換,等待該CPU將passed_quiesc置為1。即與前面講到的passed_quiesc標志置0的時機吻合。最後將rdp->quiescbatch置為 rcp->cur,以避免下次再進入軟中斷裡將passed_quiesc重復置0。
voidrcu_check_quiescent_state(structrcu_ctrlblk*rcp,
structrcu_data*rdp)
{
if(rdp->quiescbatch!=rcp->cur){
/*startnewgraceperiod:*/
rdp->qs_pending=1;
rdp->passed_quiesc=0;
rdp->quiescbatch=rcp->cur;
return;
}
}
3) 本次軟中斷結束,下次軟中斷到來,再次進入rcu_check_quiescent_state進行檢測,如果本CPU的rdp->passed_quiesc已經置1,則需要cpu_quiet將本CPU標志位從全局的rcp->cpumask中清除,如果cpumask為0了,則說明自上次RCU寫者被掛起以來,所有CPU都已經歷了一次進程切換,於是本次rcu等待周期結束,將rcp->completed置為rcp->cur,重置cpumask為全f,並嘗試重新開啟一個新的grace period。我們可以看到RCU用了如此多的同步標志,卻少用spinlock鎖,是多麼巧妙的設計,不過這也提高了理解的難度。
voidrcu_check_quiescent_state(structrcu_ctrlblk*rcp,
structrcu_data*rdp)
{
if(rdp->quiescbatch!=rcp->cur){
/*startnewgraceperiod:*/
rdp->qs_pending=1;
rdp->passed_quiesc=0;
rdp->quiescbatch=rcp->cur;
return;
}
if(!rdp->passed_quiesc)
return;
/*thiscpuhaspassedaquiesstate*/
if(likely(rdp->quiescbatch==rcp->cur)){
cpu_clear(cpu,rcp->cpumask);
if(cpus_empty(rcp->cpumask))
rcp->completed=rcp->cur;
}
}
4)下次再進入rcu軟中斷__rcu_process_callbacks,發現rdp->batch已經比rcp->completed小了(因為上一步驟中,後者增大了),則將rdp->curlist上的回調移動到rdp->donelist裡,接著還會再次進入rcu_check_quiescent_state,但是由於當前CPU的rdp->qs_pending已經為1了,所以不再往下清除cpu掩碼。__rcu_process_callbacks
代碼變成了:
void__rcu_process_callbacks(structrcu_ctrlblk*rcp,
structrcu_data*rdp)
{
if(rdp->curlist&&!rcu_batch_before(rcp->completed,rdp->batch)){
*rdp->donetail=rdp->curlist;
rdp->donetail=rdp->curtail;
rdp->curlist=NULL;
rdp->curtail=&rdp->curlist;
}
if(rdp->nxtlist&&!rdp->curlist){
move_local_cpu_nxtlist_to_curlist();
rdp->batch=rcp->cur+1;
if(!rcp->next_pending){
rcp->next_pending=1;
rcp->cur++;
cpus_andnot(rcp->cpumask,cpu_online_map,nohz_cpu_mask);
}
}
if(rdp->donelist)
rcu_do_batch(rdp);
}
5)經過千山萬水終於來到rcu_do_batch(如果rdp->donelist有的話)在此函數裡,執行RCU寫者掛載的回調,即wakeme_after_rcu。
lrcu_do_batch
static void rcu_do_batch(struct rcu_data *rdp)
{
struct rcu_head *next, *list;
int count = 0;
list = rdp->donelist;
while (list) {
next = list->next;
prefetch(next);
list->func(list);
list = next;
if (++count >= rdp->blimit)
break;
}
rdp->donelist = list;
local_irq_disable();
rdp->qlen -= count;
local_irq_enable();
if (rdp->blimit == INT_MAX && rdp->qlen)
blimit = blimit;
if (!rdp->donelist)
rdp->donetail = &rdp->donelist;
else
raise_rcu_softirq();
}
它遍歷處理掛在鏈表上的回調函數.在這裡,注意每次調用的回調函數有最大值限制.這樣做主要是防止一次調用過多的回調函數而產生不必要系統負載.如果donelist中還有沒處理完的數據,打開RCU軟中斷,在下次軟中斷到來的時候接著處理.
注意:
僅當系統檢測到一個grace period的所有CPU都經歷了進程切換後,才會給系統一個信息要求啟動新batch,在此期間的所有寫者請求,都暫存在本地CPU的nxtlist鏈表裡。
在每一次進程切換的時候,都會調用rcu_qsctr_inc().如下代碼片段如示:
asmlinkage void __sched schedule(void)
{
//......
rcu_qsctr_inc(cpu);
//......
}
rcu_qsctr_inc()代碼如下:
static inline void rcu_qsctr_inc(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
rdp->passed_quiesc = 1;
}
該函數將對應CPU上的rcu_data的passed_quiesc成員設為了1。或許你已經發現了,這個過程就標識該CPU經過了一次quiescent state,和之前在軟中斷的初始化為0相呼應。
1.如果CPU 1上有進程調用rcu_read_lock進入臨界區,之後退出來,發生了進程切換,新進程又通過rcu_read_lock進入臨界區.由於RCU軟中斷中只判斷一次上下文切換,因此,在調用回調函數的時候,仍然有進程處於RCU的讀臨界區,這樣會不會有問題呢?
這樣是不會有問題的.還是上面的例子:
spin_lock(&foo_mutex);
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a;
rcu_assign_pointer(gbl_foo, new_fp);
spin_unlock(&foo_mutex);
synchronize_rcu();
kfree(old_fp);
使用synchronize_rcu ()只是為了等待持有old_fd(也就是調用rcu_assign_pointer ()更新之前的gbl_foo)的進程退出.而不需要等待所有的讀者全部退出.這是因為,在rcu_assign_pointer ()之後的讀取取得的保護指針,已經是更新好的新值了.
2. 如果一個CPU連續調用synchronize_rcu()或者call_rcu()它們會有什麼影響呢?
如果當前有請求在等待,就會新請提交的回調函數掛到taillist上,一直到前一個等待完成,再將taillist的數據移到curlist,並開啟一個新的等待,因此,也就是說,在前一個等待期間提交的請求,都會放到一起處理.也就是說,他們會共同等待所有CPU切換完成.
舉例說明如下:
在上面的代碼分析的時候,經常看到帶有bh的RCU代碼.現在來看一下這些帶bh的RCU是什麼樣的。
#define rcu_read_lock_bh() __rcu_read_lock_bh()
#define rcu_read_unlock_bh() __rcu_read_unlock_bh()
#define __rcu_read_lock_bh()
do {
local_bh_disable();
__acquire(RCU_BH);
rcu_read_acquire();
}while (0)
#define __rcu_read_unlock_bh()
do {
rcu_read_release();
__release(RCU_BH);
local_bh_enable();
} while (0)
根據上面的分析:bh RCU跟普通的RCU相比不同的是,普通RCU是禁止內核搶占,而bh RCU是禁止下半部.
其實,帶bh的RCU一般在軟中斷使用,不過計算quiescent state並不是發生一次上下文切換。而是發生一次softirq.我們在後面的分析中可得到印證.
lcall_rcu_bh()
void call_rcu_bh(struct rcu_head *head,void (*func)(struct rcu_head *rcu))
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags);
rdp = &__get_cpu_var(rcu_bh_data);
*rdp->nxttail = head;
rdp->nxttail = &head->next;
if (unlikely(++rdp->qlen > qhimark)) {
rdp->blimit = INT_MAX;
force_quiescent_state(rdp, &rcu_bh_ctrlblk);
}
local_irq_restore(flags);
}
它跟call_rcu()不相同的是,rcu是取per_cpu變量rcu__data和全局變量rcu_ctrlblk.而bh RCU是取rcu_bh_data,rcu_bh_ctrlblk.他們的類型都是一樣的,這樣做只是為了區分BH和普通RCU的等待.
對於rcu_bh_qsctr_inc
static inline void rcu_bh_qsctr_inc(int cpu)
{
struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
rdp->passed_quiesc = 1;
}
它跟rcu_qsctr_inc()機同,也是更改對應成員.所不同的是,調用rcu_bh_qsctr_inc()的地方發生了變化.
asmlinkage void __do_softirq(void)
{
//......
do {
if (pending & 1) {
h->action(h);
rcu_bh_qsctr_inc(cpu);
}
h++;
pending >>= 1;
} while (pending);
//......
}
也就是說,在發生軟中斷的時候,才會認為是經過了一次quiescent state.
為了操作鏈表,在include/linux/rculist.h有一套專門的RCU API。如:list_entry_rcu、list_add_rcu、list_del_rcu、list_for_each_entry_rcu等。即對所有kernel 的list的操作都有一個對應的RCU操作。那麼這些操作和原始的list操作有哪些不同呢?我們先對比幾個看下。
llist_entry_rcu
#define list_entry_rcu(ptr, type, member) \
container_of(rcu_dereference(ptr), type, member)
#define list_entry(ptr, type, member) \
container_of(ptr, type, member)
l__list_for_each_rcu
#define __list_for_each_rcu(pos, head) \
for (pos =rcu_dereference((head)->next); \
pos != (head); \
pos = rcu_dereference(pos->next))
#define __list_for_each(pos, head) \
for (pos =(head)->next; pos != (head); pos = pos->next)
從__list_for_each_rcu和list_entry_rcu的實現可以看出,其將指針的獲取替換為使用rcu_dereference。
llist_replace_rcu
static inline void list_replace_rcu(struct list_head *old,
struct list_head *new)
{
new->next = old->next;
new->prev = old->prev;
rcu_assign_pointer(new->prev->next, new);
new->next->prev = new;
old->prev = LIST_POISON2;
}
static inline void list_replace(struct list_head *old,
struct list_head *new)
{
new->next = old->next;
new->next->prev = new;
new->prev = old->prev;
new->prev->next = new;
}
從list_replace_rcu的實現可以看出,RCU API的實現將指針的賦值替換為rcu_assign_pointer。
llist_del_rcu
static inline void list_del_rcu(struct list_head *entry)
{
__list_del(entry->prev, entry->next);
entry->prev = LIST_POISON2;
}
static inline void list_del(struct list_head *entry)
{
__list_del(entry->prev, entry->next);
entry->next = LIST_POISON1;
entry->prev = LIST_POISON2;
}
從list_del_rcu的實現,可以看出RCU API的實現沒有將刪除項的next指針置為無效。這樣實現是為了防止刪除節點時,讀者還在遍歷該節點。
下面看下RCU list API的幾個應用示例。
在這種應用情況下,絕大部分是對鏈表的遍歷,即讀操作,而很少出現的寫操作只有增加或刪除鏈表項,並沒有對鏈表項的修改操作,這種情況使用RCU非常容易,從rwlock轉換成RCU非常自然。路由表的維護就是這種情況的典型應用,對路由表的操作,絕大部分是路由表查詢,而對路由表的寫操作也僅僅是增加或刪除,因此使用RCU替換原來的rwlock順理成章。系統調用審計也是這樣的情況。
這是一段使用rwlock的系統調用審計部分的讀端代碼:
static enum audit_state audit_filter_task(struct task_struct *tsk)
{
struct audit_entry *e;
enum audit_state state;
read_lock(&auditsc_lock);
/* Note: audit_netlink_sem held by caller. */
list_for_each_entry(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
read_unlock(&auditsc_lock);
return state;
}
}
read_unlock(&auditsc_lock);
return AUDIT_BUILD_CONTEXT;
}
使用RCU後將變成:
static enum audit_state audit_filter_task(struct task_struct *tsk)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
/* Note: audit_netlink_sem held by caller. */
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
rcu_read_unlock();
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
這種轉換非常直接,使用rcu_read_lock和rcu_read_unlock分別替換read_lock和read_unlock,鏈表遍歷函數使用_rcu版本替換就可以了。
使用rwlock的寫端代碼:
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
write_lock(&auditsc_lock);
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del(&e->list);
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
write_lock(&auditsc_lock);
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add(&entry->list, list);
} else {
list_add_tail(&entry->list, list);
}
write_unlock(&auditsc_lock);
return 0;
}
使用RCU後寫端代碼變成為:
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* Do not use the _rcu iterator here, since this is the only
* deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del_rcu(&e->list);
call_rcu(&e->rcu, audit_free_rule, e);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add_rcu(&entry->list, list);
} else {
list_add_tail_rcu(&entry->list, list);
}
return 0;
}
對於鏈表刪除操作,list_del替換為list_del_rcu和call_rcu,這是因為被刪除的鏈表項可能還在被別的讀者引用,所以不能立即刪除,必須等到所有讀者經歷一個quiescent state才可以刪除。另外,list_for_each_entry並沒有被替換為list_for_each_entry_rcu,這是因為,只有一個寫者在做鏈表刪除操作,因此沒有必要使用_rcu版本。
通常情況下,write_lock和write_unlock應當分別替換成spin_lock和spin_unlock,但是對於只是對鏈表進行增加和刪除操作而且只有一個寫者的寫端,在使用了_rcu版本的鏈表操作API後,rwlock可以完全消除,不需要spinlock來同步讀者的訪問。對於上面的例子,由於已經有audit_netlink_sem被調用者保持,所以spinlock就沒有必要了。
這種情況允許修改結果延後一定時間才可見,而且寫者對鏈表僅僅做增加和刪除操作,所以轉換成使用RCU非常容易。
如果寫者需要對鏈表條目進行修改,那麼就需要首先拷貝要修改的條目,然後修改條目的拷貝,等修改完畢後,再使用條目拷貝取代要修改的條目,要修改條目將被在經歷一個grace period後安全刪除。
對於系統調用審計代碼,並沒有這種情況。這裡假設有修改的情況,那麼使用rwlock的修改代碼應當如下:
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_newentry *ne;
write_lock(&auditsc_lock);
/* Note: audit_netlink_sem held by caller. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
e->rule.action = newaction;
e->rule.file_count = newfield_count;
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
如果使用RCU,修改代碼應當為;
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_newentry *ne;
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
if (ne == NULL)
return -ENOMEM;
audit_copy_rule(&ne->rule, &e->rule);
ne->rule.action = newaction;
ne->rule.file_count = newfield_count;
list_replace_rcu(e, ne);
call_rcu(&e->rcu, audit_free_rule, e);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
前面兩種情況,讀者能夠容忍修改可以在一段時間後看到,也就說讀者在修改後某一時間段內,仍然看到的是原來的數據。在很多情況下,讀者不能容忍看到舊的數據,這種情況下,需要使用一些新措施,如System V IPC,它在每一個鏈表條目中增加了一個deleted字段,標記該字段是否刪除,如果刪除了,就設置為真,否則設置為假,當代碼在遍歷鏈表時,核對每一個條目的deleted字段,如果為真,就認為它是不存在的。
還是以系統調用審計代碼為例,如果它不能容忍舊數據,那麼,讀端代碼應該修改為:
static enum audit_state audit_filter_task(struct task_struct *tsk)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
spin_lock(&e->lock);
if (e->deleted) {
spin_unlock(&e->lock);
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
rcu_read_unlock();
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
注意,對於這種情況,每一個鏈表條目都需要一個spinlock保護,因為刪除操作將修改條目的deleted標志。此外,該函數如果搜索到條目,返回時應當保持該條目的鎖。
寫端的刪除操作將變成:
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* Do not use the _rcu iterator here, since this is the only
* deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
spin_lock(&e->lock);
list_del_rcu(&e->list);
e->deleted = 1;
spin_unlock(&e->lock);
call_rcu(&e->rcu, audit_free_rule, e);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
刪除條目時,需要標記該條目為已刪除。這樣讀者就可以通過該標志立即得知條目是否已經刪除。
RCU是2.6內核引入的新的鎖機制,在絕大部分為讀而只有極少部分為寫的情況下,它是非常高效的,因此在路由表維護、系統調用審計、SELinux的AVC、dcache和IPC等代碼部分中,使用它來取代rwlock來獲得更高的性能。但是,它也有缺點,延後的刪除或釋放將占用一些內存,尤其是對嵌入式系統,這可能是非常昂貴的內存開銷。此外,寫者的開銷比較大,尤其是對於那些無法容忍舊數據的情況以及不只一個寫者的情況,寫者需要spinlock或其他的鎖機制來與其他寫者同步。
類似桌面背景壁紙隨手指滑動--第三方開源--BackgroundViewPager,viewpager滑動開源 Android BackgroundView
100多個Styles快速開發布局XML,一行搞定View屬性,一鍵統一配置UI...,stylesui.. Android開發中大量使用XML代碼作為界面的
AIDL使用中報錯找不到自定義數據類型的解決辦法,在研究Android多進程編程的時候,照書敲了一個AIDL的例子。其中,用Android Studio自動生成了AIDL
解決Android SDK Manager下載太慢問題 在極客頭條上看到的方法,Mark一下,以後可能經常用到。 1、打開android sdk manager 2、打開
Android——Listview不用notifydatasetchan