内部实现

NSQ 由 3 个守护进程组成:

  • nsqd 是接收、队列化和向客户端分发消息的守护进程。

  • nsqlookupd 是管理拓扑信息并提供最终一致性发现服务的守护进程。

  • nsqadmin 是一个 Web UI,用于实时检查集群(并执行各种管理任务)。

NSQ 中的数据流被建模为 streamsconsumers 的树形结构。topic 是一个独特的数据流。channel 是订阅给定 topic 的消费者的逻辑分组。

主题/通道

单个 nsqd 可以有多个 topic,每个 topic 可以有多个 channel。channel 接收 topic 的所有消息的 copy,从而实现 multicast 风格的分发,同时 channel 上的每条消息在 其订阅者之间 distributed,实现负载均衡。

这些原语形成了一个强大的框架,用于表达各种 简单和复杂 拓扑

有关 NSQ 设计的更多信息,请参阅 设计文档

主题和通道

主题和通道是 NSQ 的核心原语,最好地体现了系统设计如何无缝转换为 Go 的特性。

Go 的通道(以下简称“go-chan”以便区分)是表达队列的自然方式,因此 NSQ 的 topic/channel 核心就是一个 buffered go-chan,其中包含 Message 指针。 缓冲区的大小等于 --mem-queue-size 配置参数。

从网络读取数据后,向 topic 发布消息的操作涉及:

  1. 实例化 Message 结构体(并分配消息体 []byte
  2. read-lock 获取 Topic
  3. read-lock 检查发布能力
  4. 在 buffered go-chan 上发送

要将消息从 topic 传递到其 channel,topic 不能依赖典型的 go-chan 接收语义,因为多个 goroutine 在 go-chan 上接收会 distribute 消息, 而期望的最终结果是将每条消息 copy 到每个 channel(goroutine)。

相反,每个 topic 维护 3 个主要 goroutine。第一个称为 router,负责 从传入的 go-chan 读取新发布的消息并将它们存储到队列(内存或磁盘)中。

第二个称为 messagePump,负责如上所述复制并推送消息到 channel。

第三个负责 DiskQueue IO,将在后面讨论。

Channel 稍微复杂一些,但共享暴露 single 输入和 single 输出 go-chan 的底层目标(以抽象内部消息可能在 内存或磁盘的事实):

queue goroutine

此外,每个 channel 维护 2 个时间有序的优先级队列,负责延迟和 飞行中消息超时(以及 2 个伴随的 goroutine 用于监控它们)。

通过管理 per-channel 数据结构来改进并行化,而不是依赖 Go 运行时的 global 定时器调度器。

注意: 内部,Go 运行时使用单个优先级队列和 goroutine 来管理定时器。 这支持(但不限于)整个 time 包。它通常消除了 user-land 时间有序优先级队列的需求,但重要的是要记住它是一个 single 数据结构,具有 single 锁,可能影响 GOMAXPROCS > 1 的性能。 请参阅 runtime/time.go。(在 go-1.10+ 中不再成立)

后端 / DiskQueue

NSQ 的一个设计目标是限制保存在内存中的消息数量。它通过 透明地将消息溢出写入磁盘来实现此目标,使用 DiskQueue(它拥有 topic 或 channel 的 third 主要 goroutine)。

由于内存队列只是一个 go-chan,将消息路由到内存(如果 可能)然后回退到磁盘是 trivial 的:

for msg := range c.incomingMsgChan {
	select {
	case c.memoryMsgChan <- msg:
	default:
		err := WriteMessageToBackend(&msgBuf, msg, c.backend)
		if err != nil {
			// ... handle errors ...
		}
	}
}

利用 Go 的 select 语句,可以用几行代码表达此功能:上面的 default 情况仅在 memoryMsgChan 满时执行。

NSQ 还有 ephemeral topic/channel 的概念。它们 discard 消息溢出 (而不是写入磁盘),并且当它们不再有订阅客户端时消失。这是 Go 接口的完美用例。Topic 和 channel 有一个声明为 Backend interface 的结构体成员,而不是具体类型。普通 topic 和 channel 使用 DiskQueue,而 ephemeral ones 则使用 DummyBackendQueue,它实现了一个 no-op Backend

减少 GC 压力

在任何垃圾回收环境中,你都会面临吞吐量(做有用工作)、延迟(响应性)和驻留集大小(内存占用) 之间的张力。

从 Go 1.2 开始,GC 是标记-清除(并行)、非分代、非压缩、stop-the-world 并且大多精确的。它 mostly 精确是因为剩余工作没有及时完成 (计划在 Go 1.3 中完成)。

Go GC 肯定会继续改进,但普遍真理是:创建的垃圾越少,收集的时间越少

首先,重要的是了解 GC 在 real workloads 下的行为。为此, nsqdstatsd 格式发布 GC 统计信息(连同其他内部指标)。 nsqadmin 显示这些指标的图表,让你洞察 GC 在频率和持续时间上的影响:

single node view

为了实际 reduce 垃圾,你需要知道它在哪里生成。Go 工具链再次提供答案:

  1. 使用 testing 包和 go test -benchmem 来基准测试热代码路径。它 分析每次迭代的分配数量(并且基准测试运行可以使用 benchcmp 进行比较)。
  2. 使用 go build -gcflags -m 构建,它输出 escape analysis 的结果。

考虑到这一点,以下优化对 nsqd 很有用:

  1. 避免 []bytestring 的转换。
  2. 重用缓冲区或对象(并且将来可能使用 sync.Poolissue 4720)。
  3. 预分配切片(在 make 中指定容量),并始终知道 网络上的项目数量和大小。
  4. 对各种可配置拨盘应用合理的限制(例如消息大小)。
  5. 避免装箱(interface{} 的使用)或不必要的包装类型(例如 用于“multiple value” go-chan 的 struct)。
  6. 在热代码路径中避免使用 defer(它会分配)。

TCP 协议

NSQ TCP 协议 是这些 GC 优化概念被充分利用的一个闪耀示例 部分。

协议使用长度前缀帧结构,使得编码和解码直截了当且高效:

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size      frame ID     data

由于帧组件的确切类型和大小是预先已知的,我们可以避免 encoding/binary 包的便利 Read()Write() 包装器(及其多余的接口查找和转换),而 是直接调用适当的 binary.BigEndian 方法。

为了减少 socket IO 系统调用,客户端 net.Conn 被包装为 bufio.Readerbufio.WriterReader 暴露 ReadSlice(),它重用其 内部缓冲区。这几乎消除了从 socket 读取时的分配,大大减少 GC 压力。这是因为大多数命令相关的数据不会转义(在 边缘情况下不是这样时,数据被 explicitly 复制)。

在更低级别,MessageID 被声明为 [16]byte,以便将其用作 map 键(切片不能用作 map 键)。然而,由于从 socket 读取的数据存储为 []byte,而不是通过分配 string 键产生垃圾,并且为了避免从 切片到 MessageID 的 backing array 的复制,使用 unsafe 包直接将切片 转换为 MessageID

id := *(*nsq.MessageID)(unsafe.Pointer(&msgID))

注意: 这是一个 hack。如果编译器优化了它,就不需要了, 并且 Issue 3512 是打开的,可能解决这个问题。还值得阅读 issue 5376,它讨论了“const like” byte 类型的可能性, 它可以与 string 互换使用,without 分配和复制。

类似地,Go 标准库仅在 string 上提供数字转换方法。为了 避免 string 分配,nsqd 使用 自定义 base 10 转换方法 直接操作 []byte

这些可能看起来像微优化,但 TCP 协议包含一些 hottest 代码 路径。总体上,以每秒数万条消息的速率,它们对分配数量和开销有 显著影响:

benchmark                    old ns/op    new ns/op    delta
BenchmarkProtocolV2Data           3575         1963  -45.09%

benchmark                    old ns/op    new ns/op    delta
BenchmarkProtocolV2Sub256        57964        14568  -74.87%
BenchmarkProtocolV2Sub512        58212        16193  -72.18%
BenchmarkProtocolV2Sub1k         58549        19490  -66.71%
BenchmarkProtocolV2Sub2k         63430        27840  -56.11%

benchmark                   old allocs   new allocs    delta
BenchmarkProtocolV2Sub256           56           39  -30.36%
BenchmarkProtocolV2Sub512           56           39  -30.36%
BenchmarkProtocolV2Sub1k            56           39  -30.36%
BenchmarkProtocolV2Sub2k            58           42  -27.59%

HTTP

NSQ 的 HTTP API 建立在 Go 的 net/http 包之上。因为它 just 是 HTTP,它 可以在几乎任何现代编程环境中利用,而无需特殊的客户端库。

它的简单性掩盖了它的强大,因为 Go 的 HTTP 工具箱中最有趣的方面之一 是它支持的广泛调试功能。net/http/pprof 包直接与原生 HTTP 服务器集成,暴露端点以检索 CPU、堆、 goroutine 和 OS 线程配置文件。这些可以直接从 go 工具针对:

$ go tool pprof http://127.0.0.1:4151/debug/pprof/profile

这对于调试和分析 running 进程非常有价值!

此外,/stats 端点返回大量指标,以 JSON 或 pretty-printed text 格式, 使管理员可以从命令行实时检查:

$ watch -n 0.5 'curl -s http://127.0.0.1:4151/stats | grep -v connected'

这会产生连续输出,如:

[page_views     ] depth: 0     be-depth: 0     msgs: 105525994 e2e%: 6.6s, 6.2s, 6.2s
    [page_view_counter        ] depth: 0     be-depth: 0     inflt: 432  def: 0    re-q: 34684 timeout: 34038 msgs: 105525994 e2e%: 5.1s, 5.1s, 4.6s
    [realtime_score           ] depth: 1828  be-depth: 0     inflt: 1368 def: 0    re-q: 25188 timeout: 11336 msgs: 105525994 e2e%: 9.0s, 9.0s, 7.8s
    [variants_writer          ] depth: 0     be-depth: 0     inflt: 592  def: 0    re-q: 37068 timeout: 37068 msgs: 105525994 e2e%: 8.2s, 8.2s, 8.2s

[poll_requests  ] depth: 0     be-depth: 0     msgs: 11485060 e2e%: 167.5ms, 167.5ms, 138.1ms
    [social_data_collector    ] depth: 0     be-depth: 0     inflt: 2    def: 3    re-q: 7568  timeout: 402   msgs: 11485060 e2e%: 186.6ms, 186.6ms, 138.1ms

[social_data    ] depth: 0     be-depth: 0     msgs: 60145188 e2e%: 199.0s, 199.0s, 199.0s
    [events_writer            ] depth: 0     be-depth: 0     inflt: 226  def: 0    re-q: 32584 timeout: 30542 msgs: 60145188 e2e%: 6.7s, 6.7s, 6.7s
    [social_delta_counter     ] depth: 17328 be-depth: 7327  inflt: 179  def: 1    re-q: 155843 timeout: 11514 msgs: 60145188 e2e%: 234.1s, 234.1s, 231.8s

[time_on_site_ticks] depth: 0     be-depth: 0     msgs: 35717814 e2e%: 0.0ns, 0.0ns, 0.0ns
    [tail821042#ephemeral     ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 33909699 e2e%: 0.0ns, 0.0ns, 0.0ns

最后,每个新的 Go 版本通常会带来 measurable performance gains。当针对最新版本的 Go 重新编译时,提供免费提升总是很棒!

依赖项

来自其他生态系统,Go 的哲学(或缺乏哲学)在管理依赖项方面需要一些时间来适应。

NSQ 从一个巨大的单一仓库演变而来,具有 relative imports 和内部包之间几乎没有分离, 到完全拥抱推荐的最佳实践,涉及结构和依赖管理。

有两种主要的思维流派:

  1. Vendoring:将依赖项以正确的修订版复制到你的应用程序仓库 中,并修改你的导入路径以引用本地副本。
  2. Virtual Env:列出你所需的依赖项修订版,并在构建时生成一个 干净的 GOPATH 环境,其中包含那些固定的依赖项。

注意: 这仅适用于 binary 包,因为对于可导入的 包,让它做出中间决策来决定使用哪个版本的依赖项没有意义。

NSQ 使用上面的方法 (2)。(它首先使用 gpm,然后 dep,现在 使用 Go modules)。

测试

Go 提供了坚实的内置支持,用于编写测试和基准测试,并且,因为 Go 使 建模并发操作如此容易,在你的测试环境中启动一个完整的 nsqd 实例是 trivial 的。

然而,初始实现的一个方面成为测试的问题: 全局状态。最明显的罪魁祸首是使用全局变量来保存运行时 nsqd 实例的引用,即 var nsqd *NSQd

某些测试会无意中在它们的本地作用域中掩盖这个全局变量,通过使用 短形式变量赋值,即 nsqd := NewNSQd(...)。这意味着全局引用 没有指向当前运行的实例,从而破坏测试。

为了解决这个问题,传递一个 Context 结构体,其中包含配置元数据和 对父 nsqd 的引用。所有对全局状态的引用都被替换为这个本地 Context,允许子级(topic、channel、协议处理程序等)安全地访问此数据 并使测试更可靠。

鲁棒性

一个在面对变化的网络条件或意外事件时不鲁棒的系统是一个 在分布式生产环境中性能不佳的系统。

NSQ 被设计和实现成允许系统容忍故障并以 一致、可预测且不出人意料的方式行为。

总体哲学是快速失败,将错误视为致命,并提供一种手段来调试任何 发生的问题。

但是,为了 react 你需要能够 detect 异常条件……

心跳和超时

NSQ TCP 协议是 push oriented 的。在连接、握手和订阅后,消费者 被置于 RDY 状态 0。当消费者准备好接收消息时,它将该 RDY 状态更新为它愿意接受的消息数量。NSQ 客户端库在幕后持续 管理这个,导致一个流控的消息流。

定期,nsqd 将在连接上发送心跳。客户端可以配置 心跳之间的间隔,但 nsqd 期望在发送下一个之前收到响应。

应用级心跳和 RDY 状态的组合避免了 head-of-line blocking,否则会使心跳无用(即如果消费者 在处理消息流落后,OS 的接收缓冲区将填满,阻塞心跳)。

为了保证进度,所有网络 IO 都绑定了相对于配置的心跳 间隔的截止日期。这意味着你可以字面上拔掉 nsqd 和消费者之间的网络连接,它将检测并正确处理错误。

当检测到致命错误时,客户端连接被强制关闭。飞行中消息被 超时并重新队列以分发到另一个消费者。最后,错误被记录并各种 内部指标被递增。

管理 Goroutines

启动 goroutines 令人惊讶地容易。不幸的是,协调 它们的清理并不那么容易。避免死锁也具有挑战性。通常这归结为一个排序 问题,其中在 go-chan 上接收的 goroutine 在上游 goroutine 发送之前退出。

为什么在意呢?很简单,一个孤立的 goroutine 是一个 memory leak。在长 运行守护进程中的内存泄漏很糟糕,尤其是当期望你的进程在一切失败时稳定。

为了进一步复杂化,典型的 nsqd 进程有 many goroutines 参与消息 交付。内部,消息“ownership” 经常变化。为了能够干净地关闭,它 极其重要来考虑所有 intraprocess 消息。

虽然没有万能药,但以下技术使管理更容易一些……

WaitGroups

sync 包提供了 sync.WaitGroup,它可以用于 执行活动 goroutines 的会计(并提供一种等待它们退出的手段)。

为了减少典型的样板代码,nsqd 使用这个包装器:

type WaitGroupWrapper struct {
	sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
	w.Add(1)
	go func() {
		cb()
		w.Done()
	}()
}

// can be used as follows:
wg := WaitGroupWrapper{}
wg.Wrap(func() { n.idPump() })
...
wg.Wait()

退出信号

在多个子 goroutine 中触发事件的最简单方式是提供一个单一的 go-chan 当准备好时关闭它。所有在该 go-chan 上的待处理接收将激活,而不是必须 向每个 goroutine 发送单独的信号。

func work() {
    exitChan := make(chan int)
    go task1(exitChan)
    go task2(exitChan)
    time.Sleep(5 * time.Second)
    close(exitChan)
}
func task1(exitChan chan int) {
    <-exitChan
    log.Printf("task1 exiting")
}

func task2(exitChan chan int) {
    <-exitChan
    log.Printf("task2 exiting")
}

同步退出

实现一个可靠的、无死锁的退出路径来考虑所有 飞行中消息相当困难。几点提示:

  1. 理想情况下,负责在 go-chan 上发送的 goroutine 也应该负责 关闭它。

  2. 如果消息不能丢失,确保相关的 go-chans 被清空(尤其是 unbuffered ones!)以保证发送者可以进步。

  3. 或者,如果消息不再相关,对单一 go-chan 的发送应该 转换为带有退出信号的 select(如上所述)以保证 进步。

  4. 一般顺序应该是:

    1. 停止接受新连接(关闭 listeners)
    2. 向子 goroutines 信号退出(见上文)
    3. WaitGroup 上等待 goroutine 退出(见上文)
    4. 恢复缓冲数据
    5. 将剩余内容刷新到磁盘

日志记录

最后,你拥有的最重要的工具是 记录你的 goroutines 的入口和退出!。在死锁或泄漏的情况下,它使识别罪魁祸首 infinitely 更容易。

nsqd 日志行包含将 goroutines 与其兄弟(和父级)相关联的信息, 例如客户端的远程地址或 topic/channel 名称。

日志是 verbose 的,但不会 verbose 到日志 overwhelming 的程度。有一条细线,但 nsqd 倾向于在故障发生时在日志中提供 more 信息,而不是试图减少 chattiness 以牺牲有用性。