typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); //時間事件處理接口(函數指針),該函數返回定時的時長 typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); //aeMain中使用,在調用處理事件前調用 typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
//文件事件結構體 typedef struct aeFileEvent { //讀或者寫,也用於標識該事件結構體是否正在使用 int mask; /* one of AE_(READABLE|WRITABLE) */ //讀事件的處理函數 aeFileProc *rfileProc; //寫事件的處理函數 aeFileProc *wfileProc; //傳遞給上述兩個函數的數據 void *clientData; } aeFileEvent; //時間事件 typedef struct aeTimeEvent { //時間事件標識符,用於唯一標識該時間事件,並且用於刪除時間事件 long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ //該事件對應的處理程序 aeTimeProc *timeProc; //時間事件的最後一次處理程序,若已設置,則刪除時間事件時會被調用 aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent; //這裡用於保存已觸發的事件 typedef struct aeFiredEvent { int fd; int mask; } aeFiredEvent;
/* State of an event based program */ typedef struct aeEventLoop { //最大文件描述符的值 int maxfd; /* highest file descriptor currently registered */ //文件描述符的最大監聽數 int setsize; /* max number of file descriptors tracked */ //用於生成時間事件的唯一標識id long long timeEventNextId; //用於檢測系統時間是否變更(判斷標准 now
typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState;
//ae底層的數據創建以及初始化 static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; //創建setsize個epoll_event state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } eventLoop->apidata = state; return 0; } //創建事件循環,setsize為最大事件的的個數,對於epoll來說也是epoll_event的個數 aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; //分配該結構體的內存空間 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; //初始化最多setsize個事件 eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; //這一步為創建底層IO處理的數據,如epoll,創建epoll_event,和epfd if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
//添加監聽的事件,其中如果該fd對應的事件已經存在,則為修改合並舊的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ //判斷fd是否已經添加了事件的監聽 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; } //刪除指定事件的監聽 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ int mask = eventLoop->events[fd].mask & (~delmask); ee.events = 0; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (mask != AE_NONE) { epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee); } else { /* Note, Kernel < 2.6.9 requires a non null event pointer even for * EPOLL_CTL_DEL. */ epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee); } } //創建文件事件,並將該事件注冊到eventLoop中 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } //直接使用fd來獲取FileEvent,來後面分離事件時也采用這種方法(直接索引) aeFileEvent *fe = &eventLoop->events[fd]; //該該事件添加eventLoop中或者修改原來的已有的(保留舊的) if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; //將該事件的處理程序放到對應的位置 if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; //設置將要傳遞給該事件處理程序的數據 fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; //等待事件產生 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; // 利用fired數組記錄觸發的事件 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; } //事件處理程序 int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; //若什麼都沒有設置,則直接返回 /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; //如果有文件事件或者設置了時間事件並且沒有設置DONT_WAIT標志 /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) //查找時間最早的時間事件 shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; aeGetTime(&now_sec, &now_ms); tvp = &tv; /* How many milliseconds we need to wait for the next * time event to fire? */ long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; // 找到最早的時間事件與當前時間差值就是epoll wait時間 if (ms > 0) { tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { //如果沒有時間事件則可以阻塞、如果此時加入一個Timer event,啥時候喚醒呢?! /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { //這裡的判斷是為了防止重復調用 if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
