TCP 协议规范

NSQ 协议足够简单,在任何语言中构建客户端都应该是轻而易举的。我们 提供了官方的 Go 和 Python 客户端库。

一个 nsqd 进程监听一个可配置的 TCP 端口,接受客户端连接。

连接后,客户端必须发送一个 4 字节的“magic”标识符,表示他们将通信的协议版本(升级变得简单)。

  • V2 (4 字节 ASCII [space][space][V][2]) 一种基于推送的流式协议,用于消费(以及用于发布的请求/响应)

认证后,客户端可以选择发送一个 IDENTIFY 命令来提供自定义元数据(例如,更具描述性的标识符)并协商功能。为了开始消费消息,客户端必须订阅到一个通道。

订阅后,客户端被置于 RDY 状态 0。这意味着不会向客户端发送任何消息。当客户端准备好接收消息时,它应该发送一个命令来更新其 RDY 状态到它准备处理的某个数字,比如 100。没有其他命令的情况下,将向客户端推送 100 条可用消息(每次递减服务器端的该客户端 RDY 计数)。

V2 协议还支持客户端心跳。每 30 秒(默认但可配置),nsqd 将发送一个 _heartbeat_ 响应并期望返回一个命令。如果客户端空闲,发送 NOP。在 2 个未应答的 _heartbeat_ 响应后,nsqd 将超时并强制关闭未收到响应的客户端 连接。IDENTIFY 命令可用于更改/禁用此行为。

注意事项

  • 除非另有说明,所有 线上的二进制大小/整数均为 网络字节序 (即 端)

  • 有效的 topicchannel 名称是字符 [.a-zA-Z0-9_-]1 <= length <= 64 (在 nsqd 0.2.28 之前最大长度为 32

命令

IDENTIFY

在服务器上更新客户端元数据并协商功能

IDENTIFY\n
[ 4-byte size in bytes ][ N-byte JSON data ]

注意:此命令接受一个带大小前缀的 JSON 主体,相关字段:

  • client_id 一个用于区分此客户端的标识符(即,与消费者特定的内容)

  • hostname 客户端部署的主机名

  • feature_negotiation (nsqd v0.2.19+) 布尔值,用于指示客户端支持功能协商。如果服务器支持,它将返回一个支持的功能和元数据的 JSON 负载。

  • heartbeat_interval (nsqd v0.2.19+) 心跳之间的毫秒数。

    有效范围:1000 <= heartbeat_interval <= configured_max (-1 禁用心跳)

    --max-heartbeat-interval (nsqd 标志) 控制最大值

    默认值为 --client-timeout / 2

  • output_buffer_size (nsqd v0.2.21+) nsqd 在写入此客户端时使用的缓冲区大小(字节)。

    有效范围:64 <= output_buffer_size <= configured_max (-1 禁用输出缓冲)

    --max-output-buffer-size (nsqd 标志) 控制最大值

    默认值为 16kb

  • output_buffer_timeout (nsqd v0.2.21+) nsqd 已缓冲的任何数据将被刷新到此客户端的超时时间。

    有效范围:1ms <= output_buffer_timeout <= configured_max (-1 禁用超时)

    --max-output-buffer-timeout (nsqd 标志) 控制最大值

    默认值为 250ms

    警告:将客户端配置为极低的(< 25msoutput_buffer_timeout 会显著影响 nsqd 的 CPU 使用率(特别是连接 > 50 个客户端时)。

    这是由于当前实现依赖于 Go 定时器,这些定时器由 Go 运行时维护在优先级队列中。请参阅 提交消息 中的 拉取请求 #236 以获取更多细节。

  • tls_v1 (nsqd v0.2.22+) 为此连接启用 TLS。

    --tls-cert--tls-key (nsqd 标志) 启用 TLS 并配置服务器证书

    如果服务器支持 TLS,它将回复 "tls_v1": true

    客户端应在读取 IDENTIFY 响应后立即开始 TLS 握手

    服务器将在完成 TLS 握手后响应 OK

  • snappy (nsqd v0.2.23+) 为此连接启用 snappy 压缩。

    --snappy (nsqd 标志) 在服务器端启用对此的支持

    客户端应在 IDENTIFY 响应后立即期望一个 额外的、snappy 压缩的 OK 响应。

    客户端不能同时启用 snappydeflate

  • deflate (nsqd v0.2.23+) 为此连接启用 deflate 压缩。

    --deflate (nsqd 标志) 在服务器端启用对此的支持

    客户端应在 IDENTIFY 响应后立即期望一个 额外的、deflate 压缩的 OK 响应。

    客户端不能同时启用 snappydeflate

  • deflate_level (nsqd v0.2.23+) 为此连接配置 deflate 压缩级别。

    --max-deflate-level (nsqd 标志) 配置允许的最大值

    有效范围:1 <= deflate_level <= configured_max

    更高的值意味着更好的压缩,但 nsqd 的 CPU 使用率更高。

  • sample_rate (nsqd v0.2.25+) 将接收到的所有消息的百分比传递到此连接。

    有效范围:0 <= sample_rate <= 99 (0 禁用采样)

    默认值为 0

  • user_agent (nsqd v0.2.25+) 一个字符串,用于识别此客户端的代理,类似于 HTTP

    默认值:<client_library_name>/<version>

  • msg_timeout (nsqd v0.2.28+) 为传递到此客户端的消息配置服务器端消息超时(毫秒)。

成功响应:

OK

注意:如果客户端发送了 feature_negotiation(并且服务器支持它),则 响应将是一个如上所述的 JSON 负载。

错误响应:

E_INVALID
E_BAD_BODY

SUB

订阅一个 topic/channel

SUB <topic_name> <channel_name>\n

<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)
<channel_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)

成功响应:

OK

错误响应:

E_INVALID
E_BAD_TOPIC
E_BAD_CHANNEL

PUB

topic 发布一条消息:

PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]

<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)

成功响应:

OK

错误响应:

E_INVALID
E_BAD_TOPIC
E_BAD_MESSAGE
E_PUB_FAILED

MPUB

topic 发布多条消息(原子方式):

注意:在 nsqd v0.2.16+ 中可用

MPUB <topic_name>\n
[ 4-byte body size ]
[ 4-byte num messages ]
[ 4-byte message #1 size ][ N-byte binary data ]
      ... (重复 <num_messages> 次)

<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)

成功响应:

OK

错误响应:

E_INVALID
E_BAD_TOPIC
E_BAD_BODY
E_BAD_MESSAGE
E_MPUB_FAILED

DPUB

topic 发布一条延迟消息:

注意:在 nsqd v0.3.6+ 中可用

DPUB <topic_name> <defer_time>\n
[ 4-byte size in bytes ][ N-byte binary data ]

<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)
<defer_time> - 整数 D 的字符串表示,定义延迟时间,其中 0 <= D < max-requeue-timeout

成功响应:

OK

错误响应:

E_INVALID
E_BAD_TOPIC
E_BAD_MESSAGE
E_DPUB_FAILED

RDY

更新 RDY 状态(表示您准备好接收 N 条消息)

注意:从 nsqd v0.2.20+ 开始,使用 --max-rdy-count 来限制此值

RDY <count>\n

<count> - 整数 N 的字符串表示,其中 0 < N <= configured_max

注意:没有成功响应

错误响应:

E_INVALID

FIN

完成一条消息(表示 成功 处理)

FIN <message_id>\n

<message_id> - 消息 ID 为 16 字节十六进制字符串

注意:没有成功响应

错误响应:

E_INVALID
E_FIN_FAILED

REQ

重新入队一条消息(表示 处理失败

重新入队的消息被放置在队列尾部,相当于刚刚发布它, 但由于各种实现特定原因,不应明确依赖该行为,并且未来可能会更改。

类似地,在途消息超时的情况与显式 REQ 的行为相同。

REQ <message_id> <timeout>\n

<message_id> - 消息 ID 为 16 字节十六进制字符串
<timeout> - 整数 N 的字符串表示,其中 N <= 配置的最大超时
    timeout == 0 - 立即重新入队消息
    timeout  > 0 - 延迟重新入队 timeout 毫秒

注意:没有成功响应

错误响应:

E_INVALID
E_REQ_FAILED

TOUCH

重置在途消息的超时

注意:在 nsqd v0.2.17+ 中可用

TOUCH <message_id>\n

<message_id> - 消息的十六进制 ID

注意:没有成功响应

错误响应:

E_INVALID
E_TOUCH_FAILED

CLS

干净地关闭您的连接(不再发送消息)

CLS\n

成功响应:

CLOSE_WAIT

错误响应:

E_INVALID

NOP

空操作

NOP\n

注意:没有响应

AUTH

注意:在 nsqd v0.2.29+ 中可用

如果 IDENTIFY 响应指示 auth_required=true,则客户端必须在任何 SUBPUBMPUB 命令之前发送 AUTH。如果 auth_required 不存在(或为 false),客户端不得 授权。

nsqd 接收到 AUTH 命令时,它将责任委托给配置的 --auth-http-address,通过执行 HTTP 请求,其中客户端元数据以查询参数的形式提供:连接的远程地址、TLS 状态和提供的认证密钥。请参阅 AUTH 以获取更多细节。

AUTH\n
[ 4-byte size in bytes ][ N-byte Auth Secret ]

成功响应:

一个 JSON 负载,描述授权客户端的身份、可选 URL 和授权权限的数量。

{"identity":"...", "identity_url":"...", "permission_count":1}

错误响应:

E_AUTH_FAILED - 联系认证服务器时发生错误
E_UNAUTHORIZED - 未找到权限

数据格式

数据异步流式传输到客户端,并进行帧格式化以支持各种回复主体,例如:

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size     frame type     data

客户端应期望以下帧类型之一:

FrameTypeResponse int32 = 0
FrameTypeError    int32 = 1
FrameTypeMessage  int32 = 2

最后,消息格式:

[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
|       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
|       8-byte         ||    ||                 16-byte                      || N-byte
------------------------------------------------------------------------------------------...
  nanosecond timestamp    ^^                   message ID                       message body
                       (uint16)
                        2-byte
                       attempts