High performance and linear scalability up to 1000 nodes. There are no proxies, asynchronous replication is used, and no merge operations are performed on values.
高达1000个节点的线性可扩展性。没有代理,使用异步复制,并且在进行赋值时没有合并操作
Acceptable degree of write safety: the system tries (in a best-effort way) to retain all the writes originating from clients connected with the majority of the master nodes. Usually there are small windows where acknowledged writes can be lost. Windows to lose acknowledged writes are larger when clients are in a minority partition.
Availability: Redis Cluster is able to survive partitions where the majority of the master nodes are reachable and there is at least one reachable slave for every master node that is no longer reachable. Moreover using replicas migration, masters no longer replicated by any slave will receive one from a master which is covered by multiple slaves.
// 初始化集群 voidclusterInit(void){ int saveconf = 0; // 初始化配置 server.cluster = zmalloc(sizeof(clusterState)); server.cluster->myself = NULL; server.cluster->currentEpoch = 0; // 初始为 FAIL 状态 server.cluster->state = CLUSTER_FAIL; // master 节点数 server.cluster->size = 1; server.cluster->todo_before_sleep = 0; server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; server.cluster->failover_auth_rank = 0; server.cluster->failover_auth_epoch = 0; server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; server.cluster->lastVoteEpoch = 0; server.cluster->stats_bus_messages_sent = 0; server.cluster->stats_bus_messages_received = 0; memset(server.cluster->slots,0, sizeof(server.cluster->slots)); clusterCloseAllSlots(); /* Lock the cluster config file to make sure every node uses * its own nodes.conf. */ // 配置文件上锁 if (clusterLockConfig(server.cluster_configfile) == C_ERR) exit(1); /* Load or create a new nodes configuration. */ // 载入或创建一个新的节点配置文件 if (clusterLoadConfig(server.cluster_configfile) == C_ERR) { /* No configuration found. We will just use the random name provided * by the createClusterNode() function. */ // 没找到配置文件,随机创建一个集群节点 myself = server.cluster->myself = createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER); serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s", myself->name); // 添加到当前集群节点的配置中(// 该 nodes 维护的是一张 nodeName -> node 的 hash 表) clusterAddNode(myself); saveconf = 1; } // 写配置文件 if (saveconf) clusterSaveConfigOrDie(1); /* We need a listening TCP port for our cluster messaging needs. */ server.cfd_count = 0; /* Port sanity check II * The other handshake port check is triggered too late to stop * us from trying to use a too-high cluster port number. */ // 检查端口号是否合法(<=65535) if (server.port > (65535-CLUSTER_PORT_INCR)) { serverLog(LL_WARNING, "Redis port number too high. " "Cluster communication port is 10,000 port " "numbers higher than your Redis port. " "Your Redis port number must be " "lower than 55535."); exit(1); } // 将该集群节点的端口和fd绑定( 打开 cluster 通道的 非阻塞监听端口) if (listenToPort(server.port+CLUSTER_PORT_INCR, server.cfd,&server.cfd_count) == C_ERR) { exit(1); } else { int j; for (j = 0; j < server.cfd_count; j++) { // 为所有集群的fd设置可读事件的处理函数clusterAcceptHandler // 可以根据系统平台选择合适的 事件模型(如:Linux 上的 epoll,具体查看 aeApiAddEvent 源码) if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, clusterAcceptHandler, NULL) == AE_ERR) serverPanic("Unrecoverable error creating Redis Cluster " "file event."); } } /* The slots -> keys map is a sorted set. Init it. */ // 创建槽映射到键的有序集合 server.cluster->slots_to_keys = zslCreate(); /* Set myself->port to my listening port, we'll just need to discover * the IP address via MEET messages. */ // 设置集群端口 myself->port = server.port; // 没有正在进行手动的故障转移 server.cluster->mf_end = 0; // 重置与手动故障转移的状态 resetManualFailover(); }
从该部分源码可以看出 Redis Cluster 初始化部分的核心主要是 nodes.conf 的加载,以及 cluster bus 通道的监听服务的启动 这两部分.
typedefstruct clusterNode { // 创建节点的时间 mstime_t ctime; /* Node object creation time. */ // 节点的名字,由 40 个十六进制字符组成 char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ // 节点标识 // 使用各种不同的标识值记录节点的角色(比如主节点或者从节点), // 以及节点目前所处的状态(比如在线或者下线)。 int flags; /* CLUSTER_NODE_... */ // 节点当前的配置纪元,用于实现故障转移 uint64_t configEpoch; /* Last configEpoch observed for this node */ // 由这个节点负责处理的槽 //(一共有 REDIS_CLUSTER_SLOTS / 8 个字节长,每个字节的每个位记录了一个槽的保存状态位,值为 1 表示槽正由本节点处理,值为 0 则表示槽并非本节点处理) unsignedchar slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ // 该节点负责处理的槽数量 int numslots; /* Number of slots handled by this node */ // 如果本节点是主节点,那么用这个属性记录从节点的数量 int numslaves; /* Number of slave nodes, if this is a master */ // 指针数组,指向各个从节点 struct clusterNode **slaves; /* pointers to slave nodes */ // 指向主节点,即使是从节点也可以为NULL struct clusterNode *slaveof; /* pointer to the master node. Note that it may be NULL even if the node is a slave if we don't have the master node in our tables. */ // 最后一次发送 PING 命令的时间 mstime_t ping_sent; /* Unix time we sent latest ping */ // 接收到PONG的时间 mstime_t pong_received; /* Unix time we received the pong */ // 最后一次被设置为 FAIL 状态的时间 mstime_t fail_time; /* Unix time when FAIL flag was set */ // 最后一次给某个从节点投票的时间 mstime_t voted_time; /* Last time we voted for a slave of this master */ // 最后一次从这个节点接收到复制偏移量的时间 mstime_t repl_offset_time; /* Unix time we received offset for this node */ // 孤立的主节点迁移的时间 mstime_t orphaned_time; /* Starting time of orphaned master condition */ // 该节点已知的复制偏移量 longlong repl_offset; /* Last known repl offset for this node. */ // 节点的 IP 地址 char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ // 节点的端口号 int port; /* Latest known port of this node */ // 保存连接节点所需的有关信息 clusterLink *link; /* TCP/IP link with this node */ // 保存下线报告的链表 list *fail_reports; /* List of nodes signaling this as failing */ } clusterNode;
typedefstructclusterState { // 指向当前节点的指针 clusterNode *myself; /* This node */ // 集群当前的配置纪元,用于实现故障转移 uint64_t currentEpoch; // 集群当前的状态:是在线还是下线 int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ // 集群中至少负责一个槽的主节点个数 int size; /* Num of master nodes with at least one slot */ // 集群节点名单(包括 myself 节点) // 字典的键为节点的名字,字典的值为 clusterNode 结构 dict *nodes; /* Hash table of name -> clusterNode structures */ // 防止重复添加节点的黑名单 dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ // 导出槽数据到目标节点,该数组记录这些节点 clusterNode *migrating_slots_to[CLUSTER_SLOTS]; // 导入槽数据到目标节点,该数组记录这些节点 clusterNode *importing_slots_from[CLUSTER_SLOTS]; // 槽和负责槽节点的映射(负责处理各个槽的节点) // 例如 slots[i] = clusterNode_A 表示槽 i 由节点 A 处理 clusterNode *slots[CLUSTER_SLOTS]; //槽映射到键的有序集合( 跳表中以槽作为分值,键作为成员,对槽进行有序排序,当有需要对槽进行区间range操作时,会很方便 ) zskiplist *slots_to_keys; /* The following fields are used to take the slave state on elections. */ // 以下这些域被用于进行故障转移选举 // 上次执行选举或者下次执行选举的时间 mstime_t failover_auth_time; /* Time of previous or next election. */ // 节点获得的投票数量 int failover_auth_count; /* Number of votes received so far. */ // 如果为真,表示本节点已经向其他节点发送了投票请求 int failover_auth_sent; /* True if we already asked for votes. */ // 该从节点在当前请求中的排名 int failover_auth_rank; /* This slave rank for current auth request. */ // 当前选举的纪元 uint64_t failover_auth_epoch; /* Epoch of the current election. */ // 从节点不能执行故障转移的原因 int cant_failover_reason; /* Why a slave is currently not able to failover. See the CANT_FAILOVER_* macros. */ /* Manual failover state in common. */ /* 共用的手动故障转移状态 */ // 如果为0,表示没有正在进行手动的故障转移。否则表示手动故障转移的时间限制 mstime_t mf_end; /* Manual failover time limit (ms unixtime). It is zero if there is no MF in progress. */ /* Manual failover state of master. */ /* 主服务器的手动故障转移状态 */ clusterNode *mf_slave; /* Slave performing the manual failover. */ /* Manual failover state of slave. */ // 从节点记录手动故障转移时的主节点偏移量 longlong mf_master_offset; /* Master offset the slave needs to start MF or zero if stil not received. */ // 非零值表示手动故障转移能开始 int mf_can_start; /* If non-zero signal that the manual failover can start requesting masters vote. */ /* The followign fields are used by masters to take state on elections. */ /* 以下这些域由主服务器使用,用于记录选举时的状态 */ // 集群最近一次投票的纪元 uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ // 调用clusterBeforeSleep()所做的一些事,以各个 flag 来记录 int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ // 通过 cluster 连接发送的消息数量 longlong stats_bus_messages_sent; /* Num of msg sent via cluster bus. */ // 通过 cluster 接收到的消息数量 longlong stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/ } clusterState;
/* Start an handshake with the specified address if there is not one * already in progress. Returns non-zero if the handshake was actually * started. On error zero is returned and errno is set to one of the * following values: * 如果还没有与指定的地址进行过握手,那么进行握手。 * 返回 1 表示握手已经开始, * 返回 0 并将 errno 设置为以下值来表示意外情况: * EAGAIN - There is already an handshake in progress for this address. 已经有握手在进行中了。 * EINVAL - IP or port are not valid. ip 或者 port 参数不合法。 */ intclusterStartHandshake(char *ip, int port){ clusterNode *n; char norm_ip[NET_IP_STR_LEN]; structsockaddr_storage sa; /* IP sanity check */ // 检查地址是否非法 if (inet_pton(AF_INET,ip, &(((struct sockaddr_in *)&sa)->sin_addr))) { sa.ss_family = AF_INET; } elseif (inet_pton(AF_INET6,ip, &(((struct sockaddr_in6 *)&sa)->sin6_addr))) { sa.ss_family = AF_INET6; } else { errno = EINVAL; return0; } /* Port sanity check */ // port 合法性检查 if (port <= 0 || port > (65535-CLUSTER_PORT_INCR)) { errno = EINVAL; return0; } /* Set norm_ip as the normalized string representation of the node * IP address. */ // 设置 norm_ip 作为节点地址的标准字符串表示形式 memset(norm_ip,0,NET_IP_STR_LEN); if (sa.ss_family == AF_INET) inet_ntop(AF_INET, (void*)&(((struct sockaddr_in *)&sa)->sin_addr), norm_ip,NET_IP_STR_LEN); else inet_ntop(AF_INET6, (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr), norm_ip,NET_IP_STR_LEN); // 判断当前地址是否处于握手状态,如果是,则设置errno并返回,该函数被用来避免重复和相同地址的节点进行握手 if (clusterHandshakeInProgress(norm_ip,port)) { errno = EAGAIN; return0; } /* Add the node with a random address (NULL as first argument to * createClusterNode()). Everything will be fixed during the * handshake. */ // 为node设置一个随机的名字,当握手完成时会为其设置真正的名字 // 创建一个随机名字的节点 n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET); // 设置地址 memcpy(n->ip,norm_ip,sizeof(n->ip)); n->port = port; // 添加到集群中 clusterAddNode(n); return1; }
/* This is executed 10 times every second */ // 集群的周期性执行函数。每秒执行10次,100ms执行一次 voidclusterCron(void){ dictIterator *di; dictEntry *de; int update_state = 0; // 没有从节点从属的主节点个数 int orphaned_masters; /* How many masters there are without ok slaves. */ // 所有主节点附属的最多的从节点数量 int max_slaves; /* Max number of ok slaves for a single master. */ // 如果myself是从节点,该从节点对应的主节点下有多少从节点 int this_slaves; /* Number of ok slaves for our master (if we are slave). */ mstime_t min_pong = 0, now = mstime(); clusterNode *min_pong_node = NULL; // 静态变量,表示该函数执行的计数器 staticunsignedlonglong iteration = 0; mstime_t handshake_timeout; // 记录一次迭代 iteration++; /* Number of times this function was called so far. */ /* The handshake timeout is the time after which a handshake node that was * not turned into a normal node is removed from the nodes. Usually it is * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use * the value of 1 second. */ // 获取握手状态超时的时间,最低为1s // 如果一个处于握手状态的节点如果没有在该超时时限内变成一个普通的节点,那么该节点从节点字典中被删除 handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000; /* Check if we have disconnected nodes and re-establish the connection. */ // 检查是否当前集群中有断开连接的节点和重新建立连接的节点 di = dictGetSafeIterator(server.cluster->nodes); // 遍历所有集群中的节点,如果有未建立连接的节点,那么发送PING或PONG消息,建立连接 while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); // 跳过当前节点以及没有地址的节点 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; /* A Node in HANDSHAKE state has a limited lifespan equal to the * configured node timeout. */ // 如果仍然node节点处于握手状态,但是从建立连接开始到现在已经超时 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { // 从集群中删除该节点,遍历下一个节点 clusterDelNode(node); continue; } // 为未创建连接的节点创建连接 if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link; // myself节点连接这个node节点 fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR); // 连接出错,跳过该节点 if (fd == -1) { /* We got a synchronous error from connect before * clusterSendPing() had a chance to be called. * If node->ping_sent is zero, failure detection can't work, * so we claim we actually sent a ping now (that will * be really sent as soon as the link is obtained). */ // 如果ping_sent为0,察觉故障无法执行,因此要设置发送PING的时间,当建立连接后会真正的的发送PING命令 if (node->ping_sent == 0) node->ping_sent = mstime(); serverLog(LL_DEBUG, "Unable to connect to " "Cluster Node [%s]:%d -> %s", node->ip, node->port+CLUSTER_PORT_INCR, server.neterr); continue; } // 为node节点创建一个连接对象 link = createClusterLink(node); // 设置连接对象的属性 link->fd = fd; // 为node设置连接对象 node->link = link; // 监听该连接的可读事件,设置可读时间的读处理函数 aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); /* Queue a PING in the new connection ASAP: this is crucial * to avoid false positives in failure detection. * * If the node is flagged as MEET, we send a MEET message instead * of a PING one, to force the receiver to add us in its node * table. */ /* 向新连接的节点发送 PING 命令,防止节点被识进入下线*/ // 备份旧的发送PING的时间 old_ping_sent = node->ping_sent; // 如果node节点指定了MEET标识,那么发送MEET命令,否则发送PING命令 clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); // 如果不是第一次发送PING命令,要将发送PING的时间还原,等待被clusterSendPing()更新 if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise * replaced by the clusterSendPing() call. */ node->ping_sent = old_ping_sent; } /* We can clear the flag after the first packet is sent. * If we'll never receive a PONG, we'll never send new packets * to this node. Instead after the PONG is received and we * are no longer in meet/handshake status, we want to send * normal PING packets. */ // 发送MEET消息后,清除MEET标识 // 如果没有接收到PONG回复,那么不会在向该节点发送消息 // 如果接收到了PONG回复,取消MEET/HANDSHAKE状态,发送一个正常的PING消息 node->flags &= ~CLUSTER_NODE_MEET; serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+CLUSTER_PORT_INCR); } } dictReleaseIterator(di); ... }
// 监听事件处理器(集群的fd所设置可读事件的处理函数) #define MAX_CLUSTER_ACCEPTS_PER_CALL 1000 voidclusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask){ int cport, cfd; int max = MAX_CLUSTER_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; clusterLink *link; UNUSED(el); UNUSED(mask); UNUSED(privdata); /* If the server is starting up, don't accept cluster connections: * UPDATE messages may interact with the database content. */ // 如果当前节点正在载入数据,则直接返回。不接收集群的连接 if (server.masterhost == NULL && server.loading) return; // 最大每次调用接收1000个连接 while(max--) { // TCP连接的accept cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_VERBOSE, "Error accepting cluster node: %s", server.neterr); return; } // 设置fd为非阻塞模式 anetNonBlock(NULL,cfd); // 禁用 nagle 算法 anetEnableTcpNoDelay(NULL,cfd); /* Use non-blocking I/O for cluster messages. */ serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport); /* Create a link object we use to handle the connection. * It gets passed to the readable handler when data is available. * Initiallly the link->node pointer is set to NULL as we don't know * which node is, but the right node is references once we know the * node identity. */ // 当连接成功后,为其创建一个连接对象,但是不关联连接的节点 link = createClusterLink(NULL); link->fd = cfd; // 监听该连接的可读事件,并设置处理函数为clusterReadHandler aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link); } }
clusterAcceptHandler()函数,该函数实际上就是accept()函数,接收A节点的连接,然后监听该连接上的可读事件,设置可读事件的处理函数为clusterReadHandler(),等待A节点发送数据,当A节点发送MEET消息给目标节点时,触发目标节点执行clusterReadHandler()函数来处理消息。clusterReadHandler函数主要工作就是解析 cluster bus 上接收的数据并进行消息分包,然后对消息进行处理,而对于消息的分包首先需要了解一下消息结构,Redis Cluster 节点之间通信的消息结构定义如下:
#define CLUSTER_PROTO_VER 0 /* Cluster bus protocol version. */ // 用来表示集群消息的结构(消息头,header) typedefstruct { // "RCmb"的签名 char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */ // 消息的总长 uint32_t totlen; /* Total length of this message */ // 协议版本,当前为0 uint16_t ver; /* Protocol version, currently set to 0. */ // 未使用的2字节 uint16_t notused0; /* 2 bytes not used. */ // 消息类型 uint16_t type; /* Message type */ // 只在发送PING、PONG和MEET消息时使用 // 消息正文包含的节点信息数量 uint16_t count; /* Only used for some kind of messages. */ // 消息发送者的配置纪元 uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ // 如果消息发送者是一个主节点,那么这里记录的是消息发送者的配置纪元 // 如果消息发送者是一个从节点,那么这里记录的是消息发送者正在复制的主节点的配置纪元 uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch advertised by its master if it is a slave. */ // 节点的复制偏移量 uint64_t offset; /* Master replication offset if node is a master or processed replication offset if node is a slave. */ // 发送消息的节点的name(ID) char sender[CLUSTER_NAMELEN]; /* Name of the sender node */ // 消息发送者目前的槽指派信息 unsignedchar myslots[CLUSTER_SLOTS/8]; // 如果消息发送者是一个从节点,那么这里记录的是消息发送者正在复制的主节点的名字 // 如果消息发送者是一个主节点,那么这里记录的是空( REDIS_NODE_NULL_NAME 一个 40 字节长,值全为 0 的字节数组) char slaveof[CLUSTER_NAMELEN]; // 32字节未使用 char notused1[32]; /* 32 bytes reserved for future usage. */ // 发送消息节点的端口 uint16_t port; /* Sender TCP base port */ // 发送消息节点的标识 uint16_t flags; /* Sender node flags */ // 消息发送者所处集群的状态 unsignedchar state; /* Cluster state from the POV of the sender */ // 消息的标识 unsignedchar mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */ // 消息的数据 unionclusterMsgData data; } clusterMsg;
/* Read data. Try to read the first field of the header first to check the * full length of the packet. When a whole packet is in memory this function * will call the function to process the packet. And so forth. */ // 读事件处理器 // 首先读入内容的头,以判断读入内容的长度 // 如果内容是一个 whole packet ,那么调用函数来处理这个 packet 。 voidclusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask){ char buf[sizeof(clusterMsg)]; ssize_t nread; clusterMsg *hdr; clusterLink *link = (clusterLink*) privdata; unsignedint readlen, rcvbuflen; UNUSED(el); UNUSED(mask); // 循环从fd读取数据(尽可能地多读数据) while(1) { /* Read as long as there is data to read. */ // 获取连接对象的接收缓冲区的长度,表示一次最多能多大的数据量 rcvbuflen = sdslen(link->rcvbuf); // 如果接收缓冲区的长度小于8字节(头信息),就无法读入消息的总长 if (rcvbuflen < 8) { /* First, obtain the first 8 bytes to get the full message * length. */ readlen = 8 - rcvbuflen; } else {// 能够读入完整数据信息 /* Finally read the full message. */ hdr = (clusterMsg*) link->rcvbuf; // 如果是8个字节 if (rcvbuflen == 8) { /* Perform some sanity check on the message signature * and length. */ // 如果前四个字节不是"RCmb"签名,释放连接 if (memcmp(hdr->sig,"RCmb",4) != 0 || ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) { serverLog(LL_WARNING, "Bad message length or signature received " "from Cluster bus."); handleLinkIOError(link); return; } } // 记录已经读入的内容长度 readlen = ntohl(hdr->totlen) - rcvbuflen; if (readlen > sizeof(buf)) readlen = sizeof(buf); } // 读入内容 nread = read(fd,buf,readlen); // 没有内容可读 if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */ // 读错误,释放连接 if (nread <= 0) { /* I/O error... */ serverLog(LL_DEBUG,"I/O error reading from node link: %s", (nread == 0) ? "connection closed" : strerror(errno)); handleLinkIOError(link); return; } else { /* Read data and recast the pointer to the new buffer. */ // 将读到的数据追加到连接对象的接收缓冲区中 link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread); hdr = (clusterMsg*) link->rcvbuf; rcvbuflen += nread; } /* Total length obtained? Process this packet. */ // 检查已读入内容的长度,看是否整条信息已经被读入了 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) { // 如果读到的数据有效,处理读到接收缓冲区的数据 if (clusterProcessPacket(link)) { // 处理成功,则设置新的空的接收缓冲区 sdsfree(link->rcvbuf); link->rcvbuf = sdsempty(); } else { return; /* Link no longer valid. */ } } } }
/* When this function is called, there is a packet to process starting * at node->rcvbuf. Releasing the buffer is up to the caller, so this * function should just handle the higher level stuff of processing the * packet, modifying the cluster state if needed. * * 当这个函数被调用时,说明 node->rcvbuf 中有一条待处理的信息。 * 信息处理完毕之后的释放工作由调用者处理,所以这个函数只需负责处理信息就可以了。 * * The function returns 1 if the link is still valid after the packet * was processed, otherwise 0 if the link was freed since the packet * processing lead to some inconsistency error (for instance a PONG * received from the wrong sender ID). * 如果函数返回 1 ,那么说明处理信息时没有遇到问题,连接依然可用。 * 如果函数返回 0 ,那么说明信息处理时遇到了不一致问题 * (比如接收到的 PONG 是发送自不正确的发送者 ID 的),连接已经被释放。 */ intclusterProcessPacket(clusterLink *link){ // 连接的输入(接收)缓冲区 clusterMsg *hdr = (clusterMsg*) link->rcvbuf; // 消息的总长度 uint32_t totlen = ntohl(hdr->totlen); // 消息的类型 uint16_t type = ntohs(hdr->type); // 通过Cluster接收到的消息数量加1 server.cluster->stats_bus_messages_received++; serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsignedlong) totlen); /* Perform sanity checks */ // 检查消息包的合法性 // 至少包含一个签名、版本、总长、消息正文包含的节点信息数量 if (totlen < 16) return1; /* At least signature, version, totlen, count. */ // 总长度大于接收缓冲区的大小 if (totlen > sdslen(link->rcvbuf)) return1; // 目前版本号为0,不处理其他版本 if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) { /* Can't handle messages of different versions. */ return1; } // 获取发送消息节点的标识 uint16_t flags = ntohs(hdr->flags); uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0; clusterNode *sender; // 如果消息是PING、PONG或者MEET if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { // 消息正文包含的节点信息数量 uint16_t count = ntohs(hdr->count); uint32_t explen; /* expected length of this packet */ // 计算消息包应该的长度 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += (sizeof(clusterMsgDataGossip)*count); // 总长度和计算的长度不相同返回1 if (totlen != explen) return1; // 如果消息是FAIL } elseif (type == CLUSTERMSG_TYPE_FAIL) { // 计算消息包应该的长度 uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataFail); // 总长度和计算的长度不相同返回1 if (totlen != explen) return1; // 如果消息是PUBLISH } elseif (type == CLUSTERMSG_TYPE_PUBLISH) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr->data.publish.msg.channel_len) + ntohl(hdr->data.publish.msg.message_len); if (totlen != explen) return1; // 如果消息是故障有关的 } elseif (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK || type == CLUSTERMSG_TYPE_MFSTART) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); if (totlen != explen) return1; //如果消息是UPDATE的 } elseif (type == CLUSTERMSG_TYPE_UPDATE) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataUpdate); if (totlen != explen) return1; } /* Check if the sender is a known node. */ //如果消息是UPDATE的 // 从集群中查找sender节点 sender = clusterLookupNode(hdr->sender); // 节点存在,并且不是 HANDSHAKE 节点 // 那么个更新节点的配置纪元信息 if (sender && !nodeInHandshake(sender)) { /* Update our curretEpoch if we see a newer epoch in the cluster. */ // 如果sender的纪元大于集群的纪元,更新集群的纪元 senderCurrentEpoch = ntohu64(hdr->currentEpoch); senderConfigEpoch = ntohu64(hdr->configEpoch); // 更新集群的当前纪元 if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; /* Update the sender configEpoch if it is publishing a newer one. */ // 更新sender的配置纪元 if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; // 更新配置和状态 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_FSYNC_CONFIG); } /* Update the replication offset info for this node. */ // 更新sender的复制偏移量和更新复制偏移量的时间 sender->repl_offset = ntohu64(hdr->offset); sender->repl_offset_time = mstime(); /* If we are a slave performing a manual failover and our master * sent its offset while already paused, populate the MF state. */ // 如果当前节点是正在执行手动故障转移的从节点,该当前节点的主节点正是sender节点 // 并且主节点发送复制偏移量时已经暂停手动故障转移 if (server.cluster->mf_end && nodeIsSlave(myself) && myself->slaveof == sender && hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_master_offset == 0) { // 设置当前从节点已经复制的偏移量 server.cluster->mf_master_offset = sender->repl_offset; serverLog(LL_WARNING, "Received replication offset for paused " "master manual failover: %lld", server.cluster->mf_master_offset); } } /* Initial processing of PING and MEET requests replying with a PONG. */ // 初始处理PING和MEET请求,用PONG作为回复 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node); /* We use incoming MEET messages in order to set the address * for 'myself', since only other cluster nodes will send us * MEET messagses on handshakes, when the cluster joins, or * later if we changed address, and those nodes will use our * official address to connect to us. So by obtaining this address * from the socket is a simple way to discover / update our own * address in the cluster without it being hardcoded in the config. * 我们使用传入的MEET消息来设置当前myself节点的地址,因为只有其他集群中的节点在握手的时会发送MEET消息, * 当有节点加入集群时,或者如果我们改变地址,这些节点将使用我们公开的地址来连接我们, * 所以在集群中,通过套接字来获取地址是一个简单的方法去discover / update我们自己的地址,而不是在配置中的硬设置 * * However if we don't have an address at all, we update the address * even with a normal PING packet. If it's wrong it will be fixed * by MEET later. * 但是,如果我们根本没有地址,即使使用正常的PING数据包,我们也会更新该地址。 如果是错误的,那么会被MEET修改 */ // 如果是MEET消息 或者是其他消息但是当前集群节点的IP为空 if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') { char ip[NET_IP_STR_LEN]; // 可以根据fd来获取ip,并设置myself节点的IP if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 && strcmp(ip,myself->ip)) { memcpy(myself->ip,ip,NET_IP_STR_LEN); serverLog(LL_WARNING,"IP address for this node updated to %s", myself->ip); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } } /* Add this node if it is new for us and the msg type is MEET. * 如果当前节点是第一次遇见这个节点,并且对方发来的是 MEET 信息, * 那么将这个节点添加到集群的节点列表里面。 * In this stage we don't try to add the node with the right * flags, slaveof pointer, and so forth, as this details will be * resolved when we'll receive PONGs from the node. * 节点目前的 flag 、 slaveof 等属性的值都是未设置的, * 等当前节点向对方发送 PING 命令之后,这些信息可以从对方回复的 PONG 信息中取得。 */ if (!sender && type == CLUSTERMSG_TYPE_MEET) { clusterNode *node; // 创建一个处于握手状态的节点 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE); // 设置 IP 和端口 nodeIp2String(node->ip,link); node->port = ntohs(hdr->port); // 添加到集群中 clusterAddNode(node); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } /* If this is a MEET packet from an unknown node, we still process * the gossip section here since we have to trust the sender because * of the message type. */ // 如果是从一个未知的节点发送过来MEET包,处理流言信息 if (!sender && type == CLUSTERMSG_TYPE_MEET) // 处理流言中的 PING or PONG 数据包 clusterProcessGossipSection(hdr,link); /* Anyway reply with a PONG */ // 回复一个PONG消息 clusterSendPing(link,CLUSTERMSG_TYPE_PONG); } .... }
/* PING, PONG, MEET: process config information. */ // 如果是PING、PONG或MEET消息,处理配置信息 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { serverLog(LL_DEBUG,"%s packet received: %p", type == CLUSTERMSG_TYPE_PING ? "ping" : "pong", (void*)link->node); //如果关联该连接的节点存在 if (link->node) { // 如果关联该连接的节点处于 HANDSHAKE 状态 if (nodeInHandshake(link->node)) { /* If we already have this node, try to change the * IP/port of the node with the new one. */ // sender节点存在,用该新的连接地址更新sender节点的地址 if (sender) { serverLog(LL_VERBOSE, "Handshake: we already know node %.40s, " "updating the address if needed.", sender->name); // 如果有需要的话,更新节点的地址 if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); } /* Free this node as we already have it. This will * cause the link to be freed as well. */ // 释放关联该连接的节点 clusterDelNode(link->node); return0; } /* First thing to do is replacing the random name with the * right node name if this was a handshake stage. */ // 用节点的真名sender替换在 HANDSHAKE 时创建的随机名字 clusterRenameNode(link->node, hdr->sender); serverLog(LL_DEBUG,"Handshake with node %.40s completed.", link->node->name); // 取消握手状态,设置节点的角色 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); // 如果sender的地址和关联该连接的节点的地址不相同 } elseif (memcmp(link->node->name,hdr->sender, CLUSTER_NAMELEN) != 0) { /* If the reply has a non matching node ID we * disconnect this node and set it as not having an associated * address. */ serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d", link->node->name, (int)(mstime()-(link->node->ctime)), link->node->flags); // 设置NOADDR标识,清空关联连接节点的地址 link->node->flags |= CLUSTER_NODE_NOADDR; link->node->ip[0] = '\0'; link->node->port = 0; // 断开连接 freeClusterLink(link); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); return0; } } /* Update the node address if it changed. */ // 如果发送的消息为PING类型,sender节点不处于握手状态 // 那么更新sender节点的IP地址 if (sender && type == CLUSTERMSG_TYPE_PING && !nodeInHandshake(sender) && nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); } /* Update our info about the node */ // 如果这是一条 PONG 消息,那么更新我们关于 node 节点的认识 if (link->node && type == CLUSTERMSG_TYPE_PONG) { // 更新接收到PONG的时间 link->node->pong_received = mstime(); // 清零最近一次发送PING的时间戳 link->node->ping_sent = 0; /* The PFAIL condition can be reversed without external * help if it is momentary (that is, if it does not * turn into a FAIL state). * 接到节点的 PONG 回复,我们可以移除节点的 PFAIL 状态。 * * The FAIL condition is also reversible under specific * conditions detected by clearNodeFailureIfNeeded(). * FAIL标识能否删除,需要clearNodeFailureIfNeeded()来决定 */ // 如果关联该连接的节点疑似下线 if (nodeTimedOut(link->node)) { // 取消PFAIL标识 link->node->flags &= ~CLUSTER_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); // 如果关联该连接的节点已经被判断为下线 } elseif (nodeFailed(link->node)) { // 如果一个节点被标识为FAIL,需要检查是否取消该节点的FAIL标识,因为该节点在一定时间内重新上线了 clearNodeFailureIfNeeded(link->node); } }
然后就是执行if (link->node && type == CLUSTERMSG_TYPE_PONG)判断,更新接收PONG的时间戳,清零发送PING的时间戳,根据接收PONG的时间等信息判断目标节点是否下线,如果下线要进行故障转移等操作。 之后A节点并不会立即向目标节点发送PING消息,而是要等待下一次时间事件的发生,在clusterCron()函数中,每次执行都需要对集群中所有节点进行故障检测和主从切换等等操作,因此在遍历节点时,会处理以下一种情况:
1 2 3 4 5 6 7 8 9 10 11
/* If we have currently no active ping in this instance, and the * received PONG is older than half the cluster timeout, send * a new ping now, to ensure all the nodes are pinged without * a too big delay. */ if (node->link && node->ping_sent == 0 && (now - node->pong_received) > server.cluster_node_timeout/2) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; }