您的位置 首页 > 腾讯云社区

redis0.1源码解析之事件驱动---theanarkh

redis的事件驱动模块负责处理文件和定时器两种任务。 下面是几个函数指针

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

下面是文件相关的结构体

struct aeFileEvent { int fd; int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */ aeFileProc *fileProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeFileEvent *next; } aeFileEvent;

下面是定时器结构体

struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent;

下面是事件循环的核心结构体

struct aeEventLoop { // 定时器id,每创建一个定时器结构体,就加一 long long timeEventNextId; // 两个链表 aeFileEvent *fileEventHead; aeTimeEvent *timeEventHead; int stop; } aeEventLoop;

下面先看一下一些基础函数,然后再分析具体流程。

1 创建一个事件循环结构体aeEventLoop *aeCreateEventLoop(void) { aeEventLoop *eventLoop; eventLoop = zmalloc(sizeof(*eventLoop)); if (!eventLoop) return NULL; eventLoop->fileEventHead = NULL; eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; return eventLoop; }2 文件相关函数

2.1 新建一个文件相关的结构体

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { aeFileEvent *fe; fe = zmalloc(sizeof(*fe)); if (fe == NULL) return AE_ERR; fe->fd = fd; fe->mask = mask; fe->fileProc = proc; fe->finalizerProc = finalizerProc; fe->clientData = clientData; fe->next = eventLoop->fileEventHead; eventLoop->fileEventHead = fe; return AE_OK; }

2.2 删除一个文件相关的结构体

// 删除某个节点(fd和mask等于入参的节点) void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { aeFileEvent *fe, *prev = NULL; fe = eventLoop->fileEventHead; while(fe) { if (fe->fd == fd && fe->mask == mask) { // 说明待删除的节点是第一个节点,直接修改头节点的指针 if (prev == NULL) eventLoop->fileEventHead = fe->next; else // 修改prev节点的next指针指向当前删除节点的下一个节点 prev->next = fe->next; // 钩子函数 if (fe->finalizerProc) fe->finalizerProc(eventLoop, fe->clientData); // 释放待删除节点的内存 zfree(fe); return; } // 记录上一个节点,当找到待删除节点时,修改prev指针的next指针(如果prev非空)为待删除节点的下一个节点 prev = fe; fe = fe->next; } }3 定时器相关函数long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; // 头插法插入eventLoop的timeEventHead队列 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }

3.1 时间相关的函数

// 获取当前时间,秒和毫秒 static void aeGetTime(long *seconds, long *milliseconds) { struct timeval tv; gettimeofday(&tv, NULL); *seconds = tv.tv_sec; *milliseconds = tv.tv_usec/1000; } static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) { long cur_sec, cur_ms, when_sec, when_ms; // 获取 aeGetTime(&cur_sec, &cur_ms); // 绝对时间,秒数 when_sec = cur_sec + milliseconds/1000; // 绝对时间,毫秒数 when_ms = cur_ms + milliseconds%1000; // 大于一秒则进位到秒中 if (when_ms >= 1000) { when_sec ++; when_ms -= 1000; } // 返回绝对时间的秒和毫秒 *sec = when_sec; *ms = when_ms; }

3.2 删除一个定时器结构体(参考删除文件相关数据结构的函数)

// 删除一个timeEvent节点 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) { aeTimeEvent *te, *prev = NULL; te = eventLoop->timeEventHead; while(te) { if (te->id == id) { if (prev == NULL) eventLoop->timeEventHead = te->next; else prev->next = te->next; if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData); zfree(te); return AE_OK; } prev = te; te = te->next; } return AE_ERR; /* NO event with the specified ID found */ }

3.3 查找最快到期的定时器节点

// 找出最快到期的节点 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; aeTimeEvent *nearest = NULL; while(te) { /* nearest记录当前最快到期的节点,初始化为NULL 1 nearest为空,把当前节点作为最小值 2 when_sec小的作为最小值 3 when_sec一样的情况下,when_ms小者为最小值 */ if (!nearest || te->when_sec < nearest->when_sec || (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms)) nearest = te; te = te->next; } return nearest; }

最后来看一下事件处理的逻辑,入口函数是

void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) aeProcessEvents(eventLoop, AE_ALL_EVENTS); }

该函数由redis初始化时,main函数调用。这个版本使用的多路复用函数是select

int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int maxfd = 0, numfd = 0, processed = 0; fd_set rfds, wfds, efds; aeFileEvent *fe = eventLoop->fileEventHead; aeTimeEvent *te; long long maxId; AE_NOTUSED(flags); // 两种类型的事件都不需要处理 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; // 初始化select对应的结构体,读,写,异常三种事件 FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); // 处理文件事件 if (flags & AE_FILE_EVENTS) { while (fe != NULL) { // 根据需要处理的事件,设置对应的变量对应的位 if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds); if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds); if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds); // 记录最大文件描述符select的时候需要用 if (maxfd < fe->fd) maxfd = fe->fd; // 标记是否有文件事件 numfd++; fe = fe->next; } } // 有文件事件需要处理,或者有time事件并且没有设置AE_DONT_WAIT(设置的话就不会进入select定时阻塞)标记 if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int retval; aeTimeEvent *shortest = NULL; /* struct timeval { long tv_sec; // seconds long tv_usec; // and microseconds }; */ struct timeval tv, *tvp; // 有time事件需要处理,并且没有设置AE_DONT_WAIT标记,则select可能会定时阻塞(如果有time节点的话) if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // 找出最快到期的节点 shortest = aeSearchNearestTimer(eventLoop); // 有待到期的time节点 if (shortest) { long now_sec, now_ms; // 获取当前时间 aeGetTime(&now_sec, &now_ms); tvp = &tv; // 算出相对时间,秒数 tvp->tv_sec = shortest->when_sec - now_sec; // 不够,需要借位 if (shortest->when_ms < now_ms) { // 微秒 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; // 借一位,减一 tvp->tv_sec --; } else { // 乘以1000,即微秒 tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } } else { // 没有到期的time节点 // 设置了AE_DONT_WAIT,则不会阻塞在select if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { // 一直阻塞直到有事件发生 tvp = NULL; /* wait forever */ } } retval = select(maxfd+1, &rfds, &wfds, &efds, tvp); if (retval > 0) { fe = eventLoop->fileEventHead; while(fe != NULL) { int fd = (int) fe->fd; // 有感兴趣的事件发生 if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) || (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) || (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))) { int mask = 0; // 记录发生了哪些感兴趣的事件 if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) mask |= AE_WRITABLE; if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)) mask |= AE_EXCEPTION; // 执行回调 fe->fileProc(eventLoop, fe->fd, fe->clientData, mask); processed++; /* After an event is processed our file event list * may no longer be the same, so what we do * is to clear the bit for this file descriptor and * restart again from the head. */ /* 执行完回调后,文件事件队列可能发生了变化, 重新开始遍历 */ fe = eventLoop->fileEventHead; // 清除该文件描述符 FD_CLR(fd, &rfds); FD_CLR(fd, &wfds); FD_CLR(fd, &efds); } else { fe = fe->next; } } } } // 处理time事件 if (flags & AE_TIME_EVENTS) { te = eventLoop->timeEventHead; // 先保存这次需要处理的最大id,防止在time回调了不断给队列新增节点,导致死循环 maxId = eventLoop->timeEventNextId-1; while(te) { long now_sec, now_ms; long long id; // 在本次回调里新增的节点,跳过 if (te->id > maxId) { te = te->next; continue; } // 获取当前时间 aeGetTime(&now_sec, &now_ms); // 到期了 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; // 执行回调 retval = te->timeProc(eventLoop, id, te->clientData); // 是否需要继续注册事件,是则修改超时时间,否则删除该节点 if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { aeDeleteTimeEvent(eventLoop, id); } te = eventLoop->timeEventHead; } else { te = te->next; } } } // 处理的事件个数 return processed; /* return the number of processed file/time events */ } ---来自腾讯云社区的---theanarkh

关于作者: 瞎采新闻

这里可以显示个人介绍!这里可以显示个人介绍!

热门文章

留言与评论(共有 0 条评论)
   
验证码: