How we redesigned the NSQ - NSQ 重塑之详细设计
之前的 文章 讲述了我们重塑 NSQ 的目的和目标, 接下来我们将详细描述下每个功能的具体技术细节.
重构后架构图
首先, 看一下重构后的整体架构图:
原来的几个 NSQ 组件大部分功能是复用的, 图中新增的就是元数据存储服务 -etcd, 以及数据同步和 HA 处理逻辑.
改造 topic queue
为了增加副本和其他特性, 首先需要改造的就是 nsq 的 topic 数据写入方式, 要保证数据最终落盘, 才能继续后面的改造. 所以我们第一步重构数据写入逻辑, 这块逻辑本身并不依赖分布式功能, 可以独立重构.
数据落盘
原版的 topic 写入流程是通过 golang 里面的 chan 来做的, 只有超过 chan 容量之后, 才会落盘. 但是 chan 有几个缺点, 首先是内存数据, 数据会丢, 其次是只能两端操作, 无法做更多丰富的查询操作. 因此 chan 本身作为数据存储方案对于持久化数据是不太合适的. 改造这块的逻辑还是比较简单的, 只要把原来写 chan 满之后的数据才落盘的逻辑直接改成任何数据都落盘即可.
但是这样修改之后, 第一是由于 IO 操作比内存耗时, 导致写入速度不理想; 第二是需要处理 channel 和 topic 的数据关系问题, 每个 channel 的消费数据是独立的, 原来处理方式是把所有数据复制一份到每个 channel, 如果还是按这种方式处理, 会导致所有 channel 的数据需要再次操作数据写入磁盘, 无疑会带来更大的性能损失. 为了避免落盘改造带来的性能下降, 我们做了更多工作来优化写入性能.
消费 channel 的数据同步处理
以前的方式之所以要从 topic 复制所有消息到每个 channel, 是因为使用的是 golang 里面的 chan 这种机制, 只有复制才能保证每个消费组的数据互相独立. 当所有数据落盘之后, 我们其实不需要再做这种数据复制的操作了, 只需要记录每个 channel 已经同步的数据位移和每个 channel 的消费位移即可. 这样所有的 channel 引用的是同一份 topic 磁盘数据, 每个 channel 维护自己的独立位移信息即可. 节省了数据复制的操作, 提高了性能, 也保证了各个 channel 之间的独立性. 从流程上看, 改造后我们把 topic 的写入和读取流程分离开了, topic 本身只负责写入数据, channel 仅负责读取数据.
组提交和刷盘调度优化
topic 到 channel 之间的数据复制步骤去掉之后, 已经带来了较大的性能提升. 接着我们继续优化 topic 本身的写入优化.
首先, 在服务端引入常见的 Group commit 组提交方式, 将多个消息一次性提交, 减少 IO 操作. 这样不管是因为刷盘还是因为数据副本同步的延迟, 都会把这段时间的积累的多个消息作为一组一次性写入. 这样就大大减少了需要操作的写入次数. 这种服务端 group commit 的方式往往比 Kafka 客户端批量写入的方式会表现更好, 因为实际场景下, 往往单个客户端的写入并没有那么大, 而是分散到非常多的客户端机器上面了. 而服务端 group commit 处理可以更好的优化这种场景. 具体代码如下:
// client async pub
// 客户端pub的数据写入临时chan后, 在chan上面等待结果异步返回
func internalPubAsync(clientTimer *time.Timer, msgBody *bytes.Buffer, topic *nsqd.Topic) error {
...
info := &nsqd.PubInfo{
Done: make(chan struct{}),
MsgBody: msgBody,
StartPub: time.Now(),
}
if clientTimer == nil {
clientTimer = time.NewTimer(time.Second * 5)
} else {
clientTimer.Reset(time.Second * 5)
}
select {
case topic.GetWaitChan() <- info:
default:
select {
case topic.GetWaitChan() <- info:
case <-topic.QuitChan():
nsqd.NsqLogger().Infof("topic %v put messages failed at exiting", topic.GetFullName())
return nsqd.ErrExiting
case <-clientTimer.C:
nsqd.NsqLogger().Infof("topic %v put messages timeout ", topic.GetFullName())
return ErrPubToWaitTimeout
}
}
<-info.Done
return info.Err
}
// group commit loop
// 组提交循环, 在chan上面等待异步提交写入请求, 并尝试一次提交所有等待中的请求,
// 请求完成后, 通过关闭对应请求的chan来通知客户端结果.
func (c *context) internalPubLoop(topic *nsqd.Topic) {
messages := make([]*nsqd.Message, 0, 100)
pubInfoList := make([]*nsqd.PubInfo, 0, 100)
topicName := topic.GetTopicName()
partition := topic.GetTopicPart()
defer func() {
done := false
for !done {
select {
case info := <-topic.GetWaitChan():
pubInfoList = append(pubInfoList, info)
default:
done = true
}
}
for _, info := range pubInfoList {
info.Err = nsqd.ErrExiting
close(info.Done)
}
}()
quitChan := topic.QuitChan()
infoChan := topic.GetWaitChan()
for {
select {
case <-quitChan:
return
case info := <-infoChan:
if info.MsgBody.Len() <= 0 {
nsqd.NsqLogger().Logf("empty msg body")
}
messages = append(messages, nsqd.NewMessage(0, info.MsgBody.Bytes()))
pubInfoList = append(pubInfoList, info)
default:
if len(pubInfoList) == 0 {
select {
case <-quitChan:
return
case info := <-infoChan:
messages = append(messages, nsqd.NewMessage(0, info.MsgBody.Bytes()))
pubInfoList = append(pubInfoList, info)
}
continue
}
var retErr error
if c.checkForMasterWrite(topicName, partition) {
_, _, _, err := c.PutMessages(topic, messages)
if err != nil {
nsqd.NsqLogger().LogErrorf("topic %v put messages %v failed: %v", topic.GetFullName(), len(messages), err)
retErr = err
}
} else {
topic.DisableForSlave()
nsqd.NsqLogger().LogDebugf("should put to master: %v",
topic.GetFullName())
retErr = consistence.ErrNotTopicLeader.ToErrorType()
}
for _, info := range pubInfoList {
info.Err = retErr
close(info.Done)
}
pubInfoList = pubInfoList[:0]
messages = messages[:0]
}
}
}
其次, 针对不同的 topic 可以支持不同的刷盘策略, 灵活适配不同业务的刷盘需求. 在创建 topic 的时候, 可以指定每写入多少条消息刷盘一次, 这样每个业务由于写入的频率不同, 把刷盘的请求打散了, 避免一次性过多的刷盘请求阻塞正常写入. 另外, 还有一个后台 goroutine 会在每隔固定时间选择一部分 topic 分区强制刷盘, 保证数据及时落盘.
改造支持数据副本和 HA
topic 写入落盘改造之后, 再来看看如何增加数据副本和 HA 特性. 整体流程可以参考架构图, 做了类似 Kafka 的设计, 将每个 topic 的数据节点副本元信息写入 etcd, 然后通过 etcd 选举出每个 topic 的 leader 节点. 选举的 topic 的 leader 节点负责自己 topic 的数据副本同步, 其他 follower 节点从 leader 节点同步 topic 数据.
元数据存储
一些少量的元数据存储在 etcd, 保证整个集群的元数据的一致性. 具体包括每个 topic 的配置信息, 副本节点的分布, 选举出来的 leader 节点信息, nsqd 数据节点的配置信息以及 nsqlookupd 数据查询节点的配置信息. 元数据树结构图如下:
由于元数据数据量很少, 变更也非常少, 因此本身对 etcd 的性能并没有什么要求. 另外 nsqd 数据节点和 nsqlookupd 查询节点的保活也通过 etcd 来做.
leader 选举和 HA
改造后的 nsq 架构, 每个 topic 需要一个 leader 节点负责处理读写请求和数据同步. 为了保证每个节点的负载趋于均衡, 我们通过 nsqlookupd 来选择合适的 topic 的 leader 节点, 并通知给所有副本进行 leader 确认. leader 节点会尝试从 etcd 获取 topic 对应的 leader 锁确认 leader 有效.
当某个节点失效时, 会触发 etcd 的 watch 事件, 从而触发 nsqlookupd 重新选择其他存活的节点作为 topic 的新 leader, 完成 leader 的快速切换. 客户端如果此时正在写入老的 leader 也会触发失败重试, 并获取最新的 leader 节点, 完成自动 HA.
选举具体流程如下:
数据副本同步和动态 ISR
每个 topic 选举出来的 leader 节点负责同步数据到所有副本. 为了支持副本节点的动态变化, 参考了 Kafka 的 ISR(In synced replica) 的设计. 和 Kafka 不同的是, 我们用 push 模式, 不是 pull 的模式, 来保证数据的同步复制, 避免数据同步不一致. 因此, 数据写入首先由 leader 节点发起, 并且同步到所有 ISR 副本节点成功后, 才返回给客户端. 如果同步 ISR 节点失败, 则尝试动态调整 ISR 并重试直到成功为止. 重试的过程中会检查 leader 的有效性, 以及是否重复提交等条件. 写入流程和 ISR 的动态调整流程如图所示:
通过动态 ISR 的调整, 可以保证失效的节点及时从 ISR 中清理掉, 从而减少失效节点对数据副本同步性能的影响.
数据同步落后的节点会从 ISR 移动到 Catchup 列表中, Catchup 列表中的节点会自动尝试追上 leader 的数据日志, 追上后会通知 leader 进行 ISR 节点加入验证的流程.
不管是 leader 同步复制数据到 ISR 节点列表, 还是 catchup 节点从 leader 拉取同步数据, 都是通过一个 commitlog 来维护本地数据的同步情况. commitlog 维护的是每次写请求的自增 id, 以及该写入请求对应于 topic 的磁盘队列文件的位置和数据大小, 有了 commitlog, 我们就可以判断每个副本的同步状态, 以及下次需要从哪里开始同步数据, 也可以判断写入请求是否是重复的已提交请求. 在 leader 选举时以及加入新的 ISR 节点时也会判断 commitlog 是否已经完全同步.
数据副本的配置支持 topic 级别的, 因此可以在一个集群里面对不同的 topic 配置不同的副本数, 来满足不同的业务需求, 另外 topic 级别的配置隔离也会更方便对后面更多的特性进行隔离. 使得不同的业务 topic 可以使用不同的特性配置.
自动数据平衡
由于所有的 topic 都是 leader 负责处理客户端的 pub 和 sub 请求, 因此为了保证各个节点的负载均衡, 我们实现了自动根据负载做数据平衡的功能. 通过自动平衡功能也可以实现在缩容或者扩容时, 自动的将数据迁移到其他机器上.
数据平衡是 nsqlookupd 通过定期收集各个 nsqd 数据节点的负载信息, 包括 CPU, 数据写入量, topic 的数量等信息, 给每个 nsqd 的负载算出一个 load 值, 然后使用动态 ISR 流程, 调整 topic 的 ISR 节点分布, 使得各个 nsqd 的节点的 load 的差距在合理范围内. 为了避免迁移影响正常服务, 可以配置允许数据平衡的时间范围.
分区支持
由于引入了 leader, 原来可以随便选择 nsqd 节点进行读写的方式需要选择 leader 进行读写, 这样会导致可以读写的节点变少, 为了提高读写的可扩展性, 对 topic 引入分区的概念, 每个 topic 可以指定多个分区, 每个分区使用独立的 leader 节点, 这样保证每个 topic 可以有多个可以读写的分区节点.
改造消费 channel
topic 改造之后, channel 本身不会存储消息数据了, 因此需要对原来某些消费特性做相应的处理, 同时也更方便引入新的消费特性.
处理消费 cursor
每个消费 channel 维护的 cursor 会包含当前已经消费确认的文件位置 (channel confirmed), 以及下一条消息读取的文件位置 (channel next read), channel 的下一条读取位置和已经消费确认位置之间的差值, 就是当前正在等待确认 ack 的消息.
每次投递消息给客户端时会从 channel next read 位置读取下一条消息到内存并更新 channel next read, 如果有多个客户端连接则会读取多条分别投递给多个客户端, 当客户端 ack 某条消息时, 会根据这条消息的位置信息判断是否需要移动已确认的消费位置. 如果该条消息的起点位置和 channel confirmed 位置重合时, 则更新 channel confimed 信息.
可以看到, 当消息乱序到来时, 已确认消费位置的游标 channel confirmed, 只能移动到目前连续的最小位置, 如果不是连续的, 则需要临时记录已经确认的所有非连续段的首尾位置作为一个 confirmed segment, 每次 ack 一条消息会判断是否可以和现有的 confirmed segment 合并成一个大的 segment. cursor 处理过程如下图所示:
正常情况下, 非连续的 confirmed segment 会异步复制到副本节点上, 当 leader 失效时, 副本节点也可以知道哪些非连续段是已经被确认的消息, 尽量减少重复投递. 不过异步同步如果异常, 并且节点重启, 内存中的 confirmed segment 会丢失, channel next read 会被重置到 channel confirmed 位置, 因此会出现部分的消息重复投递. 为了保证 at-least-once 的投递目的, 我们需要容忍某些异常情况下的重复消息.
处理重试和延时消费
可以看到, 改造后的 channel 可能会出现某些情况下待确认消息窗口过大的问题, 这样会导致内存中维护的 confirmed segment 过多, 内存可能占用过多的问题. 特别是当某些消息一直在重试时或者延时处理时, 会加剧这种情况. 为了避免这种多次重试或者延时消费的消息影响正常的数据消费, 我们为每个 channel 增加了一个异常延时队列, 保证这些消息和正常的消息消费进行一定程度的隔离.
正常情况下, 以及少量的重试时, 会一直在内存中维护这些待确认的消息对象, 一旦有多次异常的消息, 或者延时时间较长的消息时, 我们将此类消息自动从内存中移动到延时队列中, 然后继续投递后面正常的消息. 对于延时队列中的消息, 我们会定时的从队列中读出来进行重试. 由于延时队列是存储在 boltdb 的, 因此可以使用更少的内存处理大量的这种异常或者延时消息, 虽然 boltdb 的写入性能不佳, 但是我们这里只用来存储异常消息, 因此这种异常情况下的性能完全可以满足. 通过引入磁盘延时队列, 改造后的 channel 在不影响正常消息的情况下可以允许更多的重试以及更长的延时消息, 相比于之前延时 1 小时的上限, 可以增加延时上限到几天.
消费数据预读优化
为了更进一步的优化消费性能, 减少读 IO, 在 channel 读取消息进行投递的时候, 会自动读取更多的数据放在 buffer 里面, 下次直接读取 buffer 即可, 直到 buffer 数据读完之后, 再进行预读下一部分数据, 通过预读优化, 大大减少了磁盘随机读 IO, 也减少了读文件的系统调用次数, 也会顺便减少 golang 对于这种系统调用的切换开销.
重放历史数据
由于改造之后的所有 channel 都是使用 cursor 来引用实际的磁盘文件的, 因此重放历史数据进行消费这种特性就变得十分简单了. 只需将 cursor 重新定位到历史消息的位置即可. 而历史消息的位置可以借助 commitlog 进行搜索来定位. 可以支持按照时间或者按照消息队列的某个位置开始重放.
严格顺序消费
默认的消费模式, 是允许多个客户端链接同时消费 topic 下的同一个分区的不同消息的, 这样可以使用最小的分区数来达到较高的并发消费能力, 避免了像 Kafka 那样为了提高消费能力, 创建过多的分区, 也就避免了过多分区数带来的磁盘随机 IO 问题, 但是也由此带来乱序投递的问题. 实际场景中某些业务需要保证消息严格按照生产写入的顺序进行投递, 这种情况我们就需要在服务端控制投递的策略了.
对于业务的这种场景, 我们引入了顺序投递的特性. 并在生产方也支持按照业务定制 id 进行分区 hash 的生产能力, 从而保证从生产到消费整条链路是按照分区内有序的方式进行消息流转的. 消息生产方会根据业务的分区 id 将同样的 id 投递到同一个 topic 分区, 保证相同 id 的数据的顺序一致性, 而在投递时, 会调整并发消费策略, 保证同一时刻只会投递一条消息进行消费, 等待上一次 ack 成功后才继续投递下一条消息. 同时, 为了避免 leader 切换时影响消息的顺序性, ack 的消息还会同步到所有副本才算成功.
可以看到这种方式单个分区的消费并发能力没有乱序消费能力强, 主要取决于消费业务本身的处理能力, 为了提高更高的顺序消费并发能力, 需要更多的分区数. 分区数的计算可以根据每条消息的处理时间和每秒的消息数来计算: 分区数 = 每秒消息条数 * 每条消息的处理时间 (单位 s).
引入 Jepsen 分布式测试的支持
由于此次改造变动非常大, 引入了更多的复杂性, 为了确保我们的分布式改造满足预期的效果, 我们做了大量的异常测试, 当然也引入了业界流行的分布式测试工具 Jepsen. 在各种异常测试过程中也发现了一些普通情况下非常难以发现的问题. 通过 Jepsen 测试之后, 我们也对改造后的系统更有信心了, 即使后面做更多改造也可以更加放心了.
下面是 nsq 跑的一部分 jepsen 测试场景
(deftest nsq-test
(let [test (jepsen/run!
(assoc
noop-test
:name "nsq-simple-partition"
:os debian/os
:db db
:client (queue-client)
:nemesis (nemesis/partition-random-halves)
:model (model/unordered-queue)
:checker (checker/compose
{:total-queue checker/total-queue})
:generator (gen/phases
(->> (gen/queue)
(gen/delay 1/10)
(gen/nemesis
(gen/seq
(cycle [(gen/sleep 30)
{:type :info :f :start}
(gen/sleep 60)
{:type :info :f :stop}])))
(gen/time-limit 360))
(gen/nemesis
(gen/once {:type :info, :f :stop}))
(gen/log "waiting for recovery")
(gen/sleep 60)
(gen/clients
(gen/each
(gen/once {:type :invoke
:f :drain}))))))]
(is (:valid? (:results test)))
(report/to "report/queue.txt"
(-> test :results pprint))))
整个流程就是通过 jepsen 控制节点, 随机的断开集群里面的某些节点的网络, 持续一段时间然后恢复, 如此反复操作几次. 在这些断网过程中, 有多个客户端不停的进行 pub 和 sub 操作并记录所有的操作记录, 最后再对所有的消息进行对比, 判断是否有任何 pub 成功的消息没有被消费到, 以及重复的消息统计等. 通过对比结果就可以知道是否通过了 jepsen 测试的检验. 有了 jepsen 自动测试, 我们会在每次新版本发布前跑多次 jepsen 测试脚本, 确保没有引入新的分布式问题.
消息轨迹查询系统
之前的 NSQ 系统由于消息在 chan 里面流转一遍就没了, 很难事后进行数据追查, 也没有一个很好的方式去跟踪一条消息在从生产到被消费的各种中间状态. 为了满足业务方时不时的来排查业务消息状态的需求, 我们改造后的 NSQ 也支持动态的开启这种消息轨迹排查功能. 为了支持消息轨迹查询, 我们做了如下几个工作:
- 能通过消息 id 定位到磁盘上的消息内容
- 支持传入业务 traceid, 并将业务 traceid 和 nsq 的消息 id 关联起来
- 记录一条消息的所有中间态变化信息
- 将消息的状态变迁信息同步到 ES 搜索系统
- nsqadmin 提供方便的统一查询入口
- 支持针对 topic 和 channel 级别的动态跟踪开关以减少资源消耗
通过这一套轨迹查询系统, 在业务方需要排查问题的时候, 就能非常快速的找到异常消息的轨迹从而快速的定位业务问题了. 特别是在排查顺序消费的业务场景时, 经常能发现业务潜在的并发误用问题.
总结
改造的过程中还特别需要注意和老版本的客户端的兼容性, 以减少客户端的迁移改造成本. 除了以上几个大的改造点, 还有很多小的改造细节, 感兴趣的可以去研究下我们的开源代码 NSQ
接下来我们还会有一篇文章讲述我们正在添加的新功能以及接下来的计划, 包括各种 sink, connector, 消息过滤, 事务消息设计等等, 后面更多新的特性也会在将来不断的补充和完善起来, 敬请期待, 也欢迎有志之士踊跃参与 NSQ 的开源改进计划.