本文共 7692 字,大约阅读时间需要 25 分钟。
redis服务器是一个事件驱动的程序,内部需要处理两类事件,一类是文件事件(file event),一类是时间事件(time event),前者对应着处理各种io事件,后者对应着处理各种定时任务。
印象中我们都知道redis是单线程服务,其实本质上的确就是这样子的,上面说的file event 和 time event都是由单个线程驱动的,file event 底层其实是通过select/epoll等模型去执行驱动的,time event是通过内部维持一个定时任务列表来实现的。 redis server的事件模型其实就是经典的NIO模型,底层通过select/epoll等机制实现异步NIO,通过检测到event到来后for循环实现串行处理。redis server的main函数在启动过程中按照以下三个步骤进行初始化并进入运行状态:
int main(int argc, char **argv) { // 初始化服务器 initServerConfig(); // 载入配置文件, options 是前面分析出的给定选项 loadServerConfig(configfile,options); // 创建并初始化服务器数据结构 initServer(); // 运行事件处理器,一直到服务器关闭为止 aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); // 服务器关闭,停止事件循环 aeDeleteEventLoop(server.el); return 0;}
整个server初始化过程删除了不相关的代码保留跟这部分文章相关的内容,整个初始化过程核心逻辑如下:
void initServer() { // 打开 TCP 监听端口,用于等待客户端的命令请求 if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); // 为 serverCron() 创建时间事件 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); } // 为 TCP 连接关联连接应答(accept)处理器,用于接受并应答客户端的 connect() 调用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } }}
aeCreateFileEvent方法内部关键点在于aeApiAddEvent这个方法,将fd绑定到具体的eventLoop当中
/* * 根据 mask 参数的值,监听 fd 文件的状态, * 当 fd 可用时,执行 proc 函数 */int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData){ if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } if (fd >= eventLoop->setsize) return AE_ERR; // 取出文件事件结构 aeFileEvent *fe = &eventLoop->events[fd]; // 监听指定 fd 的指定事件 if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; // 设置文件事件类型,以及事件的处理器 fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; // 私有数据 fe->clientData = clientData; // 如果有需要,更新事件处理器的最大 fd if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK;}
整个accept的处理过程按照anetTcpAccept -> acceptCommonHandler -> createClient -> aeCreateFileEvent实现socket的accept并注册到eventLoop。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { while(max--) { // accept 客户端连接 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); acceptCommonHandler(cfd,0); }} static void acceptCommonHandler(int fd, int flags) { // 创建客户端 redisClient *c; if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; }}redisClient *createClient(int fd) { // 分配空间 redisClient *c = zmalloc(sizeof(redisClient)); // 当 fd 不为 -1 时,创建带网络连接的客户端 // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端 // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时 // 需要用到这种伪终端 if (fd != -1) { // 绑定读事件到事件 loop (开始接收命令请求) if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } }//此处省略很多代码}
redis server启动事件处理的主循环,会循环执行aeProcessEvents方法。
/* * 事件处理器的主循环 */void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 如果有需要在事件处理前执行的函数,那么运行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 开始处理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); }}
aeProcessEvents方法有一个非常巧妙的实现,在内部需要同时处理time event 和 file event,那么如何处理先后问题呢。思路如下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags){ int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; // 获取最近的时间事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 如果时间事件存在的话 // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间 long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ // 计算距今最近的时间事件还要多久才能达到 // 并将该时间距保存在 tv 结构中 aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞) if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // 执行到这一步,说明没有时间事件 // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度 /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { // 设置文件事件不阻塞 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 文件事件可以阻塞直到有事件到达为止 tvp = NULL; /* wait forever */ } } // 处理文件事件,阻塞时间由 tvp 决定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 从已就绪数组中获取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ // 读事件 if (fe->mask & mask & AE_READABLE) { // rfired 确保读/写事件只能执行其中一个 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ // 执行时间事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */}
转载地址:http://oxpmx.baihongyu.com/