有赞延迟队列设计

延迟队列,顾名思义它是一种带有延迟功能的消息队列。
那么,是在什么场景下我才需要这样的队列呢?

#背景
我们先看看以下业务场景:

  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?
  • 新创建店铺,N 天内没有上传商品,系统如何知道该信息,并发送激活短信?等等

为了解决以上问题,最简单直接的办法就是定时去扫表。每个业务都要维护一个自己的扫表逻辑。 当业务越来越多时,我们会发现扫表部分的逻辑会非常类似。我们可以考虑将这部分逻辑从具体的业务逻辑里面抽出来,变成一个公共的部分。
那么开源界是否已有现成的方案呢?答案是肯定的。Beanstalkd(http://kr.github.io/beanstalkd/), 它基本上已经满足以上需求。但是,在删除消息的时候不是特别方便,需要更多的成本。而且,它是基于 C 语言开发的,当时我们团队主流是 PHP 和 Java,没法做二次开发。于是我们借鉴了它的设计思路,用 Java 重新实现了一个延迟队列。

#设计目标

  • 消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。
  • Client 支持丰富:由于业务上的需求,至少支持 PHP 和 Python。
  • 高可用性:至少得支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。
  • 实时性:允许存在一定的时间误差。
  • 支持消息删除:业务使用方,可以随时删除指定消息。

#整体结构
整个延迟队列由 4 个部分组成:

  • Job Pool 用来存放所有 Job 的元信息。
  • Delay Bucket 是一组以时间为维度的有序队列,用来存放所有需要延迟的/已经被 reserve 的 Job(这里只存放 Job Id)。
  • Timer 负责实时扫描各个 Bucket,并将 delay 时间大于等于当前时间的 Job 放入到对应的 Ready Queue。
  • Ready Queue 存放处于 Ready 状态的 Job(这里只存放 Job Id),以供消费程序消费。

如下图表述:
Delay Queue
#设计要点
## 基本概念

  • Job:需要异步处理的任务,是延迟队列里的基本单元。与具体的 Topic 关联在一起。
  • Topic:一组相同类型 Job 的集合(队列)。供消费者来订阅。

## 消息结构
每个 Job 必须包含一下几个属性:

  • Topic:Job 类型。可以理解成具体的业务名称。
  • Id:Job 的唯一标识。用来检索和删除指定的 Job 信息。
  • Delay:Job 需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
  • TTR(time-to-run):Job 执行超时时间。单位:秒。
  • Body:Job 的内容,供消费者做具体的业务处理,以 json 格式存储。

具体结构如下图表示:
Job Struct
TTR 的设计目的是为了保证消息传输的可靠性。

## 消息状态转换
每个 Job 只会处于某一个状态下:

  • ready:可执行状态,等待消费。
  • delay:不可执行状态,等待时钟周期。
  • reserved:已被消费者读取,但还未得到消费者的响应(delete、finish)。
  • deleted:已被消费完成或者已被删除。

下面是四个状态的转换示意图:
Job State Flow
## 消息存储
在选择存储介质之前,先来确定下具体的数据结构:

  • Job Poll 存放的 Job 元信息,只需要 K/V 形式的结构即可。key 为 job id,value 为 job struct。
  • Delay Bucket 是一个有序队列。
  • Ready Queue 是一个普通 list 或者队列都行。

能够同时满足以上需求的,非 redis 莫属了。
bucket 的数据结构就是 redis 的 zset,将其分为多个 bucket 是为了提高扫描速度,降低消息延迟。

## 通信协议
为了满足多语言 Client 的支持,我们选择 Http 通信方式,通过文本协议(json)来实现与 Client 端的交互。
目前支持以下协议:

  • 添加:{‘command’:’add’, ’topic’:’xxx’, ‘id’: ‘xxx’, ‘delay’: 30, ’TTR’: 60, ‘body’:‘xxx’}
  • 获取:{‘command’:’pop’, ’topic’:’xxx’}
  • 完成:{‘command’:’finish’, ‘id’:’xxx’}
  • 删除:{‘command’:’delete’, ‘id’:’xxx’}

body 也是一个 json 串。
Response 结构:{’success’:true/false, ‘error’:’error reason’, ‘id’:’xxx’, ‘value’:’job body’}
强调一下:job id 是由业务使用方决定的,一定要保证全局唯一性。这里建议采用 topic+业务唯一 id 的组合。

## 举例说明一个 Job 的生命周期

  • 用户对某个商品下单,系统创建订单成功,同时往延迟队列里 put 一个 job。job 结构为:{‘topic’:‘order_close’, ‘id’:‘order_close_order_NoXXX’, ‘delay’:1800 ,’TTR’:60 , ‘body’:’XXXXXXX’}
  • 延迟队列收到该 job 后,先往 job pool 中存入 job 信息,然后根据 delay 计算出绝对执行时间,并以轮询 (round-robbin) 的方式将 job id 放入某个 bucket。
  • timer 每时每刻都在轮询各个 bucket,当 1800 秒(30 分钟)过后,检查到上面的 job 的执行时间到了,取得 job id 从 job pool 中获取元信息。如果这时该 job 处于 deleted 状态,则 pass,继续做轮询;如果 job 处于非 deleted 状态,首先再次确认元信息中 delay 是否大于等于当前时间,如果满足则根据 topic 将 job id 放入对应的 ready queue,然后从 bucket 中移除;如果不满足则重新计算 delay 时间,再次放入 bucket,并将之前的 job id 从 bucket 中移除。
  • 消费端轮询对应的 topic 的 ready queue(这里仍然要判断该 job 的合理性),获取 job 后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的 job 按照其设定的 TTR,重新计算执行时间,并将其放入 bucket。
  • 消费端处理完业务后向服务端响应 finish,服务端根据 job id 删除对应的元信息。

#现有物理拓扑
deploy
目前采用的是集中存储机制,在多实例部署时 Timer 程序可能会并发执行,导致 job 被重复放入 ready queue。为了解决这个问题,我们使用了 redis 的 setnx 命令实现了简单的分布式锁,以保证每个 bucket 每次只有一个 timer thread 来扫描。

#设计不足的地方
timer 是通过独立线程的无限循环来实现,在没有 ready job 的时候会对 CPU 造成一定的浪费。
消费端在 reserve job 的时候,采用的是 http 短轮询的方式,且每次只能取的一个 job。如果 ready job 较多的时候会加大网络 I/O 的消耗。
数据存储使用的 redis,消息在持久化上受限于 redis 的特性。
scale-out 的时候依赖第三方(nginx)。

#未来架构方向
基于 wait/notify 方式的 Timer 实现。
提供 TCP 长连的 API,实现 push 或者 long-polling 的消息 reserve 方法。
拥有自己的存储方案(内嵌数据库、自定义数据结构写文件),确保消息的持久化。
实现自己的 name-server。
考虑提供周期性任务的直接支持。