构建客户端库

NSQ 的设计将大量责任推给客户端库,以维护整体集群的健壮性和性能。

本指南试图概述良好行为的客户端库需要履行的各种责任。由于向 nsqd 发布消息非常简单(只需向 /pub 端点发送 HTTP POST),本文档重点关注消费者。

通过设定这些期望,我们希望为 NSQ 用户在不同语言中实现一致性提供基础。

概述

  1. 配置
  2. 发现 (可选)
  3. 连接处理
  4. 特性协商
  5. 数据流 / 心跳
  6. 消息处理
  7. RDY 状态
  8. 退避
  9. 加密/压缩

配置

在高层级上,我们对配置的理念是设计系统以支持不同工作负载的灵活性,使用合理的默认值“开箱即用”良好运行,并最小化调整旋钮的数量。

消费者通过与 nsqd 实例的 TCP 连接订阅 topic 上的 channel。每个连接只能订阅一个主题,因此多主题消费需要相应地结构化。

使用 nsqlookupd 进行发现是可选的,因此客户端库应支持消费者直接连接到一个或多个 nsqd 实例的配置,或配置为轮询一个或多个 nsqlookupd 实例的配置。当消费者配置为轮询 nsqlookupd 时,轮询间隔应可配置。此外,由于 NSQ 的典型部署是在具有许多生产者和消费者的分布式环境中,客户端库应自动基于配置值的随机百分比添加抖动。这将有助于避免连接的雪崩效应。更多细节见 发现

消费者的一个重要性能旋钮是它可以在 nsqd 期望响应之前接收的消息数量。这种流水线处理促进了缓冲的、批处理的和异步的消息处理。按照惯例,这个值称为 max_in_flight,它会影响 RDY 状态的管理。更多细节见 RDY 状态

作为一个设计用于优雅处理故障的系统,客户端库预计会为失败的消息实现重试处理,并提供选项来限制该行为,例如每个消息的重试次数。更多细节见 消息处理

相关地,当消息处理失败时,客户端库预计会自动处理消息的重新排队。NSQ 支持在 REQ 命令中发送延迟。客户端库预计会提供选项来设置初始延迟(针对第一次失败)以及后续失败时如何更改延迟。更多细节见 退避

最重要的是,客户端库应支持某种方法来配置消息处理的回调处理程序。这些回调的签名应简单,通常接受单个参数(“消息对象”的实例)。

发现

NSQ 的一个重要组件是 nsqlookupd,它为消费者提供了一个发现服务,用于在运行时定位提供给定主题的 nsqd

虽然是可选的,但使用 nsqlookupd 大大减少了维护和扩展大型分布式 NSQ 集群所需的配置量。

当消费者使用 nsqlookupd 进行发现时,客户端库应管理轮询所有 nsqlookupd 实例以获取提供相关主题的 nsqd 的最新集的过程,并管理与这些 nsqd 的连接。

查询 nsqlookupd 实例很简单。向查找端点发送 HTTP 请求,并使用消费者尝试发现的主题作为查询参数(例如 /lookup?topic=clicks)。响应格式为 JSON:

{
    "channels": ["archive", "science", "metrics"],
    "producers": [
        {
            "broadcast_address": "clicksapi01.routable.domain.net",
            "hostname": "clicksapi01.domain.net",
            "remote_address": "172.31.27.114:51996",
            "tcp_port": 4150,
            "http_port": 4151,
            "version": "1.0.0-compat"
        },
        {
            "broadcast_address": "clicksapi02.routable.domain.net",
            "hostname": "clicksapi02.domain.net",
            "remote_address": "172.31.34.29:14340",
            "tcp_port": 4150,
            "http_port": 4151,
            "version": "1.0.0-compat"
        }
    ]
}

应使用 broadcast_addresstcp_port 连接到 nsqd。由于设计上 nsqlookupd 实例不共享或协调它们的数据,客户端库应将从所有 nsqlookupd 查询接收到的列表并集,以构建要连接的最终 nsqd 列表。应将 broadcast_address:tcp_port 组合用作此并集的唯一键。

应使用周期性定时器反复轮询配置的 nsqlookupd,以便消费者自动发现新的 nsqd。客户端库应自动启动与所有新发现实例的连接。

当客户端库执行开始时,它应通过启动对配置的 nsqlookupd 实例的初始请求集来引导此轮询过程。

连接处理

一旦消费者有一个要连接的 nsqd(通过发现或手动配置),它应打开到 broadcast_address:port 的 TCP 连接。对于消费者想要订阅的每个主题,应为每个 nsqd 建立单独的 TCP 连接。

连接到 nsqd 实例时,客户端库应按顺序发送以下数据:

  1. 魔术标识符
  2. 一个 IDENTIFY 命令(及其负载)并读取/验证响应(见 特性协商
  3. 一个 SUB 命令(指定所需的主题)并读取/验证响应
  4. 初始的 RDY 计数为 1(见 RDY 状态)。

(协议的低级细节可在 规范 中获取)

重新连接

客户端库应自动处理重新连接,如下所示:

  • 如果消费者配置了特定的 nsqd 实例列表,则应通过以指数退避方式延迟重试尝试来处理重新连接(例如,在 8s、16s、32s 等后尝试重新连接,直至最大值)。

  • 如果消费者配置为通过 nsqlookupd 发现实例,则应基于轮询间隔自动处理重新连接(例如,如果消费者从 nsqd 断开连接,客户端库应 在后续 nsqlookupd 轮询回合发现该实例时尝试重新连接)。这确保消费者可以了解引入到拓扑中的 nsqd 以及 移除(或故障)的 nsqd

特性协商

IDENTIFY 命令可用于设置 nsqd 侧元数据、修改客户端设置并协商特性。它满足两个需求:

  1. 在某些情况下,客户端希望修改 nsqd 与其交互的方式(例如修改客户端的心跳间隔并启用压缩、TLS、输出缓冲等 - 完整列表见 规范
  2. nsqd 用包含客户端在与实例交互时应尊重的服务器侧重要配置值的 JSON 负载响应 IDENTIFY 命令。

连接后,根据用户的配置,客户端库应发送一个 IDENTIFY 命令,其主体为 JSON 负载:

{
    "client_id": "metrics_increment",
    "hostname": "app01.bitly.net",
    "heartbeat_interval": 30000,
    "feature_negotiation": true
}

feature_negotiation 字段表示客户端可以接受返回的 JSON 负载。client_idhostnamensqd(和 nsqadmin)用于标识客户端的任意文本字段。heartbeat_interval 在每个客户端基础上配置心跳间隔。

如果 nsqd 不支持特性协商(在 nsqd v0.2.20+ 中引入),它将响应 OK,否则:

{
    "max_rdy_count": 2500,
    "version": "0.2.20-alpha"
}

有关 max_rdy_count 字段使用的更多细节,请参阅 RDY 状态 部分。

数据流和心跳

一旦消费者进入订阅状态,NSQ 协议中的数据流就是异步的。对于消费者而言,这意味着为了构建真正健壮且高性能的客户端库,它们应使用异步网络 IO 循环和/或“线程”来结构化(引号用于表示 OS 级线程和用户空间线程,如协程)。

此外,客户端预计会响应来自它们连接的 nsqd 实例的周期性心跳。默认情况下,这每 30 秒发生一次。客户端可以用任何命令响应,但按照惯例,最简单的方法是在接收到心跳时简单响应 NOP。有关如何识别心跳的具体细节,请参阅 协议规范

应有一个“线程”专用于从 TCP 套接字读取数据、从帧中解包数据,并执行多路复用逻辑以适当路由数据。这也是处理心跳的最合适位置。在最低层,读取协议涉及以下顺序步骤:

  1. 读取 4 字节大端 uint32 大小
  2. 读取大小字节数据
  3. 解包数据
  4. 获利
  5. 转到 1

错误简要插曲

由于其异步性质,要将协议错误与生成它们的命令关联起来,需要一些额外的状态跟踪。相反,我们采用了“快速失败”方法,因此绝大多数协议级错误处理都是致命的。这意味着如果客户端发送无效命令(或进入无效状态),它连接的 nsqd 实例将通过强制关闭连接(并,如果可能,向客户端发送错误)来保护自身(和系统)。这与上面提到的连接处理相结合,使得系统更加健壮和稳定。

唯一非致命的错误是:

  • E_FIN_FAILED - 针对无效消息 ID 的 FIN 命令
  • E_REQ_FAILED - 针对无效消息 ID 的 REQ 命令
  • E_TOUCH_FAILED - 针对无效消息 ID 的 TOUCH 命令

由于这些错误通常是时序问题,它们不被视为致命。这些情况通常发生在消息在 nsqd 侧超时并被重新排队并交付给另一个消费者时。原始接收者不再允许代表该消息响应。

消息处理

当 IO 循环解包包含消息的数据帧时,它应将该消息路由到配置的处理程序进行处理。

发送的 nsqd 预计会在其配置的消息超时内(默认:60 秒)收到回复。有几种可能的情景:

  1. 处理程序表示消息已成功处理。
  2. 处理程序表示消息处理不成功。
  3. 处理程序决定需要更多时间来处理消息。
  4. 飞行中超时到期,nsqd 自动重新排队消息。

在前 3 种情况下,客户端库应在消费者的代表下发送适当的命令(分别对应 FINREQTOUCH)。

FIN 命令是其中最简单的。它告诉 nsqd 可以安全丢弃消息。FIN 也可用于丢弃不想处理或重试的消息。

REQ 命令告诉 nsqd 应重新排队消息(带有一个可选参数指定延迟额外尝试的时间量)。如果消费者未指定可选参数,客户端库应根据处理消息的尝试次数自动计算持续时间(通常乘数就足够了)。客户端库应丢弃超过配置最大尝试次数的消息。当发生此情况时,应执行用户提供的回调以通知并启用特殊处理。

如果消息处理程序需要的时间超过配置的消息超时,则可以使用 TOUCH 命令在 nsqd 侧重置计时器。这可以反复执行,直到消息被 FINREQ,直至发送 nsqd 配置的 max_msg_timeout。客户端库绝不应该自动代表消费者执行 TOUCH

如果发送的 nsqd 实例未收到任何响应,消息将超时并自动重新排队以交付给可用消费者。

最后,每个消息的一个属性是尝试次数。客户端库应将此值与配置的最大值比较,并丢弃超过它的消息。当消息被丢弃时,应触发一个回调。典型默认实现此回调可能包括写入磁盘上的目录、日志记录等。用户应能够覆盖默认处理。

RDY 状态

由于消息是从 nsqd 推送到消费者的,我们需要一种在用户空间中管理数据流的方式,而不是依赖低级 TCP 语义。消费者的 RDY 状态是 NSQ 的流控制机制。

配置部分 中所述,消费者配置了一个 max_in_flight。这是一个并发性和性能旋钮,例如,一些下游系统能够更容易地批量处理消息,并从更高的 max_in_flight 中受益匪浅。

当消费者连接到 nsqd(并订阅)时,它被置于初始 RDY 状态为 0。不会交付任何消息。

客户端库有几个责任:

  1. 引导并均匀分布配置的 max_in_flight 到所有连接。
  2. 永不让所有连接的 RDY 计数的总和(total_rdy_count)超过配置的 max_in_flight
  3. 永不超出每个连接 nsqd 配置的 max_rdy_count
  4. 暴露一个 API 方法来可靠地指示消息流饥饿

1. 引导和分布

在为连接选择适当的 RDY 计数时(以均匀分布 max_in_flight),有几个考虑因素:

  • 连接数是动态的,通常事先未知(即,当通过 nsqlookupd 发现 nsqd 时)。
  • max_in_flight 可能低于连接数

为了启动消息流,客户端库需要发送一个初始 RDY 计数。由于最终连接数通常事先未知,它应从值为 1 开始,这样客户端库不会不公平地偏好第一个连接(们)。

此外,在每条消息处理后,客户端库应评估是否是更新 RDY 状态的时候。如果当前值为 0 或低于上次发送值的 ~25%,则应触发更新。

客户端库应始终尝试在所有连接上均匀分布 RDY 计数。通常,这被实现为 max_in_flight / num_conns

然而,当 max_in_flight < num_conns 时,这个简单公式就不够了。在这种状态下,客户端库应通过测量自上次在给定连接上接收消息以来的持续时间来动态运行时评估连接的 nsqd “活性”。在可配置的过期后,它应重新分布任何可用的 RDY 计数到一个新的(随机) nsqd 集。通过这样做,您保证最终会找到有消息的 nsqd。显然,这会影响延迟。

2. 维护 max_in_flight

客户端库应为给定消费者维护飞行中消息的最大数量上限。具体而言,每个连接的 RDY 计数的总和绝不应超过配置的 max_in_flight

下面是 Python 示例代码,用于确定提议的 RDY 计数是否对于给定连接有效:

def send_ready(reader, conn, count):
    if (reader.total_ready_count + count) > reader.max_in_flight:
        return

    conn.send_ready(count)
    conn.rdy_count = count
    reader.total_ready_count += count

3. nsqd 最大 RDY 计数

每个 nsqd 都可以使用 --max-rdy-count 配置(有关消费者可以执行的握手以确定此值的更多信息,请参阅 特性协商)。如果消费者发送超出可接受范围的 RDY 计数,其连接将被强制关闭。为了向后兼容,如果 nsqd 实例不支持 特性协商,则应假设此值为 2500

4. 消息流饥饿

最后,客户端库应提供一个 API 方法来指示消息流饥饿。对于消费者(在其消息处理程序中)简单地将飞行中消息数与其配置的 max_in_flight 比较以决定“处理一批”是不够的。有两种情况这会出问题:

  1. 当消费者配置 max_in_flight > 1 时,由于可变的 num_conns,有情况 max_in_flight 不能被 num_conns 均匀除尽。由于合同规定您绝不应该超过 max_in_flight,您必须向下取整,从而导致所有 RDY 计数的总和小于 max_in_flight 的情况。
  2. 考虑只有 nsqd 子集有消息的情况。由于预期的 均匀分布 RDY 计数,那些活跃的 nsqd 只有配置的 max_in_flight 的一部分。

在两种情况下,消费者实际上永远不会收到 max_in_flight 条消息。因此,客户端库应暴露一个 is_starved 方法,它将评估任何连接是否饥饿,如下所示:

def is_starved(conns):
    for c in conns:
        # 常量 0.85 旨在*预测*饥饿而不是等待它
        if c.in_flight > 0 and c.in_flight >= (c.last_ready * 0.85):
            return True
    return False

消息处理程序应使用 is_starved 方法来可靠地识别何时处理一批消息。

退避

当消息处理失败时该做什么是一个复杂的问题,无法简单回答。消息处理 部分详细说明了客户端库行为,该行为会延迟失败消息的处理一段时间(增加)。谜题的另一部分是是否减少吞吐量。这两个功能之间的相互作用对于整体系统稳定性至关重要。

通过减慢处理速率,或“退避”,消费者允许下游系统从瞬态故障中恢复。然而,这种行为应可配置,因为它并不总是理想的,例如在延迟优先的情况下。

退避应通过向适当的 nsqd 发送 RDY 0 来实现,停止消息流。保持此状态的持续时间应基于重复失败的次数计算(指数)。类似地,成功处理应减少此持续时间,直到阅读器不再处于退避状态。

当阅读器处于退避状态时,在超时到期后,客户端库应仅发送 RDY 1,无论 max_in_flight 如何。这有效地在返回全速之前“试水”。此外,在退避超时期间,客户端库应忽略任何成功或失败结果,以计算退避持续时间(即它应仅考虑每个退避超时的一个结果)。

nsq_client_flow

加密/压缩

NSQ 通过 IDENTIFY 命令支持加密和/或压缩特性协商。TLS 用于加密。支持 Snappy 和 DEFLATE 用于压缩。Snappy 作为第三方库可用,但大多数语言对 DEFLATE 有一些原生支持。

当接收到 IDENTIFY 响应并通过 tls_v1 标志请求 TLS 时,您将获得类似于以下的 JSON:

{
    "deflate": false,
    "deflate_level": 0,
    "max_deflate_level": 6,
    "max_msg_timeout": 900000,
    "max_rdy_count": 2500,
    "msg_timeout": 60000,
    "sample_rate": 0,
    "snappy": true,
    "tls_v1": true,
    "version": "0.2.28"
}

在确认 tls_v1 设置为 true(表示服务器支持 TLS)后,您会在发送或接收任何其他内容之前启动 TLS 握手(例如,在 Python 中使用 ssl.wrap_socket 调用)。在成功的 TLS 握手后,您必须立即读取加密的 NSQ OK 响应。

类似地,如果您启用了压缩,您将查找 snappydeflatetrue,然后使用适当的(解)压缩器包装套接字的读和写调用。同样,立即读取压缩的 NSQ OK 响应。

这些压缩特性是互斥的。

在完成协商加密/压缩之前,您必须防止缓冲,或者确保在协商特性时读取为空。

综合起来

分布式系统很有趣。

NSQ 集群的各种组件之间的交互协同工作,提供了一个构建健壮、高性能和稳定基础设施的平台。我们希望本指南能阐明客户端的作用有多重要。

在实际实现所有这些方面时,我们将 pynsqgo-nsq 视为我们的参考代码库。pynsq 的结构可以分解为三个核心组件:

  • Message - 一个高级消息对象,它暴露了用于响应 nsqd 的有状态方法(FINREQTOUCH 等)以及元数据,如尝试次数和时间戳。

  • Connection - 一个围绕特定 nsqd 的 TCP 连接的高级包装器,它了解飞行中消息、其 RDY 状态、协商的特性以及各种计时。

  • Consumer - 用户交互的前端 API,它处理发现、创建连接(并订阅)、引导和管理 RDY 状态、解析原始传入数据、创建 Message 对象,并将消息分发到处理程序。

  • Producer - 用户交互的前端 API,它处理发布。

我们很高兴帮助任何对构建 NSQ 客户端库感兴趣的人。我们正在寻找贡献者来继续扩展我们的语言支持,并完善现有库的功能。社区已经开源了 许多客户端库