Redis 源码硬核解析系列专题 - 扩展篇:Gossip协议的具体实现

扩展篇:Gossip协议的具体实现

1. 引言

Redis Cluster使用Gossip协议实现节点间的状态同步和一致性维护。Gossip协议是一种去中心化的通信机制,通过节点间的“谣言传播”方式交换信息,具有高容错性和扩展性。本篇将深入剖析Redis中Gossip协议的具体实现,包括消息格式、传播机制和故障检测逻辑。


2. Gossip协议在Redis中的作用

  • 状态同步:确保每个节点了解集群中所有节点的状态(如在线、槽分配)。
  • 故障检测:通过心跳检测发现节点失败,触发故障转移。
  • 去中心化:无需主控节点,适应动态集群变化。

3. 核心结构与消息格式

3.1 集群节点结构

代码片段cluster.h):

1
2
3
4
5
6
7
8
9
typedef struct clusterNode {
    char name[CLUSTER_NAMELEN]; // 节点ID
    int flags;                  // 状态(如CLUSTER_NODE_FAIL)
    uint16_t port;              // 端口
    unsigned char slots[16384/8]; // 槽位图
    mstime_t ping_sent;         // 上次发送PING时间
    mstime_t pong_received;     // 上次收到PONG时间
    clusterLink *link;          // 与该节点的连接
} clusterNode;

硬核解析

  • ping_sent/pong_received:用于心跳检测。
  • link:保存TCP连接和发送缓冲区。
3.2 Gossip消息格式

代码片段cluster.h):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
typedef struct clusterMsg {
    char sig[4];               // "RCmb"
    uint32_t totlen;           // 消息总长度
    uint16_t ver;              // 协议版本
    uint16_t type;             // 消息类型(如CLUSTERMSG_TYPE_PING)
    char sender[CLUSTER_NAMELEN]; // 发送者ID
    unsigned char myslots[16384/8]; // 发送者槽位图
    char master[CLUSTER_NAMELEN]; // 主节点ID(若为从节点)
    uint32_t ping_sent;        // PING发送时间
    uint32_t pong_received;    // PONG接收时间
    uint16_t port;             // 发送者端口
    uint16_t state;            // 发送者状态
    // ... 其他字段
} clusterMsg;

硬核解析

  • type:支持PING、PONG、MEET、FAIL等多种消息。
  • myslots:告知对方自己的槽分配。
  • 可扩展:消息末尾可附加额外数据(如其他节点状态)。

4. Gossip协议的核心实现

4.1 发送PING消息(clusterSendPing()

代码片段cluster.c):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void clusterSendPing(clusterLink *link, int type) {
    clusterMsg buf;
    clusterNode *node = link->node;
    buf.type = type; // CLUSTERMSG_TYPE_PING 或 PONG
    memcpy(buf.sender, server.cluster->myself->name, CLUSTER_NAMELEN);
    memcpy(buf.myslots, server.cluster->myself->slots, 16384/8);
    buf.port = server.port;
    buf.state = server.cluster->state;

    // 添加Gossip信息
    int gossipcount = 0;
    dictIterator *di = dictGetIterator(server.cluster->nodes);
    dictEntry *de;
    while ((de = dictNext(di)) && gossipcount < 10) {
        clusterNode *n = dictGetVal(de);
        if (n != server.cluster->myself && !(n->flags & CLUSTER_NODE_FAIL)) {
            clusterMsgDataGossip *gossip = &buf.data.gossip[gossipcount++];
            memcpy(gossip->nodename, n->name, CLUSTER_NAMELEN);
            gossip->ping_sent = n->ping_sent;
            gossip->pong_received = n->pong_received;
        }
    }
    dictReleaseIterator(di);

    buf.totlen = sizeof(clusterMsg) + (gossipcount * sizeof(clusterMsgDataGossip));
    clusterSendMessage(link, (char*)&buf, buf.totlen);
}

硬核解析

  • 随机传播:每次PING携带最多10个其他节点的状态(Gossip机制)。
  • 消息构建:包含自身信息和部分集群状态。
  • clusterSendMessage():通过TCP发送。
4.2 处理消息(clusterProcessPacket()

代码片段cluster.c):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
void clusterProcessPacket(clusterLink *link) {
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
    clusterNode *sender = clusterLookupNode(hdr->sender);
    if (hdr->type == CLUSTERMSG_TYPE_PING || hdr->type == CLUSTERMSG_TYPE_PONG) {
        sender->ping_sent = hdr->ping_sent;
        sender->pong_received = hdr->pong_received;
        if (hdr->type == CLUSTERMSG_TYPE_PING)
            clusterSendPing(link, CLUSTERMSG_TYPE_PONG);
    }

    // 处理Gossip信息
    int gossipcount = (ntohl(hdr->totlen) - sizeof(clusterMsg)) / sizeof(clusterMsgDataGossip);
    for (int i = 0; i < gossipcount; i++) {
        clusterMsgDataGossip *g = &hdr->data.gossip[i];
        clusterNode *node = clusterLookupNode(g->nodename);
        if (node && g->pong_received > node->pong_received)
            node->pong_received = g->pong_received; // 更新最新状态
    }
}

硬核解析

  • PING/PONG:更新发送者的心跳时间,回复PONG。
  • Gossip更新:根据收到的节点状态更新本地视图,优先保留最新数据。
4.3 故障检测(markNodeAsFailingIfNeeded()

代码片段cluster.c):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void markNodeAsFailingIfNeeded(clusterNode *node) {
    if (node->pong_received == 0) node->pong_received = node->ping_sent;
    mstime_t now = mstime();
    if (now - node->pong_received > server.cluster_node_timeout) {
        node->flags |= CLUSTER_NODE_PFAIL; // 疑似故障
        if (clusterNodeFailureReports(node) >= server.cluster->size / 2) {
            node->flags |= CLUSTER_NODE_FAIL; // 确认故障
            clusterSendFail(node->name); // 广播FAIL消息
        }
    }
}

硬核解析

  • PFAIL:单节点认为对方超时。
  • FAIL:多数节点同意后标记为故障。
  • cluster_node_timeout:默认15秒,可配置。

Mermaid故障检测流程

graph TD
    A["markNodeAsFailingIfNeeded()"] --> B{"now - pong_received > timeout?"}
    B -->|Yes| C["标记PFAIL"]
    C --> D{"失败报告数 > 半数?"}
    D -->|Yes| E["标记FAIL"]
    E --> F["clusterSendFail()"]
    B -->|No| G["保持状态"]
    D -->|No| G

5. Gossip协议的优化

  • 传播效率:每次随机携带10个节点信息,平衡带宽与同步速度。
  • 容错性:去中心化设计,无单点依赖。
  • 一致性:最终一致性,依赖多数确认故障。

6. 总结与调试建议

  • 收获:理解Redis中Gossip协议的实现细节。
  • 调试技巧
    • CLUSTER NODES查看节点状态。
    • tcpdump抓包分析PING/PONG消息。
  • 应用:借鉴Gossip设计分布式状态同步系统。
updatedupdated2025-03-312025-03-31