日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

Kqueue和其他的多路復(fù)用IO的核心是,單消費(fèi)者同時(shí)監(jiān)聽不同種類的生產(chǎn)者,從而提供高性能的單線程IO,減少調(diào)度開銷。而Kqueue通過(guò)在內(nèi)核態(tài)維持狀態(tài)提供了更高的性能。


生產(chǎn)者消費(fèi)者模型

單線程性能秒殺多線程!多路復(fù)用IO實(shí)現(xiàn)高性能網(wǎng)絡(luò)服務(wù)

 

單Producer和單Consumer

生產(chǎn)者/消費(fèi)者模型是常見的通信模型,通過(guò)共享內(nèi)核緩沖區(qū)環(huán)形隊(duì)列,實(shí)現(xiàn)異步的事件通知。雙方只關(guān)注緩沖區(qū)內(nèi)的數(shù)據(jù),而不關(guān)注彼此,因此常常被用于網(wǎng)絡(luò)通信。

信號(hào)量

為了避免消費(fèi)者在緩存區(qū)未滿時(shí)無(wú)意義的輪詢,消費(fèi)者block直到生產(chǎn)者通知。wait時(shí)線程設(shè)置信號(hào)量并且block,notify時(shí)內(nèi)核通知所有等待信號(hào)的線程狀態(tài)改為RUNNABLE。

事實(shí)上就是linux的pthread_cond_wait和phread_cond_signal原語(yǔ)。consumer之所以要帶鎖wait,是因?yàn)樵趦?nèi)部進(jìn)行調(diào)度yield_wait前要放掉鎖,否則其他線程無(wú)法進(jìn)入臨界區(qū);喚醒之后重新獲得鎖。(這里指的鎖是外部事務(wù)的鎖)

wait和notify需要增加鎖,防止notify先于wait進(jìn)行。(這里的鎖指的是內(nèi)部事務(wù)的鎖)

wait調(diào)用的yield_wait在調(diào)度時(shí)需要臨時(shí)釋放并隨后獲取內(nèi)部事務(wù)鎖,否則會(huì)阻塞其他的notify造成全員block。

 send(bb, msg):
    acquire(bb.lock)
    while True:
      if bb.in - bb.out < N:
        bb.buf[bb.in mod N] <- msg
        bb.in <- bb.in + 1
        release(bb.lock)
        notify(bb.not_empty)
        return
    wait(bb.not_full, bb.block)

 receive(bb):
    acquire(bb.lock)
    while True:
      if bb.in > bb.out:
        msg <- bb.buf[bb.out mod N]
        bb.out <- bb.out + 1
        release(bb.lock)
        wait(bb.not_full)
        return
    wait(bb.not_empty, bb.block)

Eventcount & Sequencer

這是1979年提出的算法,作為信號(hào)量的可替換實(shí)現(xiàn)。Sequencer的目的是處理多producer。

semaphores

send(Buffer& buffer,Message msg) {
  t=TICKET(T);
  AWAIT(buffer.in, t);
  AWAIT(buffer.out, READ(buffer.in)-N);
  buffer[READ(buffer.in)%N]=msg;
  ADVANCE(in);
}

receive(Buffer& buffer) {
 AWAIT(buffer.in, READ(buffer.out));
 msg = buffer[READ(buffer.out)%N];
 ADVANCE(buffer.out);
 return msg;
}
  • AWAIT(event*,val) - 比較event.count和val,如果大于則返回,否則存入線程TCB并yield
  • ADVANCE(event*) - 自增event.count并將所有同event且event.count>val的線程喚醒
  • TICKET(sequencer*) - 原子性自增序號(hào),目的是處理并發(fā)的sender
  • READ(event*) - 原子性讀event.count,因?yàn)榭赡茏x操作涉及多memory cell

send等待in超過(guò)ticket,相當(dāng)于拿排隊(duì)鎖輪到自己。然后等待緩存區(qū)未滿時(shí)寫入數(shù)據(jù)。

receive等待緩沖區(qū)存在數(shù)據(jù)時(shí)讀取數(shù)據(jù)。


Kqueue

https://people.freebsd.org/~jlemon/papers/kqueue.pdf

問(wèn)題在于,上面提到的做法本質(zhì)上都是監(jiān)聽著一個(gè)事件,如果我們想要處理多個(gè)監(jiān)聽事件,操作系統(tǒng)必須提供新的原語(yǔ),例如每個(gè)socket都對(duì)應(yīng)著一個(gè)file descriptor,需要同時(shí)監(jiān)聽所有socket的事件。BSD的Kqueue和Linux的epoll都是解決這種問(wèn)題的方式,本質(zhì)上它們就是IPC,但是單純從IO的角度看叫做多路復(fù)用IO。目前epoll用于netty的底層,是單線程實(shí)現(xiàn)高性能網(wǎng)絡(luò)的關(guān)鍵。

傳統(tǒng)的select和poll僅僅適用于file descriptor,但是無(wú)法關(guān)注其他IPC機(jī)制,例如信號(hào)、文件系統(tǒng)變化、異步IO完成、進(jìn)程存在;并且也不具備scalability。

第一個(gè)問(wèn)題在于參數(shù)傳遞,每次都必須傳遞整個(gè)事件組,并且動(dòng)態(tài)在內(nèi)核中創(chuàng)建和銷毀內(nèi)存。第二個(gè)問(wèn)題在于內(nèi)核必須遍歷整個(gè)fd列表去找活躍的fd。初始遍歷一次確定沒(méi)有active的fd才能沉睡,如果沒(méi)有active還要再遍歷一次設(shè)定回調(diào)來(lái)喚醒,最后喚醒時(shí)還要再遍歷一次來(lái)看是哪個(gè)fd喚醒了。

問(wèn)題出在這個(gè)syscall無(wú)狀態(tài)上,無(wú)法利用之前的信息,每次都得重新計(jì)算。因此Kqueue的機(jī)制就在于內(nèi)核中維持一個(gè)隊(duì)列儲(chǔ)存狀態(tài)。

int
kqueue(void);
int
kevent(int kq,const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);

struct kevent{  
uintpt t ident; // 事件關(guān)注對(duì)象的ID,kq,ident,filter確定唯一的event
 // 事件類型,ident,fflags,data應(yīng)該如何被解釋?
u short flags; // 輸入: 增加/減少,使能/禁止, 執(zhí)行后重置/刪除;輸出: 發(fā)生EOF或者ERROR
u int fflags; // 活躍時(shí)應(yīng)該怎么做,是否返回event?
intptr t data; // filter和fflags規(guī)定的數(shù)據(jù)傳輸方式
void *udata; // 自定義的數(shù)據(jù)傳輸方式
__uint64_t	ext[4]; //在末尾增加的額外信息Hint
}

EV_SET(&kev, ident, filter, flags, fflags, data, udata);

kevent()用于創(chuàng)建kqueue并且返回對(duì)應(yīng)的capability(權(quán)限控制的抽象)。

kevent()用于注冊(cè)event,并設(shè)定超時(shí),changelist是指kqueue注冊(cè)的event如何變化,eventlist則是返回的event。當(dāng)event觸發(fā)時(shí),會(huì)調(diào)用內(nèi)核的回調(diào)函數(shù),通知進(jìn)程。

filter

  • EVFILT READ :poll近似的實(shí)現(xiàn),當(dāng)socket_buffer大于SO_LOWAT時(shí)觸發(fā)將size寫入data或者斷連時(shí)觸發(fā)EOF,幫助應(yīng)用處理數(shù)據(jù)。
  • EVFILT WRITE: 類似READ
  • EVFILT AIO: aio_read/write請(qǐng)求后通過(guò)事件進(jìn)行aio_error輪詢,事件返回后aio_return
  • EVFILT SIGNAL: id為信號(hào)值,返回data為信號(hào)計(jì)數(shù),通知后clear
  • EVFILT VNODE: 監(jiān)聽文件系統(tǒng)vnode,id為fd, fflags監(jiān)聽下列事件并返回所有發(fā)生事件
NOTE DELETE
NOTE WRITE
NOTE EXTEND
NOTE ATTRIB
NOTE LINK
NOTE RENAME
  • EVFILT PROC:監(jiān)聽進(jìn)程狀態(tài),id為PID,fflags監(jiān)聽下列事件
NOTE EXIT/FORK/EXEC 監(jiān)聽exit,fork,execve等原語(yǔ)
NOTE TRACK 若父進(jìn)程設(shè)定為Track則fork后子進(jìn)程為CHILD
輸出:
NOTE CHILD 子進(jìn)程fork后設(shè)定child,并且父進(jìn)程id存入data
NOTE TRACKERR 無(wú)法添加子進(jìn)程事件,通常因?yàn)橘Y源限制

sample

handle_events()
{
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
n = kevent(kq, ch, nchanges,
evi, nevents, &timeout);
if (n <= 0)
   goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
if (evi.filter == EVFILT_READ)
readable_fd(evi.ident);
if (evi.filter == EVFILT_WRITE)
writeable_fd(evi.ident);
}
...
}
update_fd(int fd, int action,int filter)
{
EV_SET(&chnchanges, fd, filter,action == ADD ? 
EV_ADD : EV_DELETE,
0, 0, 0);
nchanges++;
}

Kqueue實(shí)現(xiàn)

Knote

  • 計(jì)算當(dāng)前節(jié)點(diǎn)的活躍度
  • 鏈接其他knote
  • 存儲(chǔ)自己所在的Kqueue的指針
struct knote {
	SLIST_ENTRY(knote)	kn_link;	/* for kq */
	SLIST_ENTRY(knote)	kn_selnext;	/* for struct selinfo */
	struct			knlist *kn_knlist;	/* f_attach populated */
	TAILQ_ENTRY(knote)	kn_tqe;
	struct			kqueue *kn_kq;	/* which queue we are on */
	struct 			kevent kn_kevent;
	void			*kn_hook;
	int			kn_hookid;
	int			kn_status;	/* protected by kq lock */
#define KN_ACTIVE	0x01			/* event has been triggered */
#define KN_QUEUED	0x02			/* event is on queue */
#define KN_DISABLED	0x04			/* event is disabled */
#define KN_DETACHED	0x08			/* knote is detached */
#define KN_MARKER	0x20			/* ignore this knote */
#define KN_KQUEUE	0x40			/* this knote belongs to a kq */
#define	KN_SCAN		0x100			/* flux set in kqueue_scan() */
	int			kn_influx;
	int			kn_sfflags;	/* saved filter flags */
	int64_t			kn_sdata;	/* saved data field */
	union {
		struct		file *p_fp;	/* file data pointer */
		struct		proc *p_proc;	/* proc pointer */
		struct		kaiocb *p_aio;	/* AIO job pointer */
		struct		aioliojob *p_lio;	/* LIO job pointer */
		void		*p_v;		/* generic other pointer */
	} kn_ptr;
	struct			filterops *kn_fop;

#define kn_id		kn_kevent.ident
#define kn_filter	kn_kevent.filter
#define kn_flags	kn_kevent.flags
#define kn_fflags	kn_kevent.fflags
#define kn_data		kn_kevent.data
#define kn_fp		kn_ptr.p_fp
};

Kqueue

  • kp_knlist存所有knode用于GC
  • kp_head存存儲(chǔ)所有標(biāo)記為active的knode
  • kq_knhash存儲(chǔ)iden->descriptor的映射
  • kq_fdp fd索引的數(shù)組(同open file table)用于關(guān)閉fd時(shí)刪除對(duì)應(yīng)的knode
struct kqueue {
	struct		mtx kq_lock;
	int		kq_refcnt;
	TAILQ_ENTRY(kqueue)	kq_list;
	TAILQ_HEAD(, knote)	kq_head;	/* list of pending event */
	int		kq_count;		/* number of pending events */
	struct		selinfo kq_sel;
	struct		sigio *kq_sigio;
	struct		filedesc *kq_fdp;
	int		kq_state;
#define KQ_SEL		0x01
#define KQ_SLEEP	0x02
#define KQ_FLUXWAIT	0x04			/* waiting for a in flux kn */
#define KQ_ASYNC	0x08
#define KQ_CLOSING	0x10
#define	KQ_TASKSCHED	0x20			/* task scheduled */
#define	KQ_TASKDRAIN	0x40			/* waiting for task to drain */
	int		kq_knlistsize;		/* size of knlist */
	struct		klist *kq_knlist;	/* list of knotes */
	u_long		kq_knhashmask;		/* size of knhash */
	struct		klist *kq_knhash;	/* hash table for knotes */
	struct		task kq_task;
	struct		ucred *kq_cred;
};

Registration

kqueue

kqueue本身作為文件抽象看待,在OFT里注冊(cè)entry創(chuàng)建內(nèi)核對(duì)象并賦予descriptor索引。hash和內(nèi)部的array并不分配。

kevent

int
kevent(int kq, const struct kevent *changelist, int nchanges,
    struct kevent *eventlist, int nevents, const struct timespec *timeout)
{

	return (((int (*)(int, const struct kevent *, int,
	    struct kevent *, int, const struct timespec *))
	    __libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges,
	   eventlist, nevents, timeout));
}

這里調(diào)用了kqueue_register來(lái)對(duì)changeList進(jìn)行注冊(cè)。首先根據(jù)線程和fd獲取文件的FCB,kq對(duì)于fp引用計(jì)數(shù)++,然后調(diào)用實(shí)際的注冊(cè)函數(shù)。注冊(cè)的代碼太長(zhǎng)了,大體就是先根據(jù)<Iden,filter>尋找knote節(jié)點(diǎn),找不到如果是EV_ADD則增加knote,否則把事件增加到knote上去。

int 
kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
{
	struct kqueue *kq;
	struct file *fp;
	cap_rights_t rights;
	int error;

	error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
	if (error != 0)
		return (error);
	if ((error = kqueue_acquire(fp, &kq)) != 0)
		goto noacquire;

	error = kqueue_register(kq, kev, td, mflag);
	kqueue_release(kq, 0);

noacquire:
	fdrop(fp, td);
	return (error);
}

Filter

filter的作用就是對(duì)于事件源進(jìn)行過(guò)濾,事件源所有的活動(dòng)都會(huì)調(diào)用filter,但是只有符合filter規(guī)則的事件才會(huì)報(bào)告給應(yīng)用,也就是返回布爾值,同時(shí)他也會(huì)修改fflags和data產(chǎn)生副作用(上面提到的輸出參數(shù))。filter封裝了事件,kqueue只能詢問(wèn)他是否活躍,而對(duì)事件的細(xì)節(jié)一無(wú)所知。因此只需要增加filter,就能拓展事件的內(nèi)容。

單線程性能秒殺多線程!多路復(fù)用IO實(shí)現(xiàn)高性能網(wǎng)絡(luò)服務(wù)

 

Activity

在所有觸發(fā)這些活動(dòng)的地方插入hook函數(shù),調(diào)用knote()函數(shù)遍歷自己維護(hù)的klist(注冊(cè)的時(shí)候維護(hù)的),調(diào)用filter。

如果事件觸發(fā)則激活,通過(guò)knote找到其所屬的kqueue,并且將knote加入kqueue的active鏈末尾。如果已經(jīng)在了,那么不用增加knote,但是filter還是會(huì)記錄activity(e.g.上文提到的副作用)。

這里有些special case,例如fork需要看是不是TRACK,來(lái)判斷是否報(bào)告子節(jié)點(diǎn)的PID

Additionally, for each knote attached to the parent, check whether user wants to track the new process. If so, attach a new knote to it, and immediately report an event with the child's pid.

首先,激活父進(jìn)程的knote,然后創(chuàng)建新的knote分配給子節(jié)點(diǎn),并且設(shè)置CHILD flag和對(duì)應(yīng)的父進(jìn)程PID。同時(shí)這里還提到了可能存在事件可能改變data,因此為EXIT額外分配一個(gè)節(jié)點(diǎn)。

		/*
		 * Activate existing knote and register tracking knotes with
		 * new process.
		 *
		 * First register a knote to get just the child notice. This
		 * must be a separate note from a potential NOTE_EXIT
		 * notification since both NOTE_CHILD and NOTE_EXIT are defined
		 * to use the data field (in conflicting ways).
		 */
		kev.ident = pid;
		kev.filter = kn->kn_filter;
		kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT |
		    EV_FLAG2;
		kev.fflags = kn->kn_sfflags;
		kev.data = kn->kn_id;		/* parent */
		kev.udata = kn->kn_kevent.udata;/* preserve udata */
		error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
		if (error)
			kn->kn_fflags |= NOTE_TRACKERR;

		/*
		 * Then register another knote to track other potential events
		 * from the new process.
		 */
		kev.ident = pid;
		kev.filter = kn->kn_filter;
		kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1;
		kev.fflags = kn->kn_sfflags;
		kev.data = kn->kn_id;		/* parent */
		kev.udata = kn->kn_kevent.udata;/* preserve udata */
		error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
		if (error)
			kn->kn_fflags |= NOTE_TRACKERR;
		if (kn->kn_fop->f_event(kn, NOTE_FORK))
			KNOTE_ACTIVATE(kn, 0);
		list->kl_lock(list->kl_lockarg);
		KQ_LOCK(kq);
		kn_leave_flux(kn);
		KQ_UNLOCK_FLUX(kq);

Delivery

kqueue_scan在active鏈末尾加入哨兵,如果scan時(shí)扔出了哨兵,那么遍歷結(jié)束。

每次都從active移除一個(gè)節(jié)點(diǎn)(注意檢查timeout,過(guò)期也要移除,DISABLE也是在這里移除),如果不是ONESHOP,那么filter帶著query hint重新檢查一遍是否激活,防止途中又被修改。

The rationale for this is the case where data arrives for a socket, which causes the knote to be queued, but the Application happens to call read() and empty the socket buffer before calling kevent. If the knote was still queued, then an event would be returned telling the application to read an empty buffer.

確認(rèn)激活的knote的信息將會(huì)拷貝到kevnet通過(guò)eventlist返回給應(yīng)用進(jìn)行通知。如果ONESHOP則直接從kqueue中移除,否則如果filter看它仍然active,就把它重新放到active鏈末尾(上次掃描的哨兵之后)。直到哨兵被出列,scan完成。

Miscellaneous Notes

1.論文的版本fork的時(shí)候不復(fù)制kqueue的df除非vfork。如果復(fù)制的話需要在fork時(shí)進(jìn)行整個(gè)kqueue復(fù)制或者標(biāo)記為COW。(現(xiàn)在不知道是不是這么做的)

2.kqueue是通過(guò)維護(hù)klist來(lái)對(duì)整條鏈涉及的所有進(jìn)程進(jìn)行通知的,而不是像poll或者select那樣在sellInfo持有pid。下面這段話看不懂了,沒(méi)看過(guò)poll不知道啥叫collision。

While this may be a natural outcome from the way knotes are implemented, it also means that the kqueue system is not susceptible to select collisions. As each knote is queued in the active list, only processes sleeping on that kqueue are woken up

3.考慮同一個(gè)klist有不同類型的filter,調(diào)用knode時(shí)應(yīng)該給予額外信息通知他到底是什么事件觸發(fā)的(例如PROC和SIGNAL容易混淆),因此利用hint確定activity和哪個(gè)相關(guān)

4. kevent要經(jīng)歷兩次拷貝,增加了overhead。因此如果采用AIO更好,kernel直接修改user狀態(tài)下的control block。那么為什么不這么做呢?根本原因在于如果內(nèi)核不允許直接寫用戶態(tài)數(shù)據(jù)的話,bug會(huì)更好定位,同時(shí)應(yīng)用也不需要考慮狀態(tài)。

總結(jié)

精妙之處在于kqueue維持在內(nèi)核中,因此socket如果滿了可以直接將knote加入進(jìn)程kqueue的活躍鏈,而不需要等到下次syscall的時(shí)候再檢查。例如,即使我長(zhǎng)期不kevent,knote()依然會(huì)將他們的activity存儲(chǔ)在knote上并且插入active list,下次只需要遍歷active list而不需要重頭遍歷整個(gè)queue。

同時(shí)因?yàn)閗queue有狀態(tài),進(jìn)行修改也開銷很小,只需要改變變化的那部分就行了。

看的時(shí)候還是有些地方比較難理解,加上源代碼也很復(fù)雜,如果有糾錯(cuò)請(qǐng)指正。

附錄

filechange

struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, fd, EVFILT_VNODE,
EV_ADD | EV_ENABLE | EV_CLEAR,
NOTE_RENAME | NOTE_WRITE |
NOTE_DELETE | NOTE_ATTRIB, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0) {
printf("The file was");
if (ev.fflags & NOTE_RENAME)
printf(" renamed");
if (ev.fflags & NOTE_WRITE)
printf(" written");
if (ev.fflags & NOTE_DELETE)
printf(" deleted");
if (ev.fflags & NOTE_ATTRIB)
printf(" chmod/chowned");
printf("n");
}

signal

struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, SIGHUP, EVFILT_SIGNAL,
EV_ADD | EV_ENABLE, 0, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
signal(SIGHUP, SIG_IGN);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0)
printf("signal %d delivered"
" %d timesn",
ev.ident, ev.data);
}

udata

int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
void (* fcn)(struct kevent *);
n = kevent(kq, ch, nchanges,
ev, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
fcn = evi.udata;
fcn(&evi);
}

分享到:
標(biāo)簽:單線程
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定