本文介绍: Redis server 一旦和一个客户端建立连接后,就会在事件驱动框架注册可读事件,这就对应客户端命令请求。而对于整个命令处理过程来说,我认为主要可以分成四个阶段

redis原子性保证

Redis server 一旦和一个客户端建立连接后,就会在事件驱动框架注册可读事件,这就对应客户端命令请求。而对于整个命令处理过程来说,我认为主要可以分成四个阶段:

这四个阶段在 Redis 6.0 版本前都是由主 IO 线程执行完成的。虽然 Redis 使用了 IO 多路复用机制,但是该机制只是一次获取多个就绪socket 描述符对应多个发送命令请求客户端。而 Redis 在主 IO 线程中,还是逐一来处理每个客户端上的命令的,所以命令执行原子性依然可以得到保证。

而当使用了 Redis 6.0 版本后,命令处理过程中的读取解析结果写回,就由多个 IO 线程处理了。不过你也不用担心,多个 IO 线程只是完成解析第一个读到的命令,命令的实际执行还是由主 IO 线程处理。当多个 IO 线程并发写回结果时,命令就已经执行完了,不存在多 IO 线程冲突问题。所以,使用了多 IO 线程后,命令执行的原子性仍然可以得到保证。

为什么并发IO线程读写还能保证处理的原子性?

答:主线负责read pending队列中的数据放入到这些IO线程的io_threads_list队列,并且处理io_threads_list[0]也就是主线处理IO操作处理完成之后,主线程自旋等待IO线程处理完之后,才开始一个个执行命令,所以保证了原子性。
源码

void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    // 处理read pending队列客户端队列
    handleClientsWithPendingReadsUsingThreads();
    ...
}

int handleClientsWithPendingReadsUsingThreads(void) {
    // 获取clients_pending_read队列列表迭代
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    
    // 一,放入不同的IO线程中
    // 遍历所有待读取的客户端,并将其散列到不同IO线程处理列表
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 通过取余方式散列获取IO线程下标
        int target_id = item_id % server.io_threads_num;
        // 将该客户端放入该下标列表
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    // 所有连接放入到IO线程处理列表后将IO线程操作标识为IO_THREADS_OP_READ读操作
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        // 设置io_threads_pending为非零数,也即当前需要处理的客户端数量,这时线程将会响应操作,开始处理客户端连接
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
    
    // 二、处理主线程的IO读写时间
    //io_threads_list数组0下标处为main线程处理,也即main线程处理一部分读IO
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    // 清空主线程负责下标为0的客户端列表,其他的下标由IO线程自己处理
    listEmpty(io_threads_list[0]);
    
    // 三、自旋等待所有IO线程全部处理完
    // 自旋等嗲其他线程处理IO完毕
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    
    // 四、执行命令
    // 当所有IO线程将clients_pending_read的客户端读IO处理完毕后,在主线程中处理客户端命令
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        // 去掉CLIENT_PENDING_READ标志位,并将其从clients_pending_read队列移除
        c->flags &amp;= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        // 如果设置暂停客户端请求那么继续循环
        if (clientsArePaused()) continue;
        // 处理客户端命令
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            continue;
        }
        processInputBuffer(c);
        // 如果处理完毕且有数据需要写回,那么将客户端放入clients_pending_write队列等待IO线程完成写操作
        if (!(c->flags &amp; CLIENT_PENDING_WRITE) &amp;&amp; clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }
    server.stat_io_reads_processed += processed;
    return processed;
}

我们可以简单地用一段话来描述Redis的请求处理流程:Redis主线程一次性获取最大为1000个客户端连接,将其放入到read pending队列中,在下一次aeMain主循环调用beforeSleep函数,该函数将read pending队列和write pending队列中的客户端散列到IO线程中执行读写操作,并且自身负责下标为0处的客户端,然后等待IO线程执行 read、write 完毕后再执行。

原文地址:https://blog.csdn.net/u013277209/article/details/128924891

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_25124.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注