redis 自己实现了一个事件库,用于处理网络事件和定时器事件,驱动 redis 主线程展开工作。本文通过源码分析 redis 的事件循环如何工作以及如何处理命令。
数据结构
1 | typedef struct aeEventLoop { |
aeEventLoop 定义了事件循环的基本数据结构。aeEventLoop 有两个主要成员 events 和 fired,前进保存已经注册的事件,后者保存触发的事件。
1 | typedef struct aeFileEvent { |
aeFileEvent 定义了文件(网络)事件处理的函数 rfileProc,wfileProc,函数处理器的原型是
1 | typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); |
eventLoop 是事件循环自已, fd 是触发事件的文件描述符
1 |
|
aeFiredEvent 定义已触发的事件,包含了触发事件的文件描述符和掩码(表示读还是写)
1 | typedef struct aeTimeEvent { |
aeTimeEvent 定义了定时器事件,主要成员是 when_sec, when_ms 等表示什么时候执行, timeProc 表示要执行的操作
基本用法
事件循环初始化
1
aeEventLoop *aeCreateEventLoop(int setsize)
函数创建一个 aeEventLoop 结构,初始化各个成员,并根据传入的 setsize 参数给 events 和 fired 两个数组分配空间
注册事件
1
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
函数向 eventLoop 注册一个事件,它修改 eventLoop->events[fd] 的成员实现事件的注册
此外还会调用 aeApiAddEvent 方法注册 fd 到选择器(select 或 epoll)注册定时器事件
1
2
3long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)函数向 eventLoop 注册一个定时器事件,它创建一个 aeTimeEvent 结构,计算 when_sec 和 when_ms (从当前时间加 milliseconds)
然后插到 eventLoop->timeEventHead 的头部事件循环处理
1
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
函数实现事件循环的处理,它通过选择器获取网络事件,然后交给各个事件处理器去处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
...
// aeApiPoll 会将 eventLoop->fired 的前 numevents 个元素的 fd 成员设置为触发网络事件的网络套接字
numevents = aeApiPoll(eventLoop, tvp);
...
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
// 一般情况下,先读后写,但也有些情况做相反的操作
int invert = fe->mask & AE_BARRIER;
// 处理读事件
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
// 处理写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// 处理反向的读
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
}
...
// 处理定时器事件,遍历 eventLoop->timeEventHead,找到适合的节点然后调用节点的 timeProc
processTimeEvents(eventLoop);
}事件循环
void aeMain(aeEventLoop *eventLoop)
函数 aeMain 实现事件循环, redis 主线程在完成初始化后,即调用 aeMain 进入事件循环1
2
3
4
5
6
7
8
9
10void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 调用 beforeSleep 函数
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
redis 命令执行过程
redis 只有一个主线程处理大部分的事件(如来自客户端的命令),主线程在完成初始化后就就进入事件循环
网络侦听
redis 初始化时需要绑定并侦听端口,并在事件循环中注册相应的事件处理器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// server.c
void initServer(void) {
...
// 创建并初始化 aeEventLoop 数据结构
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
// ipfd 是个数组,所有侦听返回的 fd 都保存在这个数组,数组长度是 server.ipfd_count
listenToPort(server.port,server.ipfd,&server.ipfd_count)
...
// 将 acceptTcpHandler 注册为侦听套接字的事件处理器
// 每当有连接 accept 时,acceptTcpHandler 就会被调用
for (j = 0; j < server.ipfd_count; j++) {
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
}
}建立连接
当有客户端连接服务器时,acceptTcpHandler 会被调用(在事件循环 aeProcessEvents 中被调用),acceptTcpHandler 创建一个 client 保存相关信息,同时注册事件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
...
// cfd 是 accept 返回的 fd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
acceptCommonHandler(cfd,0,cip);
}
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 创建一个 client 结构
c = createClient(fd);
...
}
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
...
// 将 readQueryFromClient 注册为事件处理器
// 每当 fd 有读事件时,readQueryFromClient 就会被调用
aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c);
}处理命令
当客户端有命令发送命令到 redis 时,有读事件产生,readQueryFromClient 会被调用(在事件循环 aeProcessEvents 中被调用),readQueryFromClient 找到对应的命令处理器并调用,然后返回结果。下面以 get 命令为例说明1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// server.c
// redisCommandTable 是命令表,表示命令对应的处理函数
// get 命令的处理函数为 getCommand
struct redisCommand redisCommandTable[] = {
...
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
...
}
// 当客户端有命令发送时,事件循环 aeProcessEvents 法取得对应的连接的文件描述符,并调用对应的事件处理器,即 readQueryFromClient
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
...
// 读出命令
nread = read(fd, c->querybuf+qblen, readlen);
...
// 处理命令
processInputBufferAndReplicate(c);
}
void processInputBufferAndReplicate(client *c) {
...
processInputBuffer(c);
...
}
void processInputBuffer(client *c) {
...
processCommand(c)
...
}
int processCommand(client *c) {
...
// lookupCommand 从 server.commands 找出 c->argv[0]->ptr 所指的命令
// 这时 c->cmd 指向的是 get 命令对应的 redisCommand 结构
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
...
call(c,CMD_CALL_FULL);
...
}
void call(client *c, int flags) {
...
// 调用 getCommand
// getCommand 查找对应 key 的值,并将结果写到 clinet 的输出缓冲 c->buf 中
// c->buf 的数据最终会通过 writeToClient (有多个地方调用) 写到 fd 中
c->cmd->proc(c);
...
}