Druid 在有赞的实践

## 一、Druid 介绍
Druid 是 MetaMarket 公司研发,专为海量数据集上的做高性能 OLAP (OnLine Analysis Processing) 而设计的数据存储和分析系统,目前Druid 已经在 Apache 基金会下孵化。Druid 的主要特性:

  • 交互式查询 (Interactive Query): Druid 的低延迟数据摄取架构允许事件在它们创建后毫秒内查询,因为 Druid 的查询延时通过只读取和扫描有必要的元素被优化。Druid 是列式存储,查询时读取必要的数据,查询的响应是亚秒级响应。
  • 高可用性 (High Available):Druid 使用 HDFS/S3 作为 Deep Storage,Segment 会在 2 个 Historical 节点上进行加载;摄取数据时也可以多副本摄取,保证数据可用性和容错性。
  • 可伸缩 (Horizontal Scalable):Druid 部署架构都可以水平扩展,增加大量服务器来加快数据摄取,以及保证亚秒级的查询服务
  • 并行处理 (Parallel Processing): Druid 可以在整个集群中并行处理查询
  • 丰富的查询能力 (Rich Query):Druid 支持 Scan、 TopN、 GroupBy、 Approximate 等查询,同时提供了 2 种查询方式:API 和 SQL

Druid 常见应用的领域:

  • 网页点击流分析
  • 网络流量分析
  • 监控系统、APM
  • 数据运营和营销
  • BI 分析 /OLAP

## 二、为什么我们需要用 Druid
有赞作为一家 SaaS 公司,有很多的业务的场景和非常大量的实时数据和离线数据。在没有是使用 Druid 之前,一些 OLAP 场景的场景分析,开发的同学都是使用 SparkStreaming 或者 Storm 做的。用这类方案会除了需要写实时任务之外,还需要为了查询精心设计存储。带来问题是:开发的周期长;初期的存储设计很难满足需求的迭代发展;不可扩展。
在使用 Druid 之后,开发人员只需要填写一个数据摄取的配置,指定维度和指标,就可以完成数据的摄入;从上面描述的 Druid 特性中我们知道,Druid 支持 SQL,应用 APP 可以像使用普通 JDBC 一样来查询数据。通过有赞自研 OLAP 平台的帮助,数据的摄取配置变得更加简单方便,一个实时任务创建仅仅需要 10 来分钟,大大的提高了开发效率。

##2.1、Druid 在有赞使用场景

  • 系统监控和 APM:有赞的监控系统 (天网) 和大量的 APM 系统都使用了 Druid 做数据分析
  • 数据产品和 BI 分析:有赞 SaaS 服务为商家提供了有很多数据产品,例如:商家营销工具,各类 BI 报表
  • 实时 OLAP 服务:Druid 为风控、数据产品等 C 端业务提供了实时 OLAP 服务

## 三、Druid 的架构

Druid 的架构是 Lambda 架构,分成实时层 (Overlord、 MiddleManager) 和批处理层(Broker 和 Historical)。主要的节点包括(PS: Druid 的所有功能都在同一个软件包中,通过不同的命令启动):

  • Coordinator 节点:负责集群 Segment 的管理和发布,并确保 Segment 在 Historical 集群中的负载均衡
  • Overlord 节点:Overlord 负责接受任务、协调任务的分配、创建任务锁以及收集、返回任务运行状态给客户端;在 Coordinator 节点配置 asOverlord,让 Coordinator 具备 Overlord 功能,这样减少了一个组件的部署和运维
  • MiddleManager 节点:负责接收 Overlord 分配的索引任务,创建新启动 Peon 实例来执行索引任务,一个 MiddleManager 可以运行多个 Peon 实例
  • Broker 节点:负责从客户端接收查询请求,并将查询请求转发给 Historical 节点和 MiddleManager 节点。Broker 节点需要感知 Segment 信息在集群上的分布
  • Historical 节点:负责按照规则加载非实时窗口的 Segment
  • Router 节点:可选节点,在 Broker 集群之上的 API 网关,有了 Router 节点 Broker 不在是单点服务了,提高了并发查询的能力

## 四、有赞 OLAP 平台的架构和功能解析

4.1 有赞 OLAP 平台的主要目标:

  • 最大程度的降低实时任务开发成本:从开发实时任务需要写实时任务、设计存储,到只需填写配置即可完成实时任务的创建
  • 提供数据补偿服务,保证数据的安全:解决因为实时窗口关闭,迟到数据的丢失问题
  • 提供稳定可靠的监控服务:OLAP 平台为每一个 DataSource 提供了从数据摄入、Segment 落盘,到数据查询的全方位的监控服务

4.2 有赞 OLAP 平台架构
OLAP平台架构
有赞 OLAP 平台是用来管理 Druid 和周围组件管理系统,OLAP 平台主要的功能:

  • Datasource 管理
  • Tranquility 配置和实例管理:OLAP 平台可以通过配置管理各个机器上 Tranquility 实例,扩容和缩容
  • 数据补偿管理:为了解决数据迟延的问题,OLAP 平台可以手动触发和自动触发补偿任务
  • Druid SQL 查询: 为了帮助开发的同学调试 SQL,OLAP 平台集成了 SQL 查询功能
  • 监控报警

4.2 Tranquility 实例管理

OLAP 平台采用的数据摄取方式是 Tranquility 工具,根据流量大小对每个 DataSource 分配不同 Tranquility 实例数量; DataSource 的配置会被推送到 Agent-Master 上,Agent-Master 会收集每台服务器的资源使用情况,选择资源丰富的机器启动 Tranquility 实例,目前只要考虑服务器的内存资源。同时 OLAP 平台还支持 Tranquility 实例的启停,扩容和缩容等功能。
OLAP平台数据摄取架构

4.3 解决数据迟延问题——离线数据补偿功能

流式数据处理框架都会有时间窗口,迟于窗口期到达的数据会被丢弃。如何保证迟到的数据能被构建到 Segment 中,又避免实时任务窗口长期不能关闭。我们研发了 Druid 数据补偿功能,通过 OLAP 平台配置流式 ETL 将原始的数据存储在 HDFS 上,基于 Flume 的流式 ETL 可以保证按照 Event 的时间,同一小时的数据都在同一个文件路径下。再通过 OLAP 平台手动或者自动触发 Hadoop-Batch 任务,从离线构建 Segment。
OLAP数据补偿服务
基于 Flume 的 ETL 采用了 HDFS Sink 同步数据,实现了 Timestamp 的 Interceptor,按照 Event 的时间戳字段来创建文件 (每小时创建一个文件夹),延迟的数据能正确归档到相应小时的文件中。

4.4 冷热数据分离

随着接入的业务增加和长期的运行时间,数据规模也越来越大。Historical 节点加载了大量 Segment 数据,观察发现大部分查询都集中在最近几天,换句话说最近几天的热数据很容易被查询到,因此数据冷热分离对提高查询效率很重要。Druid 提供了 Historical 的 Tier 分组机制与数据加载 Rule 机制,通过配置能很好的将数据进行冷热分离。
首先将 Historical 群进行分组,默认的分组是 "_default_tier",规划少量的 Historical 节点,使用 SATA 盘;把大量的 Historical 节点规划到 “hot” 分组,使用 SSD 盘。然后为每个 DataSource 配置加载 Rule :

  • rule1: 加载 1 份最近 30 天的 Segment 到 “hot” 分组;
  • rule2: 加载 2 份最近 6 个月的 Segment 到 “_default_tier” 分组;
  • rule3: Drop 掉之前的所有 Segment(注:Drop 只影响 Historical 加载 Segment,Drop 掉的 Segment 在 HDFS 上仍有备份)
{"type":"loadByPeriod","tieredReplicants":{"hot":1}, "period":"P30D"} 
{"type":"loadByPeriod","tieredReplicants":{"_default_tier":2}, "period":"P6M"} 
{"type":"dropForever"}

提高 "hot" 分组集群的 druid.server.priority 值(默认是 0),热数据的查询都会落到 “hot” 分组。

Druid数据冷热分离

4.5 监控与报警

Druid 架构中的各个组件都有很好的容错性,单点故障时集群依然能对外提供服务:Coordinator 和 Overlord 有 HA 保障;Segment 是多副本存储在 HDFS/S3 上;同时 Historical 加载的 Segment 和 Peon 节点摄取的实时部分数据可以设置多副本提供服务。同时为了能在节点 / 集群进入不良状态或者达到容量极限时,尽快的发出报警信息。和其他的大数据框架一样,我们也对 Druid 做了详细的监控和报警项,分成了 2 个级别:

  • 基础监控
    包括各个组件的服务监控、集群水位和状态监控、机器信息监控
  • 业务监控
    业务监控包括:实时任务创建、数据摄取 TPS、消费迟延、持久化相关、查询 RT/QPS 等的关键指标,有单个 DataSource 和全局的 2 种不同视图;同时这些监控项都有设置报警项,超过阈值触发报警提醒。业务指标的采集是大部分是通过 Druid 框架自身提供的Metrics 和 Alerts 信息,然后流入到 Kafka / OpenTSDB 等组件,通过流数据分析获得我们想要的指标。

4.6 部署架构

部署架构

Historical 集群的部署和 4.4 节中描述的数据冷热分离相对应,用 SSD 集群存储最近的 N 天的热数据(可调节 Load 的天数),用相对廉价的 Sata 机型存储更长时间的历史冷数据,同时充分利用 Sata 的 IO 能力,把 Segment Load 到不同磁盘上;在有赞有很多的收费业务,我们在硬件层面做隔离,保证这些业务在查询端有足够的资源;在接入层,使用 Router 做路由,避免了 Broker 单点问题,也能很大的程度集群查询吞吐量;在 MiddleManager 集群,除了部署有 Index 任务(内存型任务)外,我们还混合部署了部分流量高 Tranquility 任务(CPU 型任务),提高了 MiddleManager 集群的资源利用率。

4.7 贡献开源社区

在有赞业务查询方式一般是 SQL On Broker/Router,我们发现一旦有少量慢查询的情况,客户端会出现查询不响应的情况,而且连接越来越难获取到。登录到 Broker 的服务端后发现,可用连接数量急剧减少至被耗尽,同时出现了大量的 TCP Close_Wait。用 jstack 工具排查之后发现有 deadlock 的情况,具体的 Stack 请查看 ISSUE-6867

经过源码排查之后发现,DruidConnection 为每个 Statement 注册了回调。在正常的情况下 Statement 结束之后,执行回调函数从 DruidConnection 的 statements 中 remove 掉自己的状态;如果有慢查询的情况(超过最长连接时间或者来自客户端的 Kill),connection 会被强制关闭,同时关闭其下的所有 statements ,2 个线程(关闭 connection 的线程和正在退出 statement 的线程)各自拥有一把锁,等待对方释放锁,就会产生死锁现象,连接就会被马上耗尽。

// statement线程退出时执行的回调函数
final DruidStatement statement = new DruidStatement(
	connectionId,
	statementId,
	ImmutableSortedMap.copyOf(sanitizedContext),
	() -> {
	    // onClose function for the statement
	    synchronized (statements) {
	       log.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
	       statements.remove(statementId);
	   }
	}
);
// 超过最长连接时间的自动kill
return connection.sync(
    exec.schedule(
        () -> {
          log.debug("Connection[%s] timed out.", connectionId);
          closeConnection(new ConnectionHandle(connectionId));
        },
        new Interval(DateTimes.nowUtc(), config.getConnectionIdleTimeout()).toDurationMillis(),
        TimeUnit.MILLISECONDS
    )
);

在排查清楚问题之后,我们也向社区提了 PR-6868 。目前已经成功合并到 Master 分支中,将会 0.14.0 版本中发布。如果读者们也遇到这个问题,可以直接把该 PR cherry-pick 到自己的分支中进行修复。

## 五、挑战和未来的展望
###5.1 数据摄取系统

目前比较常用的数据摄取方案是:KafkaIndex 和 Tranquility 。我们采用的是 Tranquility 的方案,目前 Tranquility 支持了 Kafka 和 Http 方式摄取数据,摄取方式并不丰富;Tranquility 也是 MetaMarket 公司开源的项目,更新速度比较缓慢,不少功能缺失,最关键的是监控功能缺失,我们不能监控到实例的运行状态,摄取速率、积压、丢失等信息。
目前我们对 Tranquility 的实例管理支持启停,扩容缩容等操作,实现的方式和 Druid 的 MiddleManager 管理 Peon 节点是一样的。把 Tranquility 或者自研摄取工具转换成 Yarn 应用或者 Docker 应用,就能把资源调度和实例管理交给更可靠的调度器来做。

###5.2 Druid 的维表 JOIN 查询

Druid 目前并不没有支持 JOIN 查询,所有的聚合查询都被限制在单 DataSource 内进行。但是实际的使用场景中,我们经常需要几个 DataSource 做 JOIN 查询才能得到所需的结果。这是我们面临的难题,也是 Druid 开发团队遇到的难题。

###5.3 整点查询 RT 毛刺问题
对于 C 端的 OLAP 查询场景,RT 要求比较高。由于 Druid 会在整点创建当前小时的 Index 任务,如果查询正好落到新建的 Index 任务上,查询的毛刺很大,如下图所示:

我们已经进行了一些优化和调整,首先调整 warmingPeriod 参数,整点前启动 Druid 的 Index 任务;对于一些 TPS 低,但是 QPS 很高的 DataSource ,调大 SegmentGranularity,大部分 Query 都是查询最近 24 小时的数据,保证查询的数据都在内存中,减少新建 Index 任务的,查询毛刺有了很大的改善。尽管如此,离我们想要的目标还是一定的差距,接下去我们去优化一下源码。

###5.4 历史数据自动 Rull-Up
现在大部分 DataSource 的 Segment 粒度 (SegmentGranularity) 都是小时级的,存储在 HDFS 上就是每小时一个 Segment。当需要查询时间跨度比较大的时候,会导致 Query 很慢,占用大量的 Historical 资源,甚至出现 Broker OOM 的情况。如果创建一个 Hadoop-Batch 任务,把一周前 (举例) 的数据按照天粒度 Rull-Up 并且 重新构建 Index,应该会在压缩存储和提升查询性能方面有很好的效果。关于历史数据 Rull-Up 我们已经处于实践阶段了,之后会专门博文来介绍。

最后打个小广告,有赞大数据团队基础设施团队,主要负责有赞的数据平台 (DP), 实时计算 (Storm, Spark Streaming, Flink),离线计算 (HDFS, YARN, HIVE, SPARK SQL),在线存储(HBase),实时 OLAP (Druid) 等数个技术产品,欢迎感兴趣的小伙伴联系 zhaojiandong@youzan.com