Adaptive and Robust Query Execution for Lakehouses at Scale

This paper was published at VLDB '22 and describes Spark's AQE; the feature has been around since roughly 2022, and the paper notes that it is already enabled by default at Databricks. For TP queries, what matters most is that a query picks the right index, and to guard against an inaccurate optimizer there are also plan mechanisms that help users pin the desired index. Analytics is different: the queries are far more diverse, and many jobs touch a very large number of tables; looking at some of the queries at my company I often marvel, "wow, even a query this complex actually runs." In that setting automatic tuning is a real advantage, since you can hardly hand-write SQL for users in a 2B scenario. The even more painful case is SQL generated by BI tools: want to change it? No chance.

What makes the paper interesting is how engineering-oriented it is. One aspect is how well it fits Spark's own Shuffle / Stage model: some MPP engines compile the whole query up front and launch all stages MPP-style, as described in [1], though Presto has also used Shuffle to move partway toward the Spark style [2]. This paper exploits the stage-by-stage nature of Spark-like engines: using historical queries and the statistics from already-finished stages, it dynamically plans the execution of the rest of the query. Since crossing a stage boundary requires a Shuffle anyway, you might as well use that step to collect statistics and adjust the downstream query. The other nice thing is that every optimization is spelled out, including dynamically choosing the degree of parallelism, re-selecting join algorithms, and handling skew. None of this sounds hard, but actually getting it all done takes real time.

The paper itself is not difficult and not the very detailed kind, so it reads quickly; it took me only a short while to finish.

Why AQE

  • Firstly, in large-scale, open Lakehouses with uncurated data, high ingestion rates, external tables, or deeply nested schemas, it is often costly or wasteful to maintain perfect and up-to-date table and column statistics.
  • Secondly, inherently imperfect cardinality estimates with conjunctive predicates, joins and user-defined functions can lead to bad query plans.
  • Thirdly, for the sheer magnitude of data involved, strictly relying on static query plan decisions can result in performance and stability issues such as excessive data movement, substantial disk spillage, or high memory pressure.

In essence this is saying:

On the statistics-collection side:

  1. Newly ingested data is not analyzed promptly, so table/column statistics lag behind ingestion and quickly become stale.
  2. The table may be an external table, which is a fairly common case; the external catalog may be unavailable (or what you can get from it is not particularly accurate).
  3. For (deeply) nested schemas, it is hard to estimate statistics for the deeply nested fields.
  4. For UDFs, Joins, and the like (even moderately complex filters), the output ratio (selectivity) is hard to estimate.
  5. Table sizes also vary widely, and the executor-side settings (parameters) differ accordingly, so the relevant information has to be adjusted: overestimate and the query runs slowly but at least finishes; underestimate the memory and it spills like crazy.

Because of the above issues on the statistics side, plus the varying workloads at execution time, the execution side runs into the following problems:

  1. Broadcast Join vs Shuffled Join? How do you tell which table is large and which is small, and what do you do about it? (Oddly, Sort Merge Join is not compared.) The corresponding knobs in open-source Spark are sketched right after this list.
    1. Broadcast only ships the small side to every node; a Shuffled Join needs two Shuffles and then processes the co-located partitions.
  2. Degrees of parallelism: how do you decide the parallelism of downstream stages (or even of the leading scan stage)?
  3. Other decisions driven by data volume, e.g.
    1. whether join reordering is needed
    2. based on the filter creation cost, whether to use dynamic partition pruning / dynamic file pruning / Bloom filters, etc.
  4. Discovering dynamic information such as partitioning properties and interesting orders (then again, some engines seem to handle these at execution time as well).
  5. Graceful degradation strategies: handling skew and similar cases that the query optimizer is not well placed to deal with.
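
As a concrete reference point, most of these decisions surface as AQE configuration knobs in open-source Spark 3.x. A minimal sketch follows; the config keys are real Spark configs, but the values are illustrative rather than recommendations, and this is open-source Spark, not the Photon/Databricks implementation the paper describes:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-demo")
  // Master switch for adaptive query execution (on by default since Spark 3.2).
  .config("spark.sql.adaptive.enabled", "true")
  // Coalesce small shuffle partitions after a stage finishes.
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
  // Re-select the join strategy at runtime: switch to broadcast if one side's
  // observed size falls under this threshold.
  .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB")
  // Split skewed shuffle partitions in shuffled joins.
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .getOrCreate()

The point of the paper's system is that these decisions are made per stage from observed statistics rather than from static configuration.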

Here is the sample query; the schema follows the TPC-H schema [3].

SELECT c.c_name,
       o.o_orderdate,
       SUM(o.o_totalprice) AS revenue
FROM customer AS c, orders AS o
WHERE c.c_mktsegment = 'BUILDING'
  AND c.c_acctbal > 8000.0
  AND c.c_custkey = o.o_custkey -- join key
  AND o.o_orderdate BETWEEN date('2024-03-15') AND date('2024-04-15')
GROUP BY c.c_name, o.o_orderdate
ORDER BY revenue DESC
LIMIT 10;

Given this query, the questions are:

  1. After applying the WHERE predicates, how do you estimate the row counts and size-in-bytes of both sides?
  2. Should a dynamic filter be used?
  3. How can dynamic data properties be used to optimize the rest of the query?
  4. Which join algorithm should be used?
  5. How do you set the degree of parallelism (DOP) for each part of the query?
  6. How do you handle unexpected skew and memory pressure?

Alternatives to AQE

  1. Cardinality estimation: can be inaccurate for UDFs, joins, and conjunctive predicates; for un-indexed data the accuracy may be only around 10%. The Catalyst optimizer usually only gives worst-case cardinalities.
  2. Physical plan cost models: use finer-grained CPU execution information. The claim is that physical-level estimates are more accurate, though the TUM paper [4] disagrees somewhat; I don't really understand optimizers, so you two can fight it out, I'm just relaying the information. In any case, the authors think this only improves accuracy, nothing more.
  3. Sampling: There has been a flurry of research literature [1, 17, 18] attempting to leverage sampling, including random samples, online samples, block samples, materialized samples, and stratified samples to make cardinality estimation better. In practice, they could be effective for specific scenarios, e.g., random samples for uniformly distributed data, and stratified samples [1] for predicates over strata columns. Nevertheless, there is an inherent trade-off between the expense of sample collection and its effectiveness.
  4. History-based cardinality estimation: much of what Spark runs is big recurring ETL jobs, so historical information is genuinely useful. But obtaining statistics for older versions also puts extra requirements on the catalog and the like.
  5. Machine learning: needs better interpretability and debuggability.

The paper's view is that AQE is really just a framework and does not conflict with any of these alternatives:

In our Photon engine, the Shuffle implementation has such a breaker, originally for the simplicity of task scheduling and fault-tolerance. Thus, AQE is a natural fit. At a high level, AQE and advancements in its alternatives are largely complementary in their evolutions. Better static query plans may eventually relax constraints on the query execution substrate, while AQE provides the ultimate safeguard to experiment with its alternatives.

AQE Architecture

img

Below are the plan for the sample query and some concepts in the AQE framework:

  1. QueryStage: a special "dynamic" operator that wraps a submitted plan fragment. This guarantees the "stability of the plan itself," since modifications only touch what is inside a QueryStage. In the current implementation, QueryStages are cut at Shuffle boundaries; see the query in [5] for reference. As the figure below shows, every operator is wrapped inside a QueryStage.
  2. LogicalLink: a link from the physical plan back to the logical plan, used during re-optimization.
  3. Runtime Statistics: Each QueryStage can either estimate statistics from running tasks' metrics or collect statistics from completed tasks' metrics. These are then reported back to the LogicalPlan through the LogicalLinks.

img

Listing 2 in the paper gives AQE's core event loop; note that line 16 re-plans into a new PhysicalPlan via plan(reOptPlan). Events typically come from:

  1. (dependent) QueryStage completion -> actual statistics observed from completed QueryStages
  2. QueryStage failure (or timeout) -> actual statistics observed from completed QueryStages
  3. Heuristics with (on-going) task metrics -> estimated statistics from running QueryStages’ metrics
// Kick off initial QueryStages.
LogicalPlan currentPlan = initialPhysicalPlan.logicalLink;
List<QueryStage> initialRunnableStages = breakDown(initialPhysicalPlan);
initialRunnableStages.foreach(stage => Scheduler.submit(stage));
runningStages.addAll(initialRunnableStages);
do {
  // Blocking wait until new re-optimization event
  // being added into `reOptEventQueue` by producer
  // threads.
  Event reOptEvent = reOptEventQueue.take();
  // Update `runningStages` and `currentPlan`.
  currentPlan = update(reOptEvent, runningStages, currentPlan);
  // Call logical rewrite rules to optimize `currentPlan`.
  LogicalPlan reOptPlan = reOptimize(currentPlan);
  // Convert `reOptPlan` to a physical plan.
  PhysicalPlan currentPhysicalPlan = plan(reOptPlan);
  // Break down `currentPhysicalPlan` into runnable
  // QueryStages.
  List<QueryStage> runnableStages = breakDown(currentPhysicalPlan);
  // Cancel running QueryStages that are no longer needed.
  runningStages.diff(runnableStages).foreach(stage => Scheduler.cancel(stage));
  // Submit new runnable QueryStages to the scheduler.
  List<QueryStage> newStagesToRun = runnableStages.diff(runningStages);
  newStagesToRun.foreach(stage => Scheduler.submit(stage));
  runningStages.addAll(newStagesToRun);
} while (hasUncompletedStages());

QueryStage Cancellation

This is the part I find most interesting: avoiding re-running tasks and reusing what has already been scanned. I haven't worked on the details of this area, but it presumably prevents some old stages from being re-scheduled: they may be slow, but they did finish. (I'm not entirely sure what the considerations are for cancelling a stage that is still running, or how Spark handles splits/stages here; from later sections, the statistics also reflect the completion progress of other jobs, see Figure 3.)

At line 21, Scheduler.cancel(stage) tries to stop large scans, shuffles, or disk spills. The logic is roughly:

  1. stop ongoing large scans, shuffles, or disk spills
  2. For idempotence, a completed QueryStage would not be rerun because it becomes a leaf node in the new logical and physical plans from lines 13 to 16

This requires:

  1. Cancellation must be idempotent: completed tasks are never rerun, and a task only needs to be cancelled once.
  2. Already-completed QueryStages become leaf nodes in the new logical and physical plans, which means their intermediate results can be reused directly instead of being recomputed.

(Rethinking this: if we only consider scans, it is essentially equivalent to adding or changing filter rules at joins (and not only at joins) and then having to deal with data that has already been read. The most wasteful option is obviously to start over; the better one is to reuse the earlier data, and of course Spark also has to worry about the consistency of a stage's output and so on. I'm not sure why results are not reused at the split level or some finer granularity.)
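
To make "a completed QueryStage becomes a leaf node" concrete, here is a minimal sketch over a made-up plan AST; the names LogicalNode, StageResultLeaf, and foldCompletedStages are mine, not the paper's or Spark's (open-source Spark does something similar by wrapping a finished stage in a LogicalQueryStage node):

// Toy logical-plan AST, purely illustrative.
sealed trait LogicalNode
case class Scan(table: String) extends LogicalNode
case class Join(left: LogicalNode, right: LogicalNode) extends LogicalNode
// A finished QueryStage: its shuffle output is already materialized, so it is
// re-planned as a leaf that carries exact runtime statistics.
case class StageResultLeaf(stageId: Int, rows: Long, bytes: Long) extends LogicalNode

// Re-optimization step: any fragment whose QueryStage has completed is replaced
// by its StageResultLeaf, so later rewrite rules see exact statistics and the
// scheduler never resubmits that fragment (this is what keeps cancellation and
// resubmission idempotent).
def foldCompletedStages(plan: LogicalNode,
                        completed: Map[LogicalNode, StageResultLeaf]): LogicalNode =
  completed.getOrElse(plan, plan match {
    case Join(l, r) => Join(foldCompletedStages(l, completed),
                            foldCompletedStages(r, completed))
    case other      => other
  })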

Performance Optimizations

The pieces here are:

  1. Logical Rewrite: inject Semi-Join reduction filter variants such as dynamic partition/file pruning filters (DPPs, DFPs) [23] and Bloom filters [14] (Section 5.1), and optimize away plan fragments that are no longer needed (Section 5.2) -> this feels like injecting pruning filters based on joins whose build-side stage has already completed.
  2. Planner Rule: that revisits and changes the static planning decision on which join algorithm to use for a logical Join operator (Section 5.3);
  3. Physical Rewrite: dynamically adjusts the Shuffle parallelism (Section 5.4).

Logical Rewrite: Dynamic Join Filters

See the figure below. The execution side is nothing special; the cost evaluation at the plan level (whether building the filter is worth it) is the more important part.

img
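
To illustrate what a dynamic semi-join reduction filter does, here is a hand-rolled sketch using Spark's public DataFrame API; this is not the AQE rewrite itself (which injects the filter into the plan automatically), and the 1,000,000 / 0.01 Bloom-filter parameters are arbitrary illustrative choices:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Build side: customer rows surviving the selective predicates.
val customer = spark.table("customer")
  .filter($"c_mktsegment" === "BUILDING" && $"c_acctbal" > 8000.0)
val orders = spark.table("orders")

// Collect a Bloom filter over the build side's join keys.
val bf = customer.stat.bloomFilter("c_custkey", 1000000L, 0.01)

// Apply it as a cheap pre-filter on the probe side before the actual join;
// the AQE logical rewrite achieves an equivalent effect inside the plan.
val mightMatch = udf((k: Long) => bf.mightContain(k))
val pruned = orders.filter(mightMatch($"o_custkey"))
val joined = pruned.join(customer, $"o_custkey" === $"c_custkey")

The interesting part in the paper is deciding at runtime, from the completed build-side stage's size, whether creating such a filter pays off at all.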

Logical Rewrite: Dynamic Data Properties

img

  • If some fragment turns out to be empty, or to produce only a single row, the plan can be simplified based on that result: e.g., fold an empty-side join away, or drop a Sort when there is at most one row.
  • The rest is mostly DPP-style pruning again.

Not much to say here, though it does matter engineering-wise. (Still, writing this much about something this dry, come on.) A toy sketch of the propagation idea follows.
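
A minimal sketch of the empty-input / single-row propagation, again over a made-up AST rather than the paper's actual rule set:

// Runtime statistics observed for an already-finished fragment.
case class RuntimeStats(rows: Long)

sealed trait Plan
case object EmptyRelation extends Plan
case class InnerJoin(left: Plan, right: Plan) extends Plan
case class Sort(child: Plan) extends Plan
case class Completed(stats: RuntimeStats, child: Plan) extends Plan

def propagate(p: Plan): Plan = p match {
  // A completed fragment that produced zero rows folds into an empty relation.
  case Completed(RuntimeStats(0L), _) => EmptyRelation
  // An inner join with an empty side is itself empty.
  case InnerJoin(l, r) =>
    (propagate(l), propagate(r)) match {
      case (EmptyRelation, _) | (_, EmptyRelation) => EmptyRelation
      case (pl, pr)                                => InnerJoin(pl, pr)
    }
  // Sorting an empty input or at most one row is a no-op.
  case Sort(child) =>
    propagate(child) match {
      case EmptyRelation                               => EmptyRelation
      case c @ Completed(RuntimeStats(n), _) if n <= 1 => c
      case c                                           => Sort(c)
    }
  case other => other
}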

Planner Rule: Join Algorithm Re-Selection

Photon has two HashJoin variants:

  • Broadcast Hash Join
    • One side of a Join is small enough to fit into the memory of an individual executor, the smaller side (known as the build side) is broadcast to all participating executor nodes, eliminating the need for repartitioning of the other side (the probe side).
    • It is important to note that different joiner threads on the same executor node share the same build side hash table and data, residing in memory. <— wait, doesn't this assume the estimate is accurate enough to guarantee no spill?
  • Shuffled Hash Join.
    • Both sides undergo shuffling before being joined. On an individual executor, the local join algorithm is a vectorized implementation of Hybrid Hash Join [11, 39], which can gracefully spill to disk if necessary.

In the plan below:

  1. The plan is initialized as a Shuffled Hash Join.
  2. The left side turns out to be only 50 MB with a small row count, so the join can be switched to a Broadcast Hash Join.

img

Of course, the reverse also happens: a Broadcast Hash Join can be demoted to a Shuffled Hash Join when the observed memory pressure is judged to be too high.
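
A minimal sketch of the re-selection decision; the thresholds and names are made up for illustration, and the real rule also has to respect hints, null-aware anti joins, and which side may legally be the build side:

// Runtime statistics observed for the two join inputs.
case class StageStats(bytes: Long, rows: Long)

sealed trait JoinAlgo
case object BroadcastHashJoin extends JoinAlgo
case object ShuffledHashJoin  extends JoinAlgo

// Illustrative thresholds, not the paper's values.
val broadcastBytesThreshold = 30L * 1024 * 1024 // e.g. 30 MB
val broadcastRowsThreshold  = 1000000L

def reselectJoin(left: StageStats, right: StageStats): (JoinAlgo, String) = {
  val buildIsLeft = left.bytes <= right.bytes
  val build       = if (buildIsLeft) left else right
  val buildSide   = if (buildIsLeft) "left" else "right"
  if (build.bytes <= broadcastBytesThreshold && build.rows <= broadcastRowsThreshold)
    (BroadcastHashJoin, buildSide) // small enough to replicate to every executor
  else
    (ShuffledHashJoin, buildSide)  // otherwise shuffle both sides and join co-located
}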

Physical Rewrite: Elastic Shuffle Parallelism

The motivation is that choosing a degree of parallelism is hard in itself (in some sense, compare the F1 / Napa work on parallelism), and many systems simply fix it. Both too little and too much are bad:

  • Under-parallelism. In this scenario, each Shuffle consumer task handles a large volume of data, which can result in unnecessary CPU cache misses or disk spillages (e.g., for operators like Join, Aggregation, and Sort), consequently slowing down queries. (Honestly I haven't fully worked out why the consumer-side parallelism affects CPU cache misses; presumably it's the random-access hash-table workloads: with fewer tasks, each task builds a larger hash table that no longer fits in cache.)
  • Over-parallelism. Conversely, in this case, there may be numerous small network data fetches, leading to inefficient network I/O patterns. On top of that, over parallelism also causes excessive scheduling overhead, which can be another significant contributor to performance slowdowns.

img

The implementation is:

  1. At ShuffleWrite time, record the exact size of every partition written.
  2. AQE then plays the scheduler's role: it merges partitions that are too small and splits ones that are too large, and rewrites the ShuffleRead partition specification. Only the read descriptor needs to change ("Shuffle partitions are physically contiguous in partition numbers, allowing the 'merge' operation to be logical without additional reads or writes of the Shuffle data"). A sketch of the merge step follows below.
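
The merge step can be a simple greedy pass over the per-partition sizes. A minimal sketch under the assumption that only contiguous partitions are merged (the names and the target size are illustrative, not the paper's):

// Each reducer task reads a contiguous range [start, end) of map-output partitions.
case class PartitionRange(start: Int, end: Int, bytes: Long)

// Merge contiguous small shuffle partitions until each reader range is close
// to `targetBytes`. Per-partition sizes come from the ShuffleWrite-side metrics.
def coalesce(partitionBytes: Array[Long], targetBytes: Long): Seq[PartitionRange] = {
  val ranges = scala.collection.mutable.ArrayBuffer.empty[PartitionRange]
  var start = 0
  var acc = 0L
  for (i <- partitionBytes.indices) {
    // Close the current range if adding partition i would overshoot the target.
    if (acc > 0 && acc + partitionBytes(i) > targetBytes) {
      ranges += PartitionRange(start, i, acc)
      start = i
      acc = 0L
    }
    acc += partitionBytes(i)
  }
  if (acc > 0 || start < partitionBytes.length)
    ranges += PartitionRange(start, partitionBytes.length, acc)
  ranges.toSeq
}

// Example: with a 64 MB target, tiny partitions get grouped while a big one stays alone:
// coalesce(Array(5, 10, 60, 3, 2).map(_ * 1024L * 1024), 64L * 1024 * 1024)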

Robustness

The goal is to make already-running SQL more stable, and to reduce the damage when there is a lot of spilling and the like.

Logical Rewrites: Broadcast Hash Join Fallback

Listing 3 is shown below.

The initial plan may end up as a Broadcast Hash Join for one of two reasons:

  1. The query attempts to enforce a Broadcast Hash Join implementation through a SQL hint (especially common in BI-generated SQL).
  2. The logical Join is a Null-aware Anti Join. This can be implemented using a Broadcast Hash Join but not by a Shuffled Hash Join, because the latter does not always produce correct results as per standard SQL semantics. In addition, the build side and probe side cannot be switched.

For these two cases:

  1. AQE can detect that the broadcast build side is too large and trigger a re-optimization.
  2. For case 1, it can drop the join hint and try to plan a Shuffled Hash Join instead.
  3. For case 2, the query can be rewritten as in Listing 3 into an equivalent, slightly more complicated form that a Shuffled Hash Join can execute.
-- Input query:
SELECT *
FROM customer AS c
WHERE c.c_custkey NOT IN (SELECT o_custkey FROM orders);

-- Rewritten query when orders is not empty:
-- If orders has a NULL o_custkey:
--   no customer row qualifies the NOT IN predicate;
-- otherwise:
--   a normal LEFT ANTI JOIN can work, except that
--   customer rows with c_custkey being NULL do not
--   qualify the NOT IN predicate.
SELECT * FROM (
  SELECT * FROM customer AS c
  WHERE NOT EXISTS (
          SELECT * FROM orders WHERE o_custkey IS NULL)
    AND c.c_custkey IS NOT NULL
) AS c
LEFT ANTI JOIN orders AS o ON c.c_custkey = o.o_custkey;

Planner Rule: Shuffle Elimination Fallback

The static planner may use partitioning properties to eliminate a Shuffle, since in principle an extra Shuffle makes the query slower (while also making it more robust). But if the partition column turns out to have far fewer distinct values than expected, the downstream parallelism is insufficient, and the chance of spilling also goes up (because of skew and the larger per-task data volume).

Below is the query of Listing 4; the plan is shown in Figure 7.

img

During execution, it turns out that there are only 2 distinct values of R.a, and thus the Hash Aggregation after Join only has two effective, parallel tasks across all executors, regardless of the number of Shuffle partitions. This can cause over spillage when the number of groups by is excessively large, especially if the join predicate leads to a many-to-many join.

Following Figure 7, AQE inserts a Shuffle to spread the data back out. Essentially, in Figure 7(a) the DOP of the Join (and of the aggregation after it) is tied to the partitioning on R.a, which does not match the (R.a, R.h, S.c) grouping; the parallelism has to be increased.

SELECT R.a, R.h, S.c, SUM(R.d * S.e) AS v
FROM R, S
WHERE R.a = S.a AND R.b = S.b AND p(R.g)
GROUP BY R.a, R.h, S.c
ORDER BY v DESC LIMIT 10;

Physical Rewrite: Skew Join Handling

We have also implemented a physical rule to handle skewed join keys. The rule is able to discover data skew on a set of join keys in a Shuffled Hashed Join, which manifests as a few partitions containing significantly more data than others.

The rule can eliminate the skewness by logically splitting those large, consumer-side partitions into smaller, more balanced partitions,

img
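
A minimal sketch of the detect-and-split step, with made-up thresholds (open-source Spark's skew-join handling uses a similar factor-times-median test; the exact numbers here are illustrative):

// Decide which reduce-side partitions of a shuffled join are skewed and split
// them into smaller reader ranges; the other join side then reads its matching
// partition once per split (i.e. that partition is effectively replicated).
case class SplitPartition(partitionId: Int, splitIndex: Int, bytes: Long)

def splitSkewedPartitions(partitionBytes: Array[Long],
                          skewFactor: Double = 5.0,
                          minSkewBytes: Long = 256L * 1024 * 1024,
                          targetBytes: Long = 64L * 1024 * 1024): Seq[SplitPartition] = {
  val sorted = partitionBytes.sorted
  val median = if (sorted.isEmpty) 0L else sorted(sorted.length / 2)
  partitionBytes.zipWithIndex.flatMap { case (bytes, pid) =>
    val isSkewed = bytes > math.max(skewFactor * median, minSkewBytes.toDouble)
    if (!isSkewed) {
      Seq(SplitPartition(pid, 0, bytes))
    } else {
      // Carve the skewed partition into roughly target-sized splits.
      val numSplits = math.max(1L, (bytes + targetBytes - 1) / targetBytes).toInt
      (0 until numSplits).map(i => SplitPartition(pid, i, bytes / numSplits))
    }
  }.toSeq
}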

Evaluation

We evaluate AQE performance improvements on a 16-node AWS cluster with one driver node. Each node is an i3.2xlarge instance with 64GB of memory and 8 vCPUs (Intel Xeon E5 2686 v4). We ran benchmarks of TPC-H and TPC-DS, stored with the Delta format in Amazon S3, on different scale factors (1000 and 3000), with and without pre-collected table and column statistics via the Analyze Table command.
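
For reference, "pre-collected table and column statistics via the Analyze Table command" corresponds to something like the following (standard Spark SQL syntax; `orders` and the column names are just taken from the benchmark schema):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Collect table- and column-level statistics up front so the static optimizer
// has something to work with even before AQE kicks in.
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS o_custkey, o_orderdate")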

  1. The re-optimization overhead is a small fraction of total runtime.
  2. The more data, the larger the improvement.
  3. For queries without pre-collected stats, the improvement is dramatic.
  4. (Possibly less helpful for very simple queries? And I'm not sure whether there can be cases with too many re-optimization events.)

img

Reference

  1. https://blog.bcmeng.com/post/mpp-grouped-excution-stage.html#what-is-in-mpp-pipeline
  2. https://research.facebook.com/publications/presto-a-decade-of-sql-analytics-at-meta/
  3. https://docs.snowflake.com/en/user-guide/sample-data-tpch
  4. https://www.vldb.org/pvldb/vol9/p204-leis.pdf
  5. https://stackoverflow.com/questions/74811159/what-is-shufflequerystage-in-spark-dag