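In Redis 6.0, multithreaded network I/O is disabled by default. You can enable it by editing the relevant directives in redis.conf. The relevant part of the file reads: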
```
# So for instance if you have a four cores boxes, try to use 2 or 3 I/O
# threads, if you have a 8 cores, try to use 6 threads. In order to
# enable I/O threads use the following configuration directive:
#
# io-threads 4
#
# Setting io-threads to 1 will just use the main thread as usually.
# When I/O threads are enabled, we only use threads for writes, that is
# to thread the write(2) syscall and transfer the client buffers to the
# socket. However it is also possible to enable threading of reads and
# protocol parsing using the following configuration directive, by setting
# it to yes:
#
# io-threads-do-reads no
#
```
Uncomment io-threads (remove the leading #) and set it to the number of threads you want. Also uncomment io-threads-do-reads (remove the leading #) and change its value to yes.
After changing these two directives, we use the gdb command set args "../redis.conf" to pass the configuration file to redis-server, and then restart redis-server:
```
(gdb) set args "../redis.conf"
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /root/redis-6.0.3/src/redis-server "../redis.conf"
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib64/libthread_db.so.1".
```
Then press Ctrl + C to interrupt the program and use the info threads command to list the current threads:
```
(gdb) info threads
  Id   Target Id                                           Frame
* 1    Thread 0x7ffff7feb740 (LWP 11992) "redis-server"    0x00007ffff71e2603 in epoll_wait () from /usr/lib64/libc.so.6
  2    Thread 0x7ffff0bb9700 (LWP 11993) "bio_close_file"  0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
  3    Thread 0x7ffff03b8700 (LWP 11994) "bio_aof_fsync"   0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
  4    Thread 0x7fffefbb7700 (LWP 11995) "bio_lazy_free"   0x00007ffff74bc965 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
  5    Thread 0x7fffef3b6700 (LWP 11996) "io_thd_1"        0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
  6    Thread 0x7fffeebb5700 (LWP 11997) "io_thd_2"        0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
  7    Thread 0x7fffee3b4700 (LWP 11998) "io_thd_3"        0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
(gdb)
```
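Compared with the thread list when multithreaded network I/O is disabled, there are three additional threads named io_thd_1, io_thd_2 and io_thd_3. Together with the main thread, that makes four I/O threads (io-threads = 4). Let's focus on these three I/O worker threads. They all run the same logic, so we take io_thd_1 as an example: use the thread 5 command to switch to io_thd_1, then use bt to view its call stack: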
```
(gdb) bt
#0  0x00007ffff74bf4ed in __lll_lock_wait () from /usr/lib64/libpthread.so.0
#1  0x00007ffff74badcb in _L_lock_883 () from /usr/lib64/libpthread.so.0
#2  0x00007ffff74bac98 in pthread_mutex_lock () from /usr/lib64/libpthread.so.0
#3  0x0000000000447907 in IOThreadMain (myid=0x1) at networking.c:2921
#4  0x00007ffff74b8dd5 in start_thread () from /usr/lib64/libpthread.so.0
#5  0x00007ffff71e202d in clone () from /usr/lib64/libc.so.6
```
The code at frame #3 is as follows:
```c
// networking.c, line 2903
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);

    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
```
IOThreadMain is the worker thread function. It consists of some initialization followed by a main while loop. The initialization mainly sets the thread's name:
```c
// networking.c, line 2906
long id = (unsigned long)myid;
char thdname[16];

snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
```
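This is why gdb shows the threads as io_thd_1, io_thd_2 and io_thd_3. The worker thread id is passed in as the thread argument when the main thread creates the thread. Worker ids start at 1, because I/O thread 0 is the main thread itself. The main function calls InitServerLast, InitServerLast calls initThreadedIO, and initThreadedIO creates as many I/O worker threads as the configuration file asks for. If we set a breakpoint on initThreadedIO and restart the program under gdb, we can see this call chain and the corresponding source locations: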
```
Thread 1 "redis-server" hit Breakpoint 2, initThreadedIO () at networking.c:2954
2954        io_threads_active = 0; /* We start with threads not active. */
(gdb) bt
#0  initThreadedIO () at networking.c:2954
#1  0x0000000000431aa8 in InitServerLast () at server.c:2954
#2  0x0000000000437195 in main (argc=2, argv=0x7fffffffe308) at server.c:5142
(gdb)
```
initThreadedIO is defined as follows:
```c
// networking.c, line 2953
void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        // index 0 is the main thread
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
```
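From this code we can draw two conclusions:
- io-threads can be at most 128 (the IO_THREADS_MAX_NUM macro), and that count includes the main thread;
- thread 0 is the main thread, so the actual number of worker threads is io-threads - 1.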
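The two conclusions can be folded into one small helper. This is a sketch, not Redis code: `sketch_worker_threads` and `SKETCH_IO_THREADS_MAX_NUM` are hypothetical names, the limit of 128 is the IO_THREADS_MAX_NUM value mentioned above, and the assumption that io-threads is never below 1 is labeled in the code.

```c
#include <assert.h>

/* Hypothetical constant mirroring IO_THREADS_MAX_NUM (128 in Redis 6.0). */
#define SKETCH_IO_THREADS_MAX_NUM 128

/* Hypothetical helper: how many extra worker threads initThreadedIO would
 * spawn for a given io-threads value, or -1 where Redis would exit(1). */
int sketch_worker_threads(int io_threads_num) {
    /* Assumption: the config layer never lets io-threads drop below 1. */
    assert(io_threads_num >= 1);
    if (io_threads_num == 1) return 0;            /* main thread does all I/O */
    if (io_threads_num > SKETCH_IO_THREADS_MAX_NUM)
        return -1;                                /* "too many I/O threads" */
    return io_threads_num - 1;                    /* slot 0 is the main thread */
}
```

For the io-threads 4 setting used in this article, the helper yields 3, matching the io_thd_1 to io_thd_3 threads seen in gdb.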
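Before creating the new I/O threads, the code creates, for every I/O thread including the main thread (slot 0), a list io_threads_list[i] that holds the client objects (each representing a client connection) assigned to that thread. These lists live in the global array io_threads_list, indexed by thread number. For each worker thread it also initializes an integer counter (unsigned long) in another global array, io_threads_pending, again indexed by thread number. Together with a set of pthread mutexes (stored in the io_threads_mutex array), these counters let the main thread start and stop the worker threads. The control logic is as follows: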
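1. Set io_threads_pending[i] to 0;
2. In the loop above, immediately after initializing io_threads_mutex[i], call pthread_mutex_lock(&io_threads_mutex[i]) to lock that mutex;
3. Then create the corresponding I/O worker thread. The worker thread function IOThreadMain contains the following code: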
```c
// networking.c, line 2903
void *IOThreadMain(void *myid) {
    //... code omitted ...
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        //... code omitted ...

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
    }
}
```
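When a worker thread reaches the pthread_mutex_lock(&io_threads_mutex[id]) line above, it blocks there, because the main thread already holds io_threads_mutex[id]. To enable these I/O worker threads, the main thread calls startThreadedIO, which is implemented as follows: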
```c
// networking.c, line 2985
void startThreadedIO(void) {
    //... code omitted ...
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    io_threads_active = 1;
}
```
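startThreadedIO unlocks each io_threads_mutex[id] and sets io_threads_active, the flag indicating that I/O threads are enabled; this variable is discussed below. Attentive readers may notice that unlocking io_threads_mutex[id] is not enough by itself: after the worker's continue, the next iteration still finds io_threads_pending[id] equal to 0, so the worker just locks, unlocks and continues again, and never reaches the part of IOThreadMain that processes its client list. That is indeed the case. Besides unlocking io_threads_mutex[id], the main thread must also set io_threads_pending[id] to a non-zero value before the worker runs its main logic. So where is io_threads_pending[id] set to a non-zero value?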
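The two-part start condition (mutex unlocked and pending counter non-zero) can be reproduced outside Redis with plain pthreads. The sketch below is a simplified single-worker model, not Redis code: every `sketch_` name is hypothetical, the counters use C11 `<stdatomic.h>` so the cross-thread handoff is well defined, and the `sketch_quit` flag exists only so the example can terminate (IOThreadMain loops forever).

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

/* Hypothetical single-worker stand-ins for io_threads_mutex[i] and
 * io_threads_pending[i]. */
static pthread_mutex_t sketch_mutex = PTHREAD_MUTEX_INITIALIZER;
static atomic_ulong sketch_pending;   /* 0 = no work, non-zero = batch size */
static atomic_ulong sketch_done;      /* items the worker has handled */
static atomic_int sketch_quit;        /* shutdown flag, not present in Redis */

static void *sketch_worker(void *arg) {
    (void)arg;
    while (!atomic_load(&sketch_quit)) {
        /* Spin for a while waiting for work, as IOThreadMain does. */
        for (int j = 0; j < 1000000; j++)
            if (atomic_load(&sketch_pending) != 0) break;

        /* No work: pass through the mutex. While the main thread holds it,
         * the worker blocks here, which is how it is kept stopped. */
        if (atomic_load(&sketch_pending) == 0) {
            pthread_mutex_lock(&sketch_mutex);
            pthread_mutex_unlock(&sketch_mutex);
            continue;
        }

        /* "Process" the batch, then report completion by resetting to 0. */
        atomic_fetch_add(&sketch_done, atomic_load(&sketch_pending));
        atomic_store(&sketch_pending, 0);
    }
    return NULL;
}

/* Creates a stopped worker, starts it, hands it one batch of `items`,
 * waits for it to finish, shuts it down and returns the handled count. */
unsigned long sketch_run_batch(unsigned long items) {
    pthread_t tid;
    pthread_mutex_lock(&sketch_mutex);            /* like initThreadedIO */
    if (pthread_create(&tid, NULL, sketch_worker, NULL) != 0) {
        pthread_mutex_unlock(&sketch_mutex);
        return 0;
    }

    pthread_mutex_unlock(&sketch_mutex);          /* like startThreadedIO */
    atomic_store(&sketch_pending, items);         /* the actual start signal */
    while (atomic_load(&sketch_pending) != 0) {}  /* busy-wait for completion */

    atomic_store(&sketch_quit, 1);
    pthread_join(tid, NULL);
    return atomic_load(&sketch_done);
}
```

Note that both sides spin rather than sleep. The mutex does not signal work; it only gives the main thread a way to park the worker by holding the lock.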
beforeSleep calls handleClientsWithPendingReadsUsingThreads and handleClientsWithPendingWritesUsingThreads, which handle the read case and the write case respectively:
```c
// server.c, line 2106
void beforeSleep(struct aeEventLoop *eventLoop) {
    //... code omitted ...
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
    //... code omitted ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    //... code omitted ...
}
```
Let's start with reads. handleClientsWithPendingReadsUsingThreads is defined as follows:
```c
// networking.c, line 3126
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    // how the main thread distributes client objects among the I/O threads
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        processInputBuffer(c);
    }
    return processed;
}
```
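The code first checks the two flags io_threads_active and server.io_threads_do_reads to see whether threaded I/O is enabled. If it is not, the function returns immediately and all I/O is handled in the main thread. If it is, the first while loop implements the main thread's strategy for distributing client objects to the I/O threads. The strategy is simple round-robin: the current item index modulo the number of threads selects a thread, and the client is appended to that thread's list (the main thread included). Assuming 4 I/O threads including the main thread, client 0 goes to the main thread, client 1 to worker 1, client 2 to worker 2, client 3 to worker 3, client 4 to the main thread again, client 5 to worker 1, client 6 to worker 2, and so on.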
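The round-robin assignment boils down to one modulo per client. The helper below is a hypothetical sketch (not part of Redis) that records, for each pending client index, which I/O thread would receive it, using the same `item_id % server.io_threads_num` rule with 0 standing for the main thread.

```c
#include <assert.h>

/* Hypothetical helper: owner[i] receives the index of the I/O thread
 * (0 = main thread) that client i is assigned to, mirroring
 * `target_id = item_id % server.io_threads_num`. */
void sketch_round_robin(int num_clients, int num_threads, int owner[]) {
    assert(num_threads > 0);
    for (int item_id = 0; item_id < num_clients; item_id++)
        owner[item_id] = item_id % num_threads;
}
```

With 7 clients and 4 I/O threads, the owners come out as 0, 1, 2, 3, 0, 1, 2, which is exactly the sequence described above.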
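After the clients have been distributed to the I/O threads' lists, the main thread sets each worker's io_threads_pending[j] to a non-zero value. The value actually stored is the length of that worker's list: when there are fewer clients than I/O threads, some lists are empty, and there is no need to wake those workers.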
```c
// networking.c, line 3147
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
    int count = listLength(io_threads_list[j]);
    io_threads_pending[j] = count;
}
```
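Because of the modulo assignment, the value stored into io_threads_pending[j] can also be computed in closed form. The hypothetical helper below (not part of Redis) returns the list length that thread j receives for n pending clients and T I/O threads. Thread j gets clients j, j+T, j+2T and so on, which is ceil((n - j) / T) items when j < n and 0 otherwise; a result of 0 means that worker is not woken in this round.

```c
#include <assert.h>

/* Hypothetical helper: number of clients that I/O thread j (0 = main)
 * receives when n clients are distributed round-robin over T threads. */
int sketch_pending_count(int n, int T, int j) {
    assert(T > 0 && j >= 0 && j < T && n >= 0);
    /* The numerator is never negative because j <= T - 1; when j >= n it is
     * at most T - 1, so the integer division yields 0. */
    return (n - j + T - 1) / T;
}
```

For example, 6 clients over 4 I/O threads give counts 2, 2, 1, 1, while 2 clients over 4 threads leave threads 2 and 3 with a count of 0.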
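Once the main thread has assigned clients to the I/O worker threads and set their wake-up flags (io_threads_pending[j]), it must handle the clients assigned to itself, since it took part in the distribution too. It walks its own list and processes the clients one by one: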
```c
// networking.c, line 3153
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
    client *c = listNodeValue(ln);
    readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
```
Here the main thread takes each client object from its own list (io_threads_list[0]) and calls readQueryFromClient to read and parse the data, a flow covered earlier. When it is done, it empties its list.
The I/O worker threads do exactly the same when processing their own lists:
```c
// networking.c, line 2903
void *IOThreadMain(void *myid) {
    //... code omitted ...
    while(1) {
        //... code omitted ...

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        //... code omitted ...

        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        // after processing, empty this thread's own list
        listEmpty(io_threads_list[id]);
        // reset the status flag
        io_threads_pending[id] = 0;

        //... code omitted ...
    }
}
```
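After processing the clients in its list, each I/O thread likewise empties the list and resets its io_threads_pending[id] flag. Meanwhile, the main thread waits in an infinite loop for the I/O worker threads to finish the clients in their lists: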
```c
// networking.c, line 3126
int handleClientsWithPendingReadsUsingThreads(void) {
    //... code omitted ...
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    //... code omitted ...
}
```
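Because each I/O worker thread resets its io_threads_pending[id] to 0 once it has processed the clients in its list, the pending sum computed by the main thread's for loop eventually drops to 0, and the main thread leaves the infinite while loop.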
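To tie the read path together, here is a self-contained pthreads model of one call to handleClientsWithPendingReadsUsingThreads. It is a simplification under stated assumptions, not Redis code: client objects become integer ids, readQueryFromClient becomes a hypothetical counter update (`sk_read_client`), the lists become fixed arrays, the mutex start/stop mechanism is left out, and the workers are created and joined inside the call only so the example terminates. All `sk_` names are hypothetical. What it keeps is the protocol described above: round-robin fan-out, waking workers by publishing list lengths, the main thread handling slot 0, and a busy-wait until every pending counter is back to 0.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define SK_THREADS 4   /* hypothetical io-threads value; slot 0 is the main thread */
#define SK_CLIENTS 10  /* hypothetical number of clients pending a read */

static int sk_lists[SK_THREADS][SK_CLIENTS]; /* stand-in for io_threads_list */
static int sk_len[SK_THREADS];               /* length of each list */
static atomic_ulong sk_pending[SK_THREADS];  /* stand-in for io_threads_pending */
static atomic_int sk_handled[SK_CLIENTS];    /* how often each client was read */
static atomic_int sk_quit;                   /* lets the sketch terminate */

/* Stand-in for readQueryFromClient(): only records the visit. */
static void sk_read_client(int client_id) {
    atomic_fetch_add(&sk_handled[client_id], 1);
}

static void *sk_worker(void *arg) {
    long id = (long)arg;
    while (!atomic_load(&sk_quit)) {
        if (atomic_load(&sk_pending[id]) == 0) continue;  /* spin until woken */
        for (int i = 0; i < sk_len[id]; i++)
            sk_read_client(sk_lists[id][i]);
        sk_len[id] = 0;                                   /* like listEmpty() */
        atomic_store(&sk_pending[id], 0);                 /* report "done" */
    }
    return NULL;
}

/* One fan-out/fan-in round; returns the total number of reads performed. */
int sk_handle_pending_reads(void) {
    pthread_t tids[SK_THREADS];
    for (long j = 1; j < SK_THREADS; j++) {
        int rc = pthread_create(&tids[j], NULL, sk_worker, (void *)j);
        assert(rc == 0);
        (void)rc;
    }

    /* Fan-out: round-robin over all I/O threads, main thread included. */
    for (int item_id = 0; item_id < SK_CLIENTS; item_id++) {
        int target_id = item_id % SK_THREADS;
        sk_lists[target_id][sk_len[target_id]++] = item_id;
    }
    /* Wake each worker by publishing its list length (0 = stay idle). */
    for (int j = 1; j < SK_THREADS; j++)
        atomic_store(&sk_pending[j], (unsigned long)sk_len[j]);

    /* The main thread processes its own slice. */
    for (int i = 0; i < sk_len[0]; i++)
        sk_read_client(sk_lists[0][i]);
    sk_len[0] = 0;

    /* Fan-in: spin until every worker has reset its counter to 0. */
    for (;;) {
        unsigned long pending = 0;
        for (int j = 1; j < SK_THREADS; j++)
            pending += atomic_load(&sk_pending[j]);
        if (pending == 0) break;
    }

    atomic_store(&sk_quit, 1);
    for (int j = 1; j < SK_THREADS; j++)
        pthread_join(tids[j], NULL);

    int total = 0;
    for (int i = 0; i < SK_CLIENTS; i++)
        total += atomic_load(&sk_handled[i]);
    return total;
}
```

With 10 clients and 4 I/O threads, the main thread handles clients 0, 4 and 8, and each client is read exactly once. As in Redis, the main thread only continues past the wait loop after every worker has dropped its counter to 0, so it never touches a list a worker is still using.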
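That is how Redis 6.0 and later use I/O worker threads to handle read events. However, readers who study the source carefully will find two issues:

(This article is excerpted from the 《Redis 6.0 源码解析》 (Redis 6.0 Source Code Analysis) column of the 『小方说服务器开发』 Knowledge Planet; because of the WeChat Official Account's length limit, only some sections are included here.)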
