NSQ 协议足够简单,在任何语言中构建客户端都应该是轻而易举的。我们 提供了官方的 Go 和 Python 客户端库。
一个 nsqd 进程监听一个可配置的 TCP 端口,接受客户端连接。
连接后,客户端必须发送一个 4 字节的“magic”标识符,表示他们将通信的协议版本(升级变得简单)。
V2 (4 字节 ASCII [space][space][V][2])
一种基于推送的流式协议,用于消费(以及用于发布的请求/响应)认证后,客户端可以选择发送一个 IDENTIFY 命令来提供自定义元数据(例如,更具描述性的标识符)并协商功能。为了开始消费消息,客户端必须订阅到一个通道。
订阅后,客户端被置于 RDY 状态 0。这意味着不会向客户端发送任何消息。当客户端准备好接收消息时,它应该发送一个命令来更新其 RDY 状态到它准备处理的某个数字,比如 100。没有其他命令的情况下,将向客户端推送 100 条可用消息(每次递减服务器端的该客户端 RDY 计数)。
V2 协议还支持客户端心跳。每 30 秒(默认但可配置),nsqd
将发送一个 _heartbeat_ 响应并期望返回一个命令。如果客户端空闲,发送
NOP。在 2 个未应答的 _heartbeat_ 响应后,nsqd 将超时并强制关闭未收到响应的客户端
连接。IDENTIFY 命令可用于更改/禁用此行为。
除非另有说明,所有 线上的二进制大小/整数均为 网络字节序 (即 大 端)
有效的 topic 和 channel 名称是字符 [.a-zA-Z0-9_-] 且 1 <= length <= 64
(在 nsqd 0.2.28 之前最大长度为 32)
在服务器上更新客户端元数据并协商功能
IDENTIFY\n
[ 4-byte size in bytes ][ N-byte JSON data ]
注意:此命令接受一个带大小前缀的 JSON 主体,相关字段:
client_id 一个用于区分此客户端的标识符(即,与消费者特定的内容)
hostname 客户端部署的主机名
feature_negotiation (nsqd v0.2.19+) 布尔值,用于指示客户端支持功能协商。如果服务器支持,它将返回一个支持的功能和元数据的 JSON 负载。
heartbeat_interval (nsqd v0.2.19+) 心跳之间的毫秒数。
有效范围:1000 <= heartbeat_interval <= configured_max (-1 禁用心跳)
--max-heartbeat-interval (nsqd 标志) 控制最大值
默认值为 --client-timeout / 2
output_buffer_size (nsqd v0.2.21+) nsqd 在写入此客户端时使用的缓冲区大小(字节)。
有效范围:64 <= output_buffer_size <= configured_max (-1 禁用输出缓冲)
--max-output-buffer-size (nsqd 标志) 控制最大值
默认值为 16kb
output_buffer_timeout (nsqd v0.2.21+) nsqd 已缓冲的任何数据将被刷新到此客户端的超时时间。
有效范围:1ms <= output_buffer_timeout <= configured_max (-1 禁用超时)
--max-output-buffer-timeout (nsqd 标志) 控制最大值
默认值为 250ms
警告:将客户端配置为极低的(< 25ms)output_buffer_timeout
会显著影响 nsqd 的 CPU 使用率(特别是连接 > 50 个客户端时)。
这是由于当前实现依赖于 Go 定时器,这些定时器由 Go 运行时维护在优先级队列中。请参阅 提交消息 中的 拉取请求 #236 以获取更多细节。
tls_v1 (nsqd v0.2.22+) 为此连接启用 TLS。
--tls-cert 和 --tls-key (nsqd 标志) 启用 TLS 并配置服务器证书
如果服务器支持 TLS,它将回复 "tls_v1": true
客户端应在读取 IDENTIFY 响应后立即开始 TLS 握手
服务器将在完成 TLS 握手后响应 OK
snappy (nsqd v0.2.23+) 为此连接启用 snappy 压缩。
--snappy (nsqd 标志) 在服务器端启用对此的支持
客户端应在 IDENTIFY 响应后立即期望一个 额外的、snappy 压缩的 OK 响应。
客户端不能同时启用 snappy 和 deflate。
deflate (nsqd v0.2.23+) 为此连接启用 deflate 压缩。
--deflate (nsqd 标志) 在服务器端启用对此的支持
客户端应在 IDENTIFY 响应后立即期望一个 额外的、deflate 压缩的 OK 响应。
客户端不能同时启用 snappy 和 deflate。
deflate_level (nsqd v0.2.23+) 为此连接配置 deflate 压缩级别。
--max-deflate-level (nsqd 标志) 配置允许的最大值
有效范围:1 <= deflate_level <= configured_max
更高的值意味着更好的压缩,但 nsqd 的 CPU 使用率更高。
sample_rate (nsqd v0.2.25+) 将接收到的所有消息的百分比传递到此连接。
有效范围:0 <= sample_rate <= 99 (0 禁用采样)
默认值为 0
user_agent (nsqd v0.2.25+) 一个字符串,用于识别此客户端的代理,类似于 HTTP
默认值:<client_library_name>/<version>
msg_timeout (nsqd v0.2.28+) 为传递到此客户端的消息配置服务器端消息超时(毫秒)。
成功响应:
OK
注意:如果客户端发送了 feature_negotiation(并且服务器支持它),则
响应将是一个如上所述的 JSON 负载。
错误响应:
E_INVALID
E_BAD_BODY
订阅一个 topic/channel
SUB <topic_name> <channel_name>\n
<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)
<channel_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)
成功响应:
OK
错误响应:
E_INVALID
E_BAD_TOPIC
E_BAD_CHANNEL
向 topic 发布一条消息:
PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]
<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)
成功响应:
OK
错误响应:
E_INVALID
E_BAD_TOPIC
E_BAD_MESSAGE
E_PUB_FAILED
向 topic 发布多条消息(原子方式):
注意:在 nsqd v0.2.16+ 中可用
MPUB <topic_name>\n
[ 4-byte body size ]
[ 4-byte num messages ]
[ 4-byte message #1 size ][ N-byte binary data ]
... (重复 <num_messages> 次)
<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)
成功响应:
OK
错误响应:
E_INVALID
E_BAD_TOPIC
E_BAD_BODY
E_BAD_MESSAGE
E_MPUB_FAILED
向 topic 发布一条延迟消息:
注意:在 nsqd v0.3.6+ 中可用
DPUB <topic_name> <defer_time>\n
[ 4-byte size in bytes ][ N-byte binary data ]
<topic_name> - 一个有效的字符串(可选带有 #ephemeral 后缀)
<defer_time> - 整数 D 的字符串表示,定义延迟时间,其中 0 <= D < max-requeue-timeout
成功响应:
OK
错误响应:
E_INVALID
E_BAD_TOPIC
E_BAD_MESSAGE
E_DPUB_FAILED
更新 RDY 状态(表示您准备好接收 N 条消息)
注意:从 nsqd v0.2.20+ 开始,使用 --max-rdy-count 来限制此值
RDY <count>\n
<count> - 整数 N 的字符串表示,其中 0 < N <= configured_max
注意:没有成功响应
错误响应:
E_INVALID
完成一条消息(表示 成功 处理)
FIN <message_id>\n
<message_id> - 消息 ID 为 16 字节十六进制字符串
注意:没有成功响应
错误响应:
E_INVALID
E_FIN_FAILED
重新入队一条消息(表示 处理失败)
重新入队的消息被放置在队列尾部,相当于刚刚发布它, 但由于各种实现特定原因,不应明确依赖该行为,并且未来可能会更改。
类似地,在途消息超时的情况与显式 REQ 的行为相同。
REQ <message_id> <timeout>\n
<message_id> - 消息 ID 为 16 字节十六进制字符串
<timeout> - 整数 N 的字符串表示,其中 N <= 配置的最大超时
timeout == 0 - 立即重新入队消息
timeout > 0 - 延迟重新入队 timeout 毫秒
注意:没有成功响应
错误响应:
E_INVALID
E_REQ_FAILED
重置在途消息的超时
注意:在 nsqd v0.2.17+ 中可用
TOUCH <message_id>\n
<message_id> - 消息的十六进制 ID
注意:没有成功响应
错误响应:
E_INVALID
E_TOUCH_FAILED
干净地关闭您的连接(不再发送消息)
CLS\n
成功响应:
CLOSE_WAIT
错误响应:
E_INVALID
空操作
NOP\n
注意:没有响应
注意:在 nsqd v0.2.29+ 中可用
如果 IDENTIFY 响应指示 auth_required=true,则客户端必须在任何
SUB、PUB 或 MPUB 命令之前发送 AUTH。如果 auth_required 不存在(或为 false),客户端不得
授权。
当 nsqd 接收到 AUTH 命令时,它将责任委托给配置的
--auth-http-address,通过执行 HTTP 请求,其中客户端元数据以查询参数的形式提供:连接的远程地址、TLS 状态和提供的认证密钥。请参阅
AUTH 以获取更多细节。
AUTH\n
[ 4-byte size in bytes ][ N-byte Auth Secret ]
成功响应:
一个 JSON 负载,描述授权客户端的身份、可选 URL 和授权权限的数量。
{"identity":"...", "identity_url":"...", "permission_count":1}
错误响应:
E_AUTH_FAILED - 联系认证服务器时发生错误
E_UNAUTHORIZED - 未找到权限
数据异步流式传输到客户端,并进行帧格式化以支持各种回复主体,例如:
[x][x][x][x][x][x][x][x][x][x][x][x]...
| (int32) || (int32) || (binary)
| 4-byte || 4-byte || N-byte
------------------------------------...
size frame type data
客户端应期望以下帧类型之一:
FrameTypeResponse int32 = 0
FrameTypeError int32 = 1
FrameTypeMessage int32 = 2
最后,消息格式:
[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
| (int64) || || (hex string encoded in ASCII) || (binary)
| 8-byte || || 16-byte || N-byte
------------------------------------------------------------------------------------------...
nanosecond timestamp ^^ message ID message body
(uint16)
2-byte
attempts