Tournament Trees

如之前 Sort 的文章 [1] 所述,Tournament Tree 核心是减少 Winner 流切换的比较次数。本文不对败者树做概论,而是看一下工程实现。DuckDB 靠 Merge Path 算法来实现 Sort,而 DataFusion 和 Velox 都实现了败者树的接口。ClickHouse 也实现了一个 Queue 相关的接口,但并没有使用 Loser Tree

ClickHouse 的实现

代码在 [2],算是写文章这天的 Master。这里的核心是:

  1. 实现是一个标准的 Heap,而不是我们的败者树。
  2. Element 被抽象为 Cursor,然后 Cursor 一些接口满足了 STL 的定义,可以利用 std::make_heap 等接口。这样也没有中间节点了。
  3. Cursor消费完的时候,从 Queue 中直接移除
  4. 按照标准的 Heap 算法调整堆
  5. 额外实现了一种 Batch 模式,在 batch 模式下,堆也会被 batch 处理
    1. 因为是 Heap,所以可以通过比较快速拿到第二大的流
    2. 定位到 最大的流 - 第二大的流 中的 batchSize
    3. 根据这个 BatchSize 来 Hint 用户的 nextBatch

具体的 next 靠 Cursor 来实现

ClickHouse 实现的亮点是 C++ 上很细的工程实现,比如这段代码:https://github.com/ClickHouse/ClickHouse/blob/6f88ada7a9b4aacaaa6c86e5341aecedeeb291f5/src/Core/SortCursor.h#L675 ,感觉这种特化肯定是比 Comparable Key 有前途的。不过这也是大家都可以做的优化。

Velox 的实现

Velox 实现了一个标准的 TreeOfLosers 接口,这套接口有下面的成员:

这里没有 next 的实现,其实是大家各显神通了,使用 MergeStream 其实也是 CRTP 的形式,令人唏嘘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/// Abstract class defining the interface for a stream of values to be merged by
/// TreeOfLosers or MergeArray. In addition to these, the MergeStream must have
/// a way of accessing its first value and popping off the first value.
/// TreeOfLosers and similar do not call these, so these are left out of this
/// interface.
class MergeStream {
public:
virtual ~MergeStream() = default;

/// True if this has a value. If this returns true, it is valid to call <. A
/// false value means that there will not be any more data in 'this'.
virtual bool hasData() const = 0;

/// Returns true if the first element of 'this' is less than the first element
/// of 'other'. hasData() must be true of 'this' and 'other'.
virtual bool operator<(const MergeStream& other) const {
return compare(other) < 0;
}

/// Returns < 0 if 'this' is < 'other, '0' if equal and > 0 otherwise. This is
/// not required for TreeOfLosers::next() but is required for
/// TreeOfLosers::nextWithEquals().
virtual int32_t compare(const MergeStream& /*other*/) const {
VELOX_UNSUPPORTED();
}
};

这里有两种实现:

  1. MergeArray: 第一次构建的时候 Sort 所有序列,然后每次 next() 之后,把第一个元素调整位置。有点像简陋版的堆
  2. TreeOfLosers: 败者树

这两套接口的特征是:

  • 都是以 Sorter 的形式提供的
  • next 返回 MergeStreamImpl*, 然后下次 next 的时候做内部调整再返回

比较有意思的是,和 ClickHouse 的 Batch 模式类似,Velox 的 TreeOfLosers 提供了两套接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// Returns the stream with the lowest first element and a flag that is true
/// if there is another equal value to come from some other stream. The
/// streams should have ordered unique values when using this function. This
/// is useful for merging aggregate states that are unique by their key in
/// each stream. The caller is expected to pop off the first element of the
/// stream before calling this again. Returns {nullptr, false} when all
/// streams are at end.
///
/// 这里的 Equal 指的是 Equal value from other stream, 本 Stream 的 Equal 不受理.
std::pair<Stream*, bool> nextWithEquals();
/// Returns the stream with the lowest first element. The caller is expected
/// to pop off the first element of the stream before calling this again.
/// Returns nullptr when all streams are at end.
Stream* next();

内部实现逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class TreeOfLosers {
static constexpr TIndex kEmpty = std::numeric_limits<TIndex>::max();

const std::vector<std::unique_ptr<Stream>> streams_;

std::vector<TIndex> values_;
// 'true' if the corresponding element of 'values_' has met an equal
// element on its way to its present position. Used only in nextWithEquals().
// A byte vector is in this case faster than one of bool.
//
// 额外判定一个 Equal
std::vector<uint8_t> equals_;
// 最后一个数据的 Index, 初始化为 kEmpty
TIndex lastIndex_ = kEmpty;
// First Data Stream offset
int32_t firstStream_;
};

这里的结构其实是一个逻辑的树结构:

img

  1. streams_ 是下游的 inputs, 固定,即使 Stream 被消费完了,依旧存在于 LoserOfTrees 中 (但是 values_ 标记为 kEmpty )
  2. values_equals_ 是核心,这里的树结构如上图:
    1. 0 的位置在 Velox 的实现中并不存在,取而代之的是特殊成员 lastIndex_ , 指向 streams_,表示第一条流
    2. values_equals_ 的 size 是 firstStream_. 这句话有点抽象,举例来说,上图 Fig8 里面的 Inputs 是 StreamStream 不会出现在 values_ 中。如果 Stream 大小不是 2 的倍数,那么树的形状就会和下面 ascii 图一样. 在这里,1有两个成员,3,4 有两个成员,这是 6个 stream 的 TreeOfLosers. firstStream_ 在树中的逻辑位置是 5(虽然实际上 values_ 中没有这个成员); 同样的,如果有一个 stream 有 4条流,那么 firstStream_ 就是 3
    3. 流里面所有 values_被初始化为 kEmptylastIndex_ 也被初始化为 kEmpty
  3. 流比较的时候,如果是 Equal 系接口,调用 compare;否则直接拿 operator<
  4. 第一次调用的时候 (即 lastIndex_ = kEmpty 的时候),递归的 build 整个树的 equals 和 values
  5. 后续从 lastIndex_ 向上 propagate
  6. 边界处理:空流对外返回 kEmpty 做为 Index。values_中的内容也是 Index。当整棵树都为 kEmpty 的时候,树被消费完成,对外返回 nullptr 做为 next 的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
  0
/ \
1 2
/ \ / \
3 4 s0 s1 <-- 所有的 stream 都是树上的虚拟节点. 实际不存储在 values_ 中, 不过访问的时候逻辑上当这些节点存在
/\ /\
.....

0
/ \
1 2
/ \ / \
s0 s1 s2 s3

外部用例

GroupingSet 是一个这个的 user-case, 存放 Spill 的结果,这里会按照行来消费。我也不知道为啥没有 Batch 消费的实现。

Datafusion

https://github.com/apache/datafusion/blob/c21d025df463ce623f9193c4b24d86141fce81ca/datafusion/physical-plan/src/sorts/merge.rs

这里实现和 Velox 大致相同,目前 Datafusion 使用 SortPreservingMergeStream 来处理 Loser Tree

它的基础成员是 PartitionedStream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/// A [`Stream`](futures::Stream) that has multiple partitions that can
/// be polled separately but not concurrently
///
/// Used by sort preserving merge to decouple the cursor merging logic from
/// the source of the cursors, the intention being to allow preserving
/// any row encoding performed for intermediate sorts
pub trait PartitionedStream: std::fmt::Debug + Send {
type Output;

/// Returns the number of partitions
fn partitions(&self) -> usize;

fn poll_next(
&mut self,
cx: &mut Context<'_>,
stream_idx: usize,
) -> Poll<Option<Self::Output>>;
}

读者们可以看出这是个 Async 的玩意。它生成的成员被当作 CursorValues

1
2
/// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>;

此外,这里还有 Cursor 的抽象,做为「单一的 Comparable 成员」,是 CursorValues 中某个 Index 的引用。

1
2
3
4
5
#[derive(Debug)]
pub struct Cursor<T: CursorValues> {
offset: usize,
values: T,
}

SortPreservingMergeStream 中,生成的目标会是 Batch,以 BatchBuilder 做为成员,在内部完成 BuildBatch,而 Velox 则是把 Stream* 传给外部,便于直接给 RowContainer 之类的 Build Row。

我们只关注这里面 Async 的部分,逻辑层和 Velox 几乎没有区别,我们就不赘述了

Async 的处理

1
2
3
4
5
6
7
8
#[derive(Debug)]
pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
/// This queue contains partition indices in order. When a partition is polled and returns `Poll::Ready`,
/// it is removed from the vector. If a partition returns `Poll::Pending`, it is moved to the end of the
/// vector to ensure the next iteration starts with a different partition, preventing the same partition
/// from being continuously polled.
uninitiated_partitions: VecDeque<usize>,
}

如上所述,PartitionedStream 是异步的。这里会记录 Stream 的 async 信息,然后放到 uninitiated_partitions 中,来做初始化队列。这部分只会在初始化的时候使用

在后续读取中,只有 Winner Cursor 会被消费,所以只有 Winner 需要被 check async。

Oceanbase

https://github.com/oceanbase/oceanbase/blob/fb38c3159e185cdb36c62eb5c03b4a112feae18c/src/storage/access/ob_scan_merge_loser_tree.h

https://github.com/oceanbase/oceanbase/blob/develop/deps/oblib/src/lib/container/ob_loser_tree.h#L54

OceanBase 的 Tree 看着像 winner-loser 混合,它的结构相比 Velox,记录的东西稍微多一些 ( is_draw 等价于 equal ):

这里同时记录了 winner 和 loser 和 replay。这里似乎是因为这个 LoserTree 是动态的,在 push 加入 Player 的时候,这部分能够快速调整 Loser Tree 的信息。当成员变更的时候,ob 得以快速 replay

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct MatchResult
{
public:
MatchResult() { reset(); };
~MatchResult() = default;
void reset()
{
winner_idx_ = INVALID_IDX;
loser_idx_ = INVALID_IDX;
is_draw_ = false;
replay_ = ReplayType::NO_CHANGE;
}
int64_t winner_idx_;
int64_t loser_idx_;
bool is_draw_;
ReplayType replay_;

TO_STRING_KV(K(winner_idx_), K(loser_idx_), K(is_draw_), K(replay_));

int set_replay_type(const int64_t player);
};

References

  1. https://blog.mwish.me/2024/09/27/Sorting-in-Database-System-Part1/
  2. https://github.com/ClickHouse/ClickHouse/blob/6f88ada7a9b4aacaaa6c86e5341aecedeeb291f5/src/Core/SortCursor.h#L581