【阅读笔记】 redis 事件循环

redis 自己实现了一个事件库,用于处理网络事件和定时器事件,驱动 redis 主线程展开工作。本文通过源码分析 redis 的事件循环如何工作以及如何处理命令。

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct aeEventLoop {
int maxfd; /* 当前已注册的最大的文件描述符 */
int setsize; /* 最多可以保持多少个文件描述符 */
long long timeEventNextId; /* 下一个时间器事件的 ID */
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* 已注册的事件 */
aeFiredEvent *fired; /* 已触发的事件 */
aeTimeEvent *timeEventHead; /* 定时器事件,这是个链表的头部 */
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;

aeEventLoop 定义了事件循环的基本数据结构。aeEventLoop 有两个主要成员 events 和 fired,前进保存已经注册的事件,后者保存触发的事件。

1
2
3
4
5
6
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

aeFileEvent 定义了文件(网络)事件处理的函数 rfileProc,wfileProc,函数处理器的原型是

1
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

eventLoop 是事件循环自已, fd 是触发事件的文件描述符

1
2
3
4
5

typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

aeFiredEvent 定义已触发的事件,包含了触发事件的文件描述符和掩码(表示读还是写)

1
2
3
4
5
6
7
8
9
10
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;

aeTimeEvent 定义了定时器事件,主要成员是 when_sec, when_ms 等表示什么时候执行, timeProc 表示要执行的操作

基本用法

  1. 事件循环初始化

    1
    aeEventLoop *aeCreateEventLoop(int setsize)

    函数创建一个 aeEventLoop 结构,初始化各个成员,并根据传入的 setsize 参数给 events 和 fired 两个数组分配空间

  2. 注册事件

    1
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)

    函数向 eventLoop 注册一个事件,它修改 eventLoop->events[fd] 的成员实现事件的注册
    此外还会调用 aeApiAddEvent 方法注册 fd 到选择器(select 或 epoll)

  3. 注册定时器事件

    1
    2
    3
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
    aeTimeProc *proc, void *clientData,
    aeEventFinalizerProc *finalizerProc)

    函数向 eventLoop 注册一个定时器事件,它创建一个 aeTimeEvent 结构,计算 when_sec 和 when_ms (从当前时间加 milliseconds)
    然后插到 eventLoop->timeEventHead 的头部

  4. 事件循环处理

    1
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)

    函数实现事件循环的处理,它通过选择器获取网络事件,然后交给各个事件处理器去处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    ...

    // aeApiPoll 会将 eventLoop->fired 的前 numevents 个元素的 fd 成员设置为触发网络事件的网络套接字
    numevents = aeApiPoll(eventLoop, tvp);

    ...

    for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;

    // 一般情况下,先读后写,但也有些情况做相反的操作
    int invert = fe->mask & AE_BARRIER;

    // 处理读事件
    if (!invert && fe->mask & mask & AE_READABLE) {
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    fired++;
    }

    // 处理写事件
    if (fe->mask & mask & AE_WRITABLE) {
    if (!fired || fe->wfileProc != fe->rfileProc) {
    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    fired++;
    }
    }

    // 处理反向的读
    if (invert && fe->mask & mask & AE_READABLE) {
    if (!fired || fe->wfileProc != fe->rfileProc) {
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    fired++;
    }
    }
    }

    ...

    // 处理定时器事件,遍历 eventLoop->timeEventHead,找到适合的节点然后调用节点的 timeProc
    processTimeEvents(eventLoop);
    }
  5. 事件循环
    void aeMain(aeEventLoop *eventLoop)
    函数 aeMain 实现事件循环, redis 主线程在完成初始化后,即调用 aeMain 进入事件循环

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
    // 调用 beforeSleep 函数
    if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);

    aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
    }

redis 命令执行过程

redis 只有一个主线程处理大部分的事件(如来自客户端的命令),主线程在完成初始化后就就进入事件循环

  1. 网络侦听
    redis 初始化时需要绑定并侦听端口,并在事件循环中注册相应的事件处理器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

    // server.c

    void initServer(void) {
    ...

    // 创建并初始化 aeEventLoop 数据结构
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    ...

    // ipfd 是个数组,所有侦听返回的 fd 都保存在这个数组,数组长度是 server.ipfd_count
    listenToPort(server.port,server.ipfd,&server.ipfd_count)

    ...

    // 将 acceptTcpHandler 注册为侦听套接字的事件处理器
    // 每当有连接 accept 时,acceptTcpHandler 就会被调用
    for (j = 0; j < server.ipfd_count; j++) {
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
    }
    }
  2. 建立连接
    当有客户端连接服务器时,acceptTcpHandler 会被调用(在事件循环 aeProcessEvents 中被调用),acceptTcpHandler 创建一个 client 保存相关信息,同时注册事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // networking.c

    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    ...

    // cfd 是 accept 返回的 fd
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

    acceptCommonHandler(cfd,0,cip);
    }

    static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;

    // 创建一个 client 结构
    c = createClient(fd);
    ...
    }

    client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    ...

    // 将 readQueryFromClient 注册为事件处理器
    // 每当 fd 有读事件时,readQueryFromClient 就会被调用
    aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c);
    }
  3. 处理命令
    当客户端有命令发送命令到 redis 时,有读事件产生,readQueryFromClient 会被调用(在事件循环 aeProcessEvents 中被调用),readQueryFromClient 找到对应的命令处理器并调用,然后返回结果。下面以 get 命令为例说明

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58

    // server.c

    // redisCommandTable 是命令表,表示命令对应的处理函数
    // get 命令的处理函数为 getCommand
    struct redisCommand redisCommandTable[] = {
    ...
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    ...
    }

    // 当客户端有命令发送时,事件循环 aeProcessEvents 法取得对应的连接的文件描述符,并调用对应的事件处理器,即 readQueryFromClient
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...

    // 读出命令
    nread = read(fd, c->querybuf+qblen, readlen);

    ...

    // 处理命令
    processInputBufferAndReplicate(c);
    }

    void processInputBufferAndReplicate(client *c) {
    ...
    processInputBuffer(c);
    ...
    }

    void processInputBuffer(client *c) {
    ...
    processCommand(c)
    ...
    }

    int processCommand(client *c) {
    ...

    // lookupCommand 从 server.commands 找出 c->argv[0]->ptr 所指的命令
    // 这时 c->cmd 指向的是 get 命令对应的 redisCommand 结构
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

    ...

    call(c,CMD_CALL_FULL);

    ...
    }

    void call(client *c, int flags) {
    ...
    // 调用 getCommand
    // getCommand 查找对应 key 的值,并将结果写到 clinet 的输出缓冲 c->buf 中
    // c->buf 的数据最终会通过 writeToClient (有多个地方调用) 写到 fd 中
    c->cmd->proc(c);
    ...
    }