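The core idea of kqueue and other multiplexed IO mechanisms is that a single consumer waits on many different kinds of producers at once. This enables high-performance single-threaded IO with less scheduling overhead. kqueue gets additional performance by keeping that state inside the kernel.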
Producer-consumer model

Single producer and single consumer
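The producer/consumer model is a common communication model. The two sides share a kernel buffer organized as a ring queue, which gives asynchronous event notification. Each side only cares about the data in the buffer, not about the other side, which is why the model is widely used in network communication.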
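A minimal sketch of such a ring queue in C11, assuming exactly one producer thread and one consumer thread. The names `ring`, `ring_push`, `ring_pop` and `RING_SIZE` are hypothetical. Each index is written by only one side, so no lock is needed. The release store on an index makes the slot contents visible before the index that publishes them. Here the two sides simply return false when the ring is full or empty; blocking instead of polling is what the next section is about.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 8  /* capacity; hypothetical value, power of two */

struct ring {
    int buf[RING_SIZE];
    atomic_size_t in;   /* total items written; stored only by the producer */
    atomic_size_t out;  /* total items read; stored only by the consumer */
};

void ring_init(struct ring *r) {
    atomic_init(&r->in, 0);
    atomic_init(&r->out, 0);
}

/* Producer side: returns false instead of blocking when the ring is full. */
bool ring_push(struct ring *r, int v) {
    size_t in = atomic_load_explicit(&r->in, memory_order_relaxed);
    size_t out = atomic_load_explicit(&r->out, memory_order_acquire);
    if (in - out == RING_SIZE)
        return false;                            /* full */
    r->buf[in % RING_SIZE] = v;
    /* release: the slot write above becomes visible before the new index */
    atomic_store_explicit(&r->in, in + 1, memory_order_release);
    return true;
}

/* Consumer side: returns false when the ring is empty. */
bool ring_pop(struct ring *r, int *v) {
    size_t out = atomic_load_explicit(&r->out, memory_order_relaxed);
    size_t in = atomic_load_explicit(&r->in, memory_order_acquire);
    if (in == out)
        return false;                            /* empty */
    assert(in - out <= RING_SIZE);               /* producer never overran */
    *v = r->buf[out % RING_SIZE];
    /* release: the slot read is finished before the producer may reuse it */
    atomic_store_explicit(&r->out, out + 1, memory_order_release);
    return true;
}
```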
Semaphores
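Polling an empty buffer wastes CPU, so instead the consumer blocks until the producer notifies it. On wait, the thread records the condition it is waiting on and blocks. On notify, the kernel sets every thread waiting on that condition back to RUNNABLE.
This is essentially what the POSIX pthread_cond_wait and pthread_cond_signal primitives do (pthread_cond_broadcast wakes all waiters, pthread_cond_signal wakes at least one). The consumer must hold a lock when it calls wait, because wait has to release that lock before it yields (yield_wait). Otherwise no other thread could enter the critical section. After being woken, the consumer reacquires the lock. (This is the lock protecting the caller's own data.)
wait and notify themselves also need a lock. Without it, a notify could run before the waiter has actually gone to sleep, and the wakeup would be lost. (This is the internal lock of the wait/notify implementation.)
The yield_wait called by wait must temporarily release this internal lock while the scheduler runs, and reacquire it afterwards. Otherwise it would block every other notify, and all threads would end up blocked.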
send(bb, msg):
    acquire(bb.lock)
    while True:
        if bb.in - bb.out < N:          // room left
            bb.buf[bb.in mod N] <- msg
            bb.in <- bb.in + 1
            release(bb.lock)
            notify(bb.not_empty)        // wake receivers waiting for data
            return
        wait(bb.not_full, bb.lock)      // sleep; bb.lock is released meanwhile
receive(bb):
    acquire(bb.lock)
    while True:
        if bb.in > bb.out:              // data available
            msg <- bb.buf[bb.out mod N]
            bb.out <- bb.out + 1
            release(bb.lock)
            notify(bb.not_full)         // wake senders waiting for room
            return msg
        wait(bb.not_empty, bb.lock)
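The pseudocode maps directly onto POSIX threads. What follows is a minimal sketch, assuming int messages and a fixed capacity `N`; the names `bounded_buffer`, `bb_init`, `bb_send` and `bb_receive` are hypothetical. pthread_cond_wait releases the mutex while sleeping and reacquires it before returning. That is exactly the "release the lock inside wait" requirement discussed above.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define N 4  /* buffer capacity; arbitrary value for this sketch */

struct bounded_buffer {
    pthread_mutex_t lock;      /* bb.lock */
    pthread_cond_t not_full;   /* bb.not_full */
    pthread_cond_t not_empty;  /* bb.not_empty */
    unsigned long in, out;     /* messages written / read so far */
    int buf[N];
};

void bb_init(struct bounded_buffer *bb) {
    pthread_mutex_init(&bb->lock, NULL);
    pthread_cond_init(&bb->not_full, NULL);
    pthread_cond_init(&bb->not_empty, NULL);
    bb->in = bb->out = 0;
}

void bb_send(struct bounded_buffer *bb, int msg) {
    pthread_mutex_lock(&bb->lock);
    /* while, not if: a woken thread must re-check the condition */
    while (bb->in - bb->out >= N)
        pthread_cond_wait(&bb->not_full, &bb->lock);  /* drops lock while asleep */
    assert(bb->in - bb->out < N);
    bb->buf[bb->in % N] = msg;
    bb->in++;
    pthread_mutex_unlock(&bb->lock);
    pthread_cond_signal(&bb->not_empty);   /* wake one waiting receiver */
}

int bb_receive(struct bounded_buffer *bb) {
    pthread_mutex_lock(&bb->lock);
    while (bb->in == bb->out)
        pthread_cond_wait(&bb->not_empty, &bb->lock);
    int msg = bb->buf[bb->out % N];
    bb->out++;
    pthread_mutex_unlock(&bb->lock);
    pthread_cond_signal(&bb->not_full);    /* wake one waiting sender */
    return msg;
}
```

In a real program one thread calls bb_send in a loop and another calls bb_receive. A full buffer puts the sender to sleep, and an empty buffer puts the receiver to sleep, instead of either one spinning.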
Eventcount & Sequencer
This algorithm was proposed in 1979 as an alternative to semaphores. The sequencer is there to handle multiple producers.
Bounded buffer using eventcounts and a sequencer instead of semaphores:
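send(Buffer& buffer, Message msg) {
    t = TICKET(T);                          // T: the senders' sequencer
    AWAIT(buffer.in, t - 1);                // our turn: in > t - 1, i.e. in == t
    AWAIT(buffer.out, READ(buffer.in) - N); // not full: out > in - N
    buffer[READ(buffer.in) % N] = msg;
    ADVANCE(buffer.in);                     // publish, and pass the turn on
}
receive(Buffer& buffer) {
    AWAIT(buffer.in, READ(buffer.out));     // not empty: in > out
    msg = buffer[READ(buffer.out) % N];
    ADVANCE(buffer.out);
    return msg;
}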
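- AWAIT(event*, val): compare event.count with val. If count > val, return immediately. Otherwise record (event, val) in the thread's TCB and yield.
- ADVANCE(event*): increment event.count and wake every thread waiting on that event whose val is now below event.count.
- TICKET(sequencer*): atomically return the current sequence number and increment it (0, 1, 2, ...). This orders concurrent senders.
- READ(event*): atomically read event.count. The read must be atomic because the count may span several memory cells.
send first waits until in has caught up with its ticket, that is, until every sender with an earlier ticket has finished. This works like taking a number in a queue lock and waiting for your turn. send then waits until the buffer is not full and writes the message.
receive waits until the buffer contains data and then reads it.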
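One way to make these primitives concrete is the following sketch. It builds an eventcount from a pthread mutex and condition variable, and a sequencer from a C11 atomic counter. That construction is an assumption of the sketch, not the original design. All names (`eventcount`, `ec_await`, `ec_advance`, `ec_read`, `ticket`, `buf_in`, `buf_out`, `slots`) are hypothetical. AWAIT uses the strict "count > val" rule from the definitions above, so a sender holding ticket t waits on `t - 1`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define N 4  /* buffer capacity; hypothetical value */

/* Eventcount: a counter that only increases, plus a way to sleep on it. */
struct eventcount {
    pthread_mutex_t m;
    pthread_cond_t c;
    long count;
};

/* AWAIT: block until count > val. */
void ec_await(struct eventcount *e, long val) {
    pthread_mutex_lock(&e->m);
    while (e->count <= val)
        pthread_cond_wait(&e->c, &e->m);
    pthread_mutex_unlock(&e->m);
}

/* ADVANCE: increment, then wake all waiters; each re-checks its own val. */
void ec_advance(struct eventcount *e) {
    pthread_mutex_lock(&e->m);
    e->count++;
    pthread_cond_broadcast(&e->c);
    pthread_mutex_unlock(&e->m);
}

/* READ: a consistent snapshot of count. */
long ec_read(struct eventcount *e) {
    pthread_mutex_lock(&e->m);
    long v = e->count;
    pthread_mutex_unlock(&e->m);
    return v;
}

/* Sequencer: TICKET hands out 0, 1, 2, ... atomically. */
static atomic_long sequencer;
long ticket(void) { return atomic_fetch_add(&sequencer, 1); }

static struct eventcount buf_in  = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };
static struct eventcount buf_out = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };
static int slots[N];

/* Any number of senders. */
void ec_send(int msg) {
    long t = ticket();
    ec_await(&buf_in, t - 1);         /* our turn: in > t - 1, i.e. in == t */
    long cur = ec_read(&buf_in);
    assert(cur == t);                 /* only the turn holder advances in */
    ec_await(&buf_out, t - N);        /* not full: out > t - N */
    slots[t % N] = msg;
    ec_advance(&buf_in);              /* publish, and pass the turn on */
}

/* A single receiver, as in the pseudocode. */
int ec_receive(void) {
    long o = ec_read(&buf_out);
    ec_await(&buf_in, o);             /* not empty: in > out */
    int msg = slots[o % N];
    ec_advance(&buf_out);
    return msg;
}
```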
Kqueue
https://people.freebsd.org/~jlemon/papers/kqueue.pdf
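The problem is that every approach above essentially waits on a single event. To wait on many events at once, the operating system has to provide new primitives. For example, every socket corresponds to a file descriptor, and a server needs to watch events on all of its sockets simultaneously. BSD's kqueue and Linux's epoll both solve this problem. In essence they are IPC mechanisms, but seen purely from the IO side they are called multiplexed IO. Today epoll sits underneath Netty on Linux and is key to high-performance single-threaded networking.
The traditional select and poll only work on file descriptors. They cannot watch other IPC mechanisms such as signals, file system changes, asynchronous IO completion, or process existence, and they do not scale either.
The first problem is argument passing. Every call must pass the entire set of events, and the kernel dynamically allocates and frees memory for it each time. The second problem is that the kernel must walk the whole fd list to find the active fds. It scans once to establish that no fd is active before it can sleep. If none is active, it scans again to register the callbacks that will wake it up. When it is finally woken, it scans a third time to find out which fd woke it.
The root cause is that the syscall is stateless. It cannot reuse anything learned on the previous call, so everything is recomputed every time. kqueue's answer is to keep a queue in the kernel that stores this state.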
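The stateless cost is visible even in user space with POSIX poll(), which is available on Linux as well as the BSDs. This sketch watches pipes rather than sockets; the helper names `first_readable` and `watch_pipes` are hypothetical. Every call hands the whole pollfd array to the kernel. Afterwards the caller has to scan the whole array again to find which entries fired.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Ask the kernel once, without blocking, which fds are readable.
   Returns the index of the first readable entry, or -1. */
int first_readable(struct pollfd *fds, nfds_t nfds) {
    /* The whole array is copied in and every entry is checked;
       nothing is remembered between calls. */
    int n = poll(fds, nfds, 0);          /* timeout 0 ms: do not sleep */
    if (n <= 0)
        return -1;                       /* nothing ready, or error */
    assert((nfds_t)n <= nfds);
    /* The caller then scans the whole array again to find the hits. */
    for (nfds_t i = 0; i < nfds; i++)
        if (fds[i].revents & POLLIN)
            return (int)i;
    return -1;
}

/* Hypothetical helper: create n pipes and watch their read ends. */
int watch_pipes(int (*p)[2], struct pollfd *fds, int n) {
    for (int i = 0; i < n; i++) {
        if (pipe(p[i]) != 0)
            return -1;
        fds[i].fd = p[i][0];
        fds[i].events = POLLIN;
        fds[i].revents = 0;
    }
    return 0;
}
```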
int
kqueue(void);
int
kevent(int kq,const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);
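struct kevent {
    uintptr_t ident;    // ID of the watched object; (kq, ident, filter) uniquely identifies an event
    short filter;       // event type: determines how ident, fflags and data are interpreted
    u_short flags;      // input: add/delete, enable/disable, reset (EV_CLEAR) or delete (EV_ONESHOT) after delivery; output: EOF or ERROR occurred
    u_int fflags;       // filter-specific flags: which conditions to watch for, and on return which occurred
    intptr_t data;      // filter-specific data, interpreted according to filter and fflags
    void *udata;        // opaque user data, passed through unchanged
    __uint64_t ext[4];  // extension fields (hints) appended at the end
};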
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
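kqueue() creates a new kqueue and returns a descriptor for it. The descriptor acts as a capability (an access-controlled handle).
kevent() registers events and sets a timeout. changelist describes how the set of events registered on the kqueue should change, and eventlist receives the events that have fired (nchanges and nevents give their lengths). When an event fires, a kernel callback is invoked so that the process gets notified.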
filter
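- EVFILT_READ: works much like poll. For a socket it fires when the receive buffer holds at least the low-water mark (SO_RCVLOWAT) and stores the byte count in data. It also fires with EOF when the peer disconnects. Either way the application knows how much it can read.
- EVFILT_WRITE: analogous to READ, for space in the send buffer.
- EVFILT_AIO: after an aio_read/aio_write request is issued, completion is reported through this event instead of by polling aio_error. Once the event is returned, the result is collected with aio_return.
- EVFILT_SIGNAL: ident is the signal number. data returns how many times the signal was delivered, and the count is cleared after the event is returned.
- EVFILT_VNODE: watches a file system vnode. ident is the fd, and fflags selects which of the following events to watch; on return, fflags holds every event that occurred:
    NOTE_DELETE
    NOTE_WRITE
    NOTE_EXTEND
    NOTE_ATTRIB
    NOTE_LINK
    NOTE_RENAME
- EVFILT_PROC: watches process state. ident is the PID, and fflags selects from:
    NOTE_EXIT/NOTE_FORK/NOTE_EXEC: watch exit, fork and execve
    NOTE_TRACK: if set on the parent, the child is also tracked after fork
  Output:
    NOTE_CHILD: set on the child's event after fork; the parent's PID is stored in data
    NOTE_TRACKERR: could not attach an event to the child, usually because of resource limits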
sample
void
handle_events(void)
{
    int i, n;
    struct timespec timeout =
        { TMOUT_SEC, TMOUT_NSEC };

    /* One call submits the queued changes and collects fired events. */
    n = kevent(kq, ch, nchanges,
        ev, nevents, &timeout);
    if (n <= 0)
        goto error_or_timeout;
    for (i = 0; i < n; i++) {
        if (ev[i].flags & EV_ERROR) {
            /* error: ev[i].data holds the errno value */
            continue;
        }
        if (ev[i].filter == EVFILT_READ)
            readable_fd(ev[i].ident);
        if (ev[i].filter == EVFILT_WRITE)
            writeable_fd(ev[i].ident);
    }
    ...
}
void
update_fd(int fd, int action, int filter)
{
    /* Queue one change; it is submitted by the next kevent() call. */
    EV_SET(&ch[nchanges], fd, filter,
        action == ADD ? EV_ADD : EV_DELETE,
        0, 0, 0);
    nchanges++;
}
Kqueue implementation
Knote
- computes whether its own event is active
- links to other knotes
- stores a pointer to the kqueue it belongs to
struct knote {
SLIST_ENTRY(knote) kn_link; /* for kq */
SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */
struct knlist *kn_knlist; /* f_attach populated */
TAILQ_ENTRY(knote) kn_tqe;
struct kqueue *kn_kq; /* which queue we are on */
struct kevent kn_kevent;
void *kn_hook;
int kn_hookid;
int kn_status; /* protected by kq lock */
#define KN_ACTIVE 0x01 /* event has been triggered */
#define KN_QUEUED 0x02 /* event is on queue */
#define KN_DISABLED 0x04 /* event is disabled */
#define KN_DETACHED 0x08 /* knote is detached */
#define KN_MARKER 0x20 /* ignore this knote */
#define KN_KQUEUE 0x40 /* this knote belongs to a kq */
#define KN_SCAN 0x100 /* flux set in kqueue_scan() */
int kn_influx;
int kn_sfflags; /* saved filter flags */
int64_t kn_sdata; /* saved data field */
union {
struct file *p_fp; /* file data pointer */
struct proc *p_proc; /* proc pointer */
struct kaiocb *p_aio; /* AIO job pointer */
struct aioliojob *p_lio; /* LIO job pointer */
void *p_v; /* generic other pointer */
} kn_ptr;
struct filterops *kn_fop;
#define kn_id kn_kevent.ident
#define kn_filter kn_kevent.filter
#define kn_flags kn_kevent.flags
#define kn_fflags kn_kevent.fflags
#define kn_data kn_kevent.data
#define kn_fp kn_ptr.p_fp
};
Kqueue
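- kq_knlist: an array of knote lists indexed by file descriptor (size kq_knlistsize), for knotes whose ident is an fd; when an fd is closed, its knotes are found here and removed
- kq_head: all knotes currently marked active (pending delivery)
- kq_knhash: a hash table from ident to knotes, for idents that are not descriptors (e.g. PIDs or signal numbers)
- kq_fdp: the owning process's file descriptor table, the same table that backs its open files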
struct kqueue {
struct mtx kq_lock;
int kq_refcnt;
TAILQ_ENTRY(kqueue) kq_list;
TAILQ_HEAD(, knote) kq_head; /* list of pending event */
int kq_count; /* number of pending events */
struct selinfo kq_sel;
struct sigio *kq_sigio;
struct filedesc *kq_fdp;
int kq_state;
#define KQ_SEL 0x01
#define KQ_SLEEP 0x02
#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */
#define KQ_ASYNC 0x08
#define KQ_CLOSING 0x10
#define KQ_TASKSCHED 0x20 /* task scheduled */
#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */
int kq_knlistsize; /* size of knlist */
struct klist *kq_knlist; /* list of knotes */
u_long kq_knhashmask; /* size of knhash */
struct klist *kq_knhash; /* hash table for knotes */
struct task kq_task;
struct ucred *kq_cred;
};
Registration
kqueue
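kqueue itself is treated as a file. kqueue() creates the kernel object, registers an entry for it in the open file table, and returns the descriptor index. The hash table and the internal array are not allocated at this point; they are allocated only when a knote first needs them.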
kevent
int
kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents, const struct timespec *timeout)
{
return (((int (*)(int, const struct kevent *, int,
struct kevent *, int, const struct timespec *))
__libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges,
eventlist, nevents, timeout));
}
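The libc kevent() above only forwards to the system call. In the kernel, each entry of changelist is handled by kqueue_register. kqfd_register below shows the pattern when starting from a kqueue descriptor. It looks up the file structure from the thread and fd (fget takes a reference on it) and takes a reference on the kqueue (kqueue_acquire). It then calls the actual registration function. The registration code is long. Roughly, it looks up the knote by <ident, filter>. If none exists and EV_ADD is set, a new knote is created; if one exists, the change is applied to it.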
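The lookup-then-add-or-modify logic can be modeled in user space. This is a simplified sketch, not FreeBSD's kqueue_register. The names `skqueue`, `sknote`, `sk_register`, `SK_ADD` and `SK_DELETE` are hypothetical. The sketch uses a single hash table keyed by ident, whereas the real kqueue uses kq_knlist for descriptors and kq_knhash for everything else. It also does no locking.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

#define SK_ADD    0x1   /* hypothetical stand-ins for EV_ADD / EV_DELETE */
#define SK_DELETE 0x2
#define NBUCKETS  16

struct sknote {
    uintptr_t ident;
    short filter;
    unsigned fflags;          /* what this knote watches for */
    struct sknote *next;      /* hash-chain link */
};

struct skqueue {
    struct sknote *hash[NBUCKETS];
};

static struct sknote **bucket(struct skqueue *kq, uintptr_t ident) {
    return &kq->hash[ident % NBUCKETS];
}

/* Find the knote for <ident, filter>, or NULL. */
static struct sknote *lookup(struct skqueue *kq, uintptr_t ident, short filter) {
    for (struct sknote *kn = *bucket(kq, ident); kn; kn = kn->next)
        if (kn->ident == ident && kn->filter == filter)
            return kn;
    return NULL;
}

/* Returns 0 on success, -1 if there is nothing to modify (the real
   kevent reports ENOENT in that case) or allocation fails. */
int sk_register(struct skqueue *kq, uintptr_t ident, short filter,
                int flags, unsigned fflags) {
    struct sknote *kn = lookup(kq, ident, filter);
    if (kn == NULL) {
        if (!(flags & SK_ADD))
            return -1;
        kn = calloc(1, sizeof *kn);
        if (kn == NULL)
            return -1;
        kn->ident = ident;
        kn->filter = filter;
        kn->next = *bucket(kq, ident);
        *bucket(kq, ident) = kn;          /* push onto the hash chain */
    }
    if (flags & SK_DELETE) {
        struct sknote **pp = bucket(kq, ident);
        while (*pp != kn) {
            assert(*pp != NULL);          /* kn must be on this chain */
            pp = &(*pp)->next;
        }
        *pp = kn->next;                   /* unlink and free */
        free(kn);
        return 0;
    }
    kn->fflags = fflags;                  /* existing knote: apply the change */
    return 0;
}
```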
int
kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
{
struct kqueue *kq;
struct file *fp;
cap_rights_t rights;
int error;
error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
if (error != 0)
return (error);
if ((error = kqueue_acquire(fp, &kq)) != 0)
goto noacquire;
error = kqueue_register(kq, kev, td, mflag);
kqueue_release(kq, 0);
noacquire:
fdrop(fp, td);
return (error);
}
Filter
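A filter's job is to filter what comes from an event source. Every activity of the source invokes the filter, but only activity that satisfies the filter's rules is reported to the application. In other words, the filter returns a boolean. As a side effect it may also update fflags and data (the output parameters mentioned above). The filter encapsulates the event: kqueue can only ask it whether the knote is active and knows nothing about the details. Supporting a new kind of event therefore only requires adding a new filter.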
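A sketch of the filter abstraction, loosely modeled on the kernel's `struct filterops` and `f_event` seen in the knote structure. It assumes a hypothetical socket-like source (`struct sock`) and a read filter in the spirit of EVFILT_READ. `filt_read`, `read_filtops`, `SK_EOF` and `knote_is_active` are illustrative names, not the kernel's definitions. The generic code only calls `f_event` and looks at the boolean. The filter fills in `data` and `flags` as side effects.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical event source: a socket receive buffer. */
struct sock {
    size_t rcv_bytes;     /* bytes waiting to be read */
    size_t lowat;         /* low-water mark */
    bool eof;             /* peer closed */
};

struct knote;
/* The only question kqueue can ask about an event: is it active? */
struct filterops {
    bool (*f_event)(struct knote *kn, long hint);
};

struct knote {
    const struct filterops *fop;
    void *src;            /* the object being watched */
    long data;            /* output: filter-specific data */
    unsigned flags;       /* output: e.g. EOF */
};

#define SK_EOF 0x8000     /* hypothetical output flag */

/* Read filter: active with at least lowat bytes buffered, or on EOF.
   Side effects: byte count in data, EOF in flags. */
static bool filt_read(struct knote *kn, long hint) {
    (void)hint;           /* this filter ignores the hint */
    struct sock *so = kn->src;
    assert(so != NULL);
    kn->data = (long)so->rcv_bytes;
    if (so->eof) {
        kn->flags |= SK_EOF;
        return true;
    }
    return so->rcv_bytes >= so->lowat;
}

static const struct filterops read_filtops = { filt_read };

/* Generic code: asks the filter, knows nothing about sockets. */
bool knote_is_active(struct knote *kn, long hint) {
    return kn->fop->f_event(kn, hint);
}
```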

Activity
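A hook is inserted at every place where such activity happens. The hook calls knote(), which walks the klist that the object maintains (built up at registration time) and calls the filter of each knote on it.
If the filter reports that the event fired, the knote is activated. Through the knote we find the kqueue it belongs to, and the knote is appended to the tail of that kqueue's active list. If it is already on the list, it is not added again, but the filter still records the activity (e.g. the side effects mentioned above).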
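The activation path can be modeled the same way. This is a single-threaded sketch with hypothetical names and no kq lock. `knote()` walks the klist of the object that saw activity and passes the hint to each filter. It appends a knote to its kqueue's active list only if the knote is not already queued. The example filter `filt_bytes` treats the hint as a byte count, an assumption made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define KN_QUEUED 0x02     /* same meaning as in struct knote above */

struct kq;
struct knote {
    struct knote *klist_next;   /* link in the watched object's klist */
    struct knote *act_next;     /* link in the kqueue's active list */
    struct kq *kq;              /* which queue this knote belongs to */
    bool (*filter)(struct knote *kn, long hint);
    int status;
    long data;                  /* updated by the filter as a side effect */
};

struct kq {
    struct knote *head, *tail;  /* active list, FIFO */
    int count;
};

static void enqueue(struct kq *kq, struct knote *kn) {
    assert(!(kn->status & KN_QUEUED));
    kn->act_next = NULL;
    if (kq->tail) kq->tail->act_next = kn; else kq->head = kn;
    kq->tail = kn;
    kq->count++;
    kn->status |= KN_QUEUED;
}

/* Called from the hook at the activity site. */
void knote(struct knote *klist, long hint) {
    for (struct knote *kn = klist; kn; kn = kn->klist_next) {
        if (!kn->filter(kn, hint))
            continue;
        /* The filter ran either way, so its side effects are recorded;
           only a knote that is not yet queued is appended. */
        if (!(kn->status & KN_QUEUED))
            enqueue(kn->kq, kn);
    }
}

/* Example filter: the hint carries how many bytes just arrived. */
bool filt_bytes(struct knote *kn, long hint) {
    kn->data += hint;
    return kn->data > 0;
}
```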
There are some special cases. For example, on fork we must check whether NOTE_TRACK is set to decide whether to report the child's PID.
Additionally, for each knote attached to the parent, check whether user wants to track the new process. If so, attach a new knote to it, and immediately report an event with the child's pid.
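The parent's knote is activated, and new knotes are registered for the child with the CHILD flag set and the parent's PID stored in data. As the comment below explains, NOTE_CHILD and NOTE_EXIT both use the data field in conflicting ways. A second, separate knote is therefore registered to track the child's other events, such as EXIT.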
/*
* Activate existing knote and register tracking knotes with
* new process.
*
* First register a knote to get just the child notice. This
* must be a separate note from a potential NOTE_EXIT
* notification since both NOTE_CHILD and NOTE_EXIT are defined
* to use the data field (in conflicting ways).
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT |
EV_FLAG2;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
/*
* Then register another knote to track other potential events
* from the new process.
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
if (kn->kn_fop->f_event(kn, NOTE_FORK))
KNOTE_ACTIVATE(kn, 0);
list->kl_lock(list->kl_lockarg);
KQ_LOCK(kq);
kn_leave_flux(kn);
KQ_UNLOCK_FLUX(kq);
Delivery
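kqueue_scan appends a marker (sentinel) knote to the tail of the active list. When the scan dequeues the marker, the pass is over.
Each step removes one knote from the active list. The timeout is checked here too, and expired knotes and DISABLED ones are also removed at this point. Unless the knote is ONESHOT, its filter is called again with a query hint to confirm that it is still active, in case things changed after it was queued.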
The rationale for this is the case where data arrives for a socket, which causes the knote to be queued, but the Application happens to call read() and empty the socket buffer before calling kevent. If the knote was still queued, then an event would be returned telling the application to read an empty buffer.
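The information in each knote that is confirmed active is copied into a struct kevent and returned to the application through eventlist. A ONESHOT knote is then removed from the kqueue. Otherwise, if the filter says the knote is still active, it is put back at the tail of the active list, behind this scan's marker. The scan finishes when the marker is dequeued.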
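A single-threaded sketch of this scan loop. It uses local `KN_ONESHOT`/`KN_MARKER` flags (hypothetical values; the real struct above has its own `KN_MARKER`) and a boolean `ready` pointer standing in for the event source. It shows the three behaviors from the text. The marker bounds the pass. A non-oneshot knote is re-checked before delivery, so a drained source is dropped. A still-active knote is requeued behind the marker. Timeouts, disabled knotes and locking are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define KN_ONESHOT 0x1   /* hypothetical flag values for this sketch */
#define KN_MARKER  0x2

struct knote {
    struct knote *next;
    int flags;
    int ident;
    bool (*filter)(struct knote *kn);   /* re-check: still active? */
    bool *ready;                         /* hypothetical source state */
};

struct kq { struct knote *head, *tail; };

static void push_tail(struct kq *q, struct knote *kn) {
    kn->next = NULL;
    if (q->tail) q->tail->next = kn; else q->head = kn;
    q->tail = kn;
}

static struct knote *pop_head(struct kq *q) {
    struct knote *kn = q->head;
    if (kn) {
        q->head = kn->next;
        if (q->head == NULL) q->tail = NULL;
    }
    return kn;
}

bool still_ready(struct knote *kn) { return *kn->ready; }

/* One pass: copies up to max idents into out[], returns how many. */
int kqueue_scan(struct kq *q, int *out, int max) {
    struct knote marker = { .flags = KN_MARKER };
    int n = 0;
    push_tail(q, &marker);              /* everything before it is this pass */
    while (n < max) {
        struct knote *kn = pop_head(q);
        assert(kn != NULL);             /* the marker is still queued */
        if (kn->flags & KN_MARKER)
            return n;                   /* marker dequeued: pass complete */
        if (!(kn->flags & KN_ONESHOT) && !kn->filter(kn))
            continue;                   /* went inactive since queued: drop */
        out[n++] = kn->ident;           /* "copy out" to eventlist */
        if (!(kn->flags & KN_ONESHOT))
            push_tail(q, kn);           /* still active: requeue behind marker */
    }
    /* eventlist full: unlink the on-stack marker before returning */
    struct knote **pp = &q->head, *prev = NULL;
    while (*pp != &marker) { prev = *pp; pp = &(*pp)->next; }
    *pp = marker.next;
    if (q->tail == &marker) q->tail = prev;
    return n;
}
```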
Miscellaneous Notes
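1. In the paper's version, the kqueue descriptor is not inherited across fork, except with vfork. Inheriting it would require either copying the whole kqueue at fork time or marking it copy-on-write. (I don't know whether this is still how it is done today.)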
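2. kqueue delivers notifications by walking the klist each object maintains, which reaches every kqueue on that chain. poll and select instead store a pid in selinfo. The passage below is about select collisions. In the classic BSD implementation selinfo has room for only one pid. When a second process selects on the same object, a collision flag is set, and the eventual wakeup must wake every process sleeping in select, not just the interested ones.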
While this may be a natural outcome from the way knotes are implemented, it also means that the kqueue system is not susceptible to select collisions. As each knote is queued in the active list, only processes sleeping on that kqueue are woken up
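3. A single klist can hold knotes with different filters, so when knote() is called it should be told what kind of activity triggered it (PROC and SIGNAL activity, for example, are easily confused). The hint serves this purpose: it tells each filter which activity occurred.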
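4. A kevent goes through two copies, which adds overhead. An AIO-style design could be cheaper, with the kernel modifying a control block in user space directly. Why isn't it done that way? Mainly because bugs are easier to localize when the kernel never writes user-space data directly, and the application does not have to deal with state changing underneath it.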
Summary
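The clever part is that the kqueue lives in the kernel. When, for instance, data arrives on a socket, its knote can be put on the process's kqueue active list immediately, instead of waiting for the next syscall to check. Even if I don't call kevent for a long time, knote() still records the activity in the knotes and inserts them into the active list. The next call then only walks the active list rather than every registered knote.
Also, because the kqueue is stateful, modifying it is cheap: only the part that changed has to be submitted.
Some parts were still hard for me to follow, and the source code is complex as well; corrections are welcome.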
Appendix
filechange
struct kevent ev;
struct timespec nullts = { 0, 0 };
int n;

/* Register once: watch the file for rename, write, delete and attribute changes. */
EV_SET(&ev, fd, EVFILT_VNODE,
    EV_ADD | EV_ENABLE | EV_CLEAR,
    NOTE_RENAME | NOTE_WRITE |
    NOTE_DELETE | NOTE_ATTRIB, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
for (;;) {
    /* Block until something happens; fflags says what. */
    n = kevent(kq, NULL, 0, &ev, 1, NULL);
    if (n > 0) {
        printf("The file was");
        if (ev.fflags & NOTE_RENAME)
            printf(" renamed");
        if (ev.fflags & NOTE_WRITE)
            printf(" written");
        if (ev.fflags & NOTE_DELETE)
            printf(" deleted");
        if (ev.fflags & NOTE_ATTRIB)
            printf(" chmod/chowned");
        printf("\n");
    }
}
signal
struct kevent ev;
struct timespec nullts = { 0, 0 };
int n;

EV_SET(&ev, SIGHUP, EVFILT_SIGNAL,
    EV_ADD | EV_ENABLE, 0, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
/* Ignore normal delivery; kqueue still records each delivery attempt. */
signal(SIGHUP, SIG_IGN);
for (;;) {
    n = kevent(kq, NULL, 0, &ev, 1, NULL);
    if (n > 0)
        printf("signal %d delivered"
            " %ld times\n",
            (int)ev.ident, (long)ev.data);
}
udata
int i, n;
struct timespec timeout =
    { TMOUT_SEC, TMOUT_NSEC };
void (*fcn)(struct kevent *);

n = kevent(kq, ch, nchanges,
    ev, nevents, &timeout);
if (n <= 0)
    goto error_or_timeout;
for (i = 0; i < n; i++) {
    if (ev[i].flags & EV_ERROR) {
        /* error */
        continue;
    }
    /* udata was set to a handler function when the event was registered */
    fcn = (void (*)(struct kevent *))ev[i].udata;
    fcn(&ev[i]);
}