Kudu: Storage for Fast Analytics on Fast Data

Kudu 是一个声称「既能快速更新、又能快速查询」的分布式存储。它由 Cloudera 开发,作为 Hadoop 生态中存储的替代品,并在 2013 年开源。Cloudera 使用了 Impala + Kudu 的架构来作为一些替代( Impala 可以视作 Presto 类似的产品,支持在 HDFS/Hive 上做高效的查询,国内的 Apache Doris 也是早期从 Impala fork 过来的)。

Tips: Kudu 和 Impala 都是羊,Kudu 是 「捻角羚」,Impala 是小一些的黑斑羚。

Kudu 的模型以 Primary Key 为主,类似 StarRocks 的主键模型。之后我们也会介绍,不按 Primary Key 查询的时候,Kudu 的性能会光速拉胯。论文写于 2015 年,相对于 Presto 这种同样 2013 左右放出来,但是 2019 年才发表论文的系统而言,Kudu 很多地方显得没有那么详尽。很多地方翻一些工业界使用者的 blog 也能发现一些问题。不过 Kudu 使用好像相对少一些,国内看到网易应该有人用。

Kudu 使用的还是 shared-nothing 的处理方式,同时,Kudu 的文件写入之后也是不可更新的。Base + Delta 的方式被采用以处理这个模型。Kudu Delta 处理的方式相对来说有点类似 Position Delta Tree。阅读 Kudu 论文,我认为应该主要关注「Tablet 存储」的部分,其他部分有的不是很重要,有的我甚至认为非常草台。

Cloudera 当时希望的 workload 是:

  1. 需要有 streaming ingest 的数据管线,同时支持一些 updates
  2. 有一些定时作业来分析
  3. 高吞吐的存储
  4. low-latency random access

v2-86f0de0018e144b5d5b7335dc5002e75_r

Kudu 提供了对应的 api:

  1. Row-level inserts / updates / deletes
  2. Table Scan

Kudu 内部使用 Cfile 格式存储文件。这是 Kudu 给自己搞的适合自己的列存文件格式。

下面是论文的部分,在阅读的时候,我会尽量以论文为主轴,但是参考 Kudu 最新的实现。

Concepts

Table: 有定义好 Schema 的格式的表,一个 subset 的字段会被定义为 primary key。primary key 模型上必须是 unique 的,对 primary key 的删除被定义为快速的操作,否则可能会扫全表,因为它不支持 secondary index。

Schema 中,可以视作 <varchar: T> 的映射,Kudu 似乎不支持嵌套结构,如果有嵌套的结构,应该把这部分内容展开。关于 Kudu 的类型可以参考:https://kudu.apache.org/docs/schema_design.html#column-design

关于 DDL,Kudu 支持一些基本的 schema change,在用户层面需要执行 alter table,Kudu 会给每个 column 维护一个 id,同时 Primary Key 是不能变更的。这里 Kudu 的 Schema Change 系统感觉还是比较简陋的,感觉类型提升什么的支持都不是很好,nested 也不太支持,令人唏嘘。不过,同其他列存系统一样,Kudu 对列也会做一些 encoding,来提升对应的压缩率。

对于写来说,Kudu 前台可以用对 pk 的 Insert, Delete, Update 几种 API。论文成文的时候,系统尚未支持分布式事务,只能在单个 Tablet 内执行事务(可以支持单行事务)。目前 Kudu 已经支持分布式事务,不过支持的好不好另说。

Kudu 有一个 HLC 时间戳系统,参考了文章:

它的写有两种模式:

  1. CLIENT_PROPAGATED Consistency: 没啥同步,client 生成时间戳做事务
  2. COMMIT_WAIT : 会做同步,它声称自己做到了外部一致性。这是一个实验性功能,不鼓励在生产环境打开,同时,多 tablets 事务目前只支持 rc,然后单个 tablet 只支持一个事务。

而读也有不同的模式:

  1. READ_LATEST
  2. REAT_AT_SNAPSHOT
  3. READ_YOUR_WRITES

kudu 在读的时候支持 Scan / Projection / Filter。

Kudu 可以用 HLC 生成内部的时间,但也允许用户自己写时间然后给一个时间查询。

Architecture

BFA1827EA0FEB0E7FB3E455EB07D2502

23BFF20A-4E90-4062-8248-38F9C0A416CA

其实上面的架构有点类似 HBase,不同的是,这里还是走了 Raft 而不是 ZK 来管理集群。见:https://github.com/apache/kudu/tree/master/src/kudu/consensus 。走 Raft 管理感觉性能容易拉,但是相对来说能写的比较简单,而且依赖也少了。

当然,如果需要把 Kudu 暴露给 Impala 或者给外部查询,可能这里会需要 Hive Metastore 来暴露这些数据。

Master 节点运行的也是 tablet。和存储数据的节点跑着同一套代码,但建议还是跑在更好的服务器上。

对于 Tablet 服务器,之前有一些配置上的建议:

  1. 为了获得对大事实表的最优扫描性能,建议保持一个tablet对应一个CPU核的比例。不要将被复制的表计算在内,应该只考虑Leader tablet的数量。对于小维度的表,可以只分配几个tablet
  2. 建议每个 tablet 最多包含 2000个 tablets,论文中推荐每台机器 10-100 个 tablets,我估摸着可以按照 CPU 和 Disk 数量拍

需要注意的是,Kudu 的 WAL 是 Tablet 级别的(作为对比,写在 HDFS 上的 HBase 最初是一条 WAL 流,然后被小米扩展到多条)。相对而言,Kudu Tablet 间恢复可能比较独立,但是估计 IO 的 admission control 会相对复杂一些。

Partition

Kudu 的 Partition 属于 「没啥意思但是很实用」的类型,它首先会按照 Primary key 来分 partition 到不同 tablets 上。它提供了一种 hash - range 混合的 Partition 策略:

  1. Range partition
  2. hash partition
  3. multilevel partition

本质上,其实 Kudu 抽象比较简单,用户可以想象成:

  1. 走一到几组不同的 hash
  2. 然后对 hash 后的 binary 进行 range 分片
1
用户指定一个 fn, 把 pk 列 (pk1, pk2, ...) 映射到一个 binary

Range Partition

对于 range partition,例子如下:

range-partitioning-example

使用的时候,也可以参考 sample: https://docs.cloudera.com/cdp-private-cloud-base/7.1.6/impala-reference/topics/impala-kudu-partitioning.html#pnavId2

Kudu 也允许动态插入 Range Partition。

Hash Partition

Hash Partition 会把行分配到 bucket 中,如下图:

hash-partitioning-example

这里可以指定 PARTITION BY HASH(...) PARTITIONS 50 这样。

Multi

1DE6DE48-5A42-48A2-81FE-2443BB0367CA

这里,可以按照手动指定 range + hash 或者 hash + hash 做分片:

hash-range-partitioning-example

直观的感受还是,Tablet / Partition / Bucket 还是定的比较死的,在网易的博客里讨论过,当数据很少的时候,Kudu 也会按照给定的 Bucket 数量去分区。网易在他们的博客里提到了这个问题。注意,虽然不同 Partition 的 bucket-id 是相同的,但是它们存储在不同的 Tablet 上。

Hash Parition 方式类似静态分片,从代码可以看到:

这里针对 hash 设置了 range,然后对提前切分好的 hash range 来分片

Kudu 的 Parition 可以引入一些 Pruning: https://github.com/apache/kudu/blob/master/docs/design-docs/scan-optimization-partition-pruning.md . 这个 Pruning 类似不把 Task 发给被裁剪的 bucket。

Partition 还有一些小问题,对于 Kudu 的热点写,如果写的是某一个 Partition,那么 WAL 能比较好的处理。有一种 backfill 的场景,即 scan 数据回填,这种场景因为低缓存命中率,所以性能会比较差。

Replication

Kudu 走的是 shared-nothing + Raft 的模式,其实我不理解为啥走 Raft… Kudu Raft 实现应该比较简单,它的 WAL 走的是 Raft,各台机器的状态机允许不一致。

虽然表面说走了 Raft,但是 Kudu 实际上类似 ZooKeeper 的 A-linearizable,写的时候走 Raft,读的时候可以本地读。不知道它这个和事务怎么协同设计的(直观感受就是看这玩意不如花时间去折腾 MongoDB,因为它这几方面的设计都非常草台,让我们不要折磨自己,跳过它们吧)。

相对来说,Kudu 的 Compaction 是个本地的程序,不涉及全局的 Raft。似乎这个是各个机器自己处理的:

Kudu does not replicate the on-disk storage of a tablet, but rather just its operation log. The physical storage of each replica of a tablet is fully decoupled.

比较傻逼的是,在它的文档里,对 config change,它还是靠单步变更实现的…额我只能说他开心就行:https://github.com/apache/kudu/blob/master/docs/design-docs/raft-config-change.md

Kudu Master

之前说到了,Kudu 的 Tablet 由 Raft 来同步 WAL,因为 Raft 的存在,所以其实 Tablet 的分配是比较靠谱的,集群能够从这里面建立出信息,同时相对 HBase 来说,少了处理 RIT 的麻烦。

B195D594-DC70-42BC-B5DC-237A192FBBA2

这里 master 可以:

  1. 作为 tablet directory
  2. 存储元数据,作为一个 meta service,并处理 DDL 等操作
    1. 在创建表的时候,这里会异步的 pick TabletServer 去 Assign Tablet
    2. 在删除表和 DDL 的时候,这里会需要让子节点状态完成
    3. 类似 HBase,这里的状态推进是幂等的
  3. 监控 tablet server,适当的调度 tablets

在实现的时候,它类似 GFS Master,把目录信息全部缓存在内存中。然后不同于 Tablet,读在这里不可以走 Replica 去做不一致的读。

上面说到的部分我们可以进行稍微细致一些的描述,Kudu 的 Master 会维护成 sys.catalog 表。写、读严格遵照 Raft 的 syntax。这里维护了下列的信息:

  1. [Table Id] -> TableInfo
  2. [Table Name] -> TableInfo
  3. [Tablet Id] -> TabletInfo

而 TableInfo 则包含了 [tablet-start-key] -> TabletInfo 的有序 key-range mapping。

创建和删除表都被视作原子的多个 step,创建包含 preparing, running 等多个状态,这里会先推进 preparing,再推荐 writing,然后返回给用户,然后这里不会创建 tablets。直到 Table Assignment 阶段才会删除

对于删表,情况也差不多,这里会对表标记 “deleted”,然后对表发送的请求将失败。但是 Tablet 会走 DeleteTablet 请求异步的进行删除。

在 Table Assignment 阶段,这里会挑选机器,然后发送 CreateTablet 请求,并带有一个超时。CatalogManagerBgTasks 会监听信息,如果返回了对应的心跳,那么 Tablet 创建成功,否则需要挑选另一个 Tablet Server 了。

CatalogManagerBgTasks 是一个很重要的线程,它会维护心跳,然后根据心跳来:

  1. 把很久没发心跳的 tablet 切成 replaced 状态
  2. 给每创建的 tablet 挑选状态,启动 leader,然后启动它们
  3. 移动删掉了的 tablets

Tablet 的故障恢复依赖 Raft,除此之外,这里还有个类似 DNS 的服务,当拉起新的 master 的时候,需要让 leader 指向同一个 DNS CNAME。

Master 还负责当 Tablet Directory,client 的缓存有可能是旧的,这里没讲述有没有 epoch 来维护这些信息。只是说,发到的不是 tablet master 的话,会重新访问 Master。

Tablet Storage

tablet storage 是 Kudu 最重要的一部分,因为…这是有自己东西的一部分,你看前面的内容不是烂大街的吗(笑)(当然要做好也要一番功夫)。Tablet Storage 是 Kudu 比较独特的一部分,它可以被视为一个单机的引擎,因为感觉除了 WAL,不是很 Care 分布式的那堆逻辑。

看 Tablet Storage 的时候,需要注意这个模型还是有限制的,Primary Key 在其中占有非常重要的地位

Tablet Storage 是每个 Table 下的每个 Tablet 一份的。主要支持:

  1. Fast Columnar Scans: 这里实现了 CFile,自己的列存格式,然后引入了 bitshuffle 等 encoding 方式来做编码
  2. Random Updates: 对 primary key 能够快速更新,random access 希望有 $O(lg n)$ 的 complexity

E9F57C87-0442-42C1-8F8B-620615781030

类似 LSM Tree,Kudu 组织了 RowSets:

  1. MemRowSets: 在内存里的 RowSets
  2. DiskRowSets: 盘上的 RowSets

MemRowSets 采用了一个 MVCC 的 B-tree 实现,参考了 MassTree,具体实现于:https://github.com/apache/kudu/blob/master/src/kudu/tablet/concurrent_btree.h

其实这个实现有点类似 SkipList,基本上也是 append 作为写。然后需要注意的是,这里利用了 SIMD 和 JIT 加速(或许类似 ART ),然后 MemRowSets 是一个行存。

NOTE: other systems such as C-Store call the MemRowSet the “write optimized store” (WOS), and the on-disk files the “read-optimized store” (ROS).

DiskRowSet

CFile

DiskRowSets 由 CFile 组成,准确的说,在一个 DiskRowSet 内,每列都会有一个 CFile。CFile 是一个逻辑文件,Kudu 引入了一个 BlockManager 层,可能会把多个 CFile 映射到同一个物理文件上。

它由不同的部分组成,包括:

  1. Header
  2. Footer
  3. 不同种类的 Block: 类似 data blocks, nullable data blocks, index blocks, dictionary blocks

压缩和 encoding 以 Block (类似 Parquet 的 Page) 为粒度进行。每个 Block 可以选择自己的 encoding 方式。此外,这里还有对应的 Index:

这里用类似 Position Delta Tree 的方式组织 Index,有下面两种对应的 Index:

  1. Positional Index: 类似 PDT 或者 Parquet 中某列的 Offset Index
  2. 如果对应的数据有 sorted order,那会有这些数据的稀疏索引

数据会写到 「Leaf Block」上,Kudu 会维护逻辑的 Internal Block,然后最后按 Post-Order 写入:

D045E99C-2BA6-4E14-9129-0D90345BB981

例如下面就会写 [Int 2] [Int 1] [Int 0]

我从网上抄了个好心人画的图(Ref 5):

014448C5-6AAA-4654-9C92-1B94A9030255

其实和 Parquet 差不多,不过 Parquet 估摸着没 Btr,得线性或者二分

BloomFile

BloomFile 是独立的 CFile 文件,额外存了一份,作为需要的列的 BloomFilter。有了 BF,那么上面那图操作进行之前,还要查一下对应的 BF 了。

BF 会被处理到 4KB 的 Page 中,然后快速查询。

DeltaRowSet

DiskRowSet 由 MemRowSet flush 而来,类似 RocksDB,我们也能知道对应的 min-max 值,用以支持后续的 compaction。

作为 Primary Key 模型,不同于 RocksDB / LevelDB,虽然 DiskRowSet 可以跨范围,但是单个 Primary Key 只可能出现在一个文件中:

8a52d89f9e118ec3d5899cf0c7cfc560

Kudu 区分了几种更新操作:

  1. Update
  2. Delete
  3. REINSERT

Update 和 Delete 没有落在 MemRowSet,这点需要注意。然后 REINSERT 是和 Delete 对应的,如果有删除数据后来又重新插回去了,会纪录一条 REINSERT。这里保证了 BloomFilter 查到的东西相对靠谱,归属同一个 DiskRowSet。

这里更新记录 也是从 Mem 来的,如下:

48396E10-749F-4E6C-88EE-E79637ED0AC9

Delta MS 是内存的部分,这里会按照 (row_id, timestamp, RowChangeList) 维护:

  1. RowId 很特殊,类似 PDT 中的概念,是文件中对应的行号。注意,某一行被删除,这个 RowId 仍然留着
  2. timestamp 很好懂
  3. RowChangeList 是对这一行各个列操作的物理日志

Mem 的变更也是存放在同样的 btree 中,同时,这里可以根据 index 来找到对应的行号。这里可以 Flush 成对应格式的 Redo 文件,如下图:

5FECFBF7-8589-485D-B69A-4702C316B1BE

8F9B8DCC-3A58-434D-AE05-2C45C0CB5D7B

Compaction

Compaction 的时候,可能会有多个 RowSet 做 Compaction,也可以是单个文件做 Compaction。这里需要注意的是,为了满足 MVCC,Base + Redo 文件可能会变成 Base + Undo 文件

Redo 记录我们之前应该介绍过,下面简单讲讲 Undo:

D38B3E53-8A9D-4AA6-ABAC-1133EC3E799E

不同于 ARIES,一般 Kudu 会生成单向的 Redo 记录,然后当 Compaction 发生的时候,RDBMS,如 PG,可能会存一堆 Redo,然后找到一个最近版本恢复。在 Kudu 中,文件生命周期大概是:

  • Redo -> Base / Undo -> 被合并或者回收

这样避免了维护每个旧版本的文件视图和对它们的 ref(作为对比,RocksDB 的 VersionSet)

Compaction

Compaction 可以被分为 Major 和 Minor 的

Delta Compaction

63B946FB-CF2B-4E44-878C-8B735690EFC9

Minor REDO delta compaction: 合并 delta redo 成一个 redo,量级很轻,减小对应对 Delta 的 IO。实现逻辑就是原封不动的合并多个 Delta,应该能通过 IO 来提升压缩率,减少多文件 IO 和 fds

7D2628DB-2089-4B26-9465-29DBBC1A304C

Major Redo Delta Compaction 如上图,这个地方要重写 base 文件,然后把原本的 redo 变成 undo。同时,这个地方也不会改变 row id。这里有一个特殊的地方,就是可以单独处理频繁更新的列,剩下的仍然在 redo delta 中。

需要注意的是,compaction 完成之后,即可以移除之前的文件。

Range Compactions

924AD8CD-2483-4F80-B792-32BB4F45E41E

这个有点类似 RocksDB 的 Compaction,但它的操作其实相当重,我们可以讨论一下对应的问题:row_id 会改变,假设:

  1. 某个模型跑的好好的,需要 Compaction 了
  2. Compaction 进行的时候,写入还在不断进来
  3. Compaction 完成了,RowSet 的写基于了之前的 row_id
  4. 寄!

Kudu 采用了如下的实现策略:

77BE0595-D501-4CCA-80CC-15F14CED8FD0

这里,它加入了一个双写的 RowSet,然后Compaction 的时候,对应的变更会即写旧的又尝试写入新的,相当于把旧的写在新的 row_id 重放一遍。

机制

在 Kudu 中,Compaction 被建模为一个最优化模型,这里会根据 width 和 depth 来进行压缩:

231397D0-122C-491B-A7B3-809D440BFA07

如上图,这里会挑选合适的范围,然后去 compaction

具体的讨论见:https://github.com/apache/kudu/blob/master/docs/design-docs/compaction-policy.md

References

  1. Kudu paper
  2. Index Skip Scan Optimization: https://kudu.apache.org/2018/09/26/index-skip-scan-optimization-in-kudu.html
  3. Apache Kudu 在网易的实践 https://www.infoq.cn/article/kgwyqb5wer5wl8cquweq
  4. https://www.slideshare.net/cloudera/apache-kudu-technical-deep-dive
  5. Kudu CFile 解读:https://www.jianshu.com/p/789c286e7077