Expression Execution in Velox Part 2: Expr::eval

上一节我们介绍了 SimpleFunction 执行的链路,表达式除了 Simple Function 之外:

  1. 还会能让用户直接继承 VectorFunction 写一些自己定义的 Agg 或者别的方式,这里我理解和 Arrow Compute 里面的定义是相似的
  2. Register 一些 Agg 之类的函数,然后进行各种操作

Velox 官方很多地方还是靠 UDF 的,很诚恳( 指没有在内部搞各种骚操作 )。他们的展开在上一节(https://blog.mwish.me/2024/01/24/Expression-Execution-in-Velox-Part-1-Basics-and-UDF/) 中有所介绍.

本期介绍的是 Velox Expression 的执行,执行具体的 Expr

执行之前的部分

这里要区分 Velox 的 Expr,下图是个很好的例子:

img

Velox 在执行前拿到的很大程度上是左侧的表达式,这里可以见:https://blog.mwish.me/2022/11/13/Introduction-to-velox/#Compilation . 简单说这里在执行之前会做一些优化包括:

  1. CSE 的处理
  2. 常量折叠
  3. Flatten ANDs and ORs …

最终这里会生成一个表达式. Compile 签名如下:

1
2
3
4
5
std::vector<std::shared_ptr<Expr>> compileExpressions(
const std::vector<core::TypedExprPtr>& sources,
core::ExecCtx* execCtx,
ExprSet* exprSet,
bool enableConstantFolding = true);

我们这一节不会直接介绍这里的逻辑。这里的生成结果是 ExprSet,它是一组表达式的集合,共享了一些表达式的上下文。在 ExprSet 中有些表达式是会被共享的。

具体而言,这里最后产生的是一个 facebook::velox::exec::ExprExpr 本身便于执行一些带 VectorFunction (很多又是我们上一节介绍的 Simple Function),而剩下的被处理为 SpecialForm,表示 Velox 自己包装的 if 之类的逻辑。很多错误处理的逻辑也包装在里面。

1
2
/// Constant, Cast, Coalesce, Conjunct(And, Or), FieldReference, Switch, Lambda, Try.
class SpecialForm : public Expr;

表达式的执行链路

相关的背景

输入: ExecCtx

在上一节介绍过:https://blog.mwish.me/2024/01/24/Expression-Execution-in-Velox-Part-1-Basics-and-UDF/#EvalCtx 。这里的输入是整个表达式的行。

考虑一个情况,比如输入来自 TableScan 之类的,那么这里就很可能是 Lazy 的或者是编码的输入。

输入: Constant / FieldReference

我们不打算太前面介绍一些特殊表达式,但是连 Constant 和 FieldRef 都不知道的话输入就没法介绍下去了。

关于 Constant 和 FieldReference 感觉是大家都有的表达式类型,但是 Constant 比较好玩的一点是,可能是因为复用代码或者内存不好管理,感觉 Velox 中 Scalar 是用 ConstantVector 存储的…

这两个结构如下 ( Vector 一些粗略介绍见 https://blog.mwish.me/2024/01/24/Expression-Execution-in-Velox-Part-1-Basics-and-UDF/

1
2
3
4
5
6
class ConstantExpr : public SpecialForm {
private:
// 内部生成的 Constant column
// 因为这个 constant value 构造来自于 VectorPtr value, 所以会在这上面包一层 Constant.
VectorPtr sharedConstantValue_;
};

和:

1
2
3
4
5
6
7
8
9
10
11
// 以名称的形式来 Bind 在:
// 1. EvalCtx 输入行的一个字段上(inputs_.empty() 的时候, 会在执行的时候从 inputRow 拿到输入).
// 2. Inputs 仅包含一个表达式时, 在该表达式的输出上( `inputs_[0].eval` ).
//
// 值得一提的是, 如果输出来自表达式之类的, 这里的输出会尝试拍平, 即会尝试弄成
// Flatten 的结构, 解掉一些 Peeling
class FieldReference : public SpecialForm {
private:
const std::string field_;
int32_t index_ = -1;
};

这两者都会尝试计算,然后每次把值输出。这里会尽量尝试直接 reuse Input,然后 reuse 失败才会 EvalCtx::moveOrCopyResult 去输出结果。这里还有一些细节但感觉现在介绍还是过早了,现在把他们当成表达式输入就行,最后在 SpecialForm 介绍。

基础的优化手段和 Compute Metadata

这里可以参考下面几个链接,讲过的东西再讲一遍就不太好了。

  1. 官方博客:https://facebookincubator.github.io/velox/develop/expression-evaluation.html
  2. Velox 发表的论文:https://engineering.fb.com/2023/03/09/open-source/velox-open-source-execution-engine/
  3. 和我之前对论文的 comment: https://blog.mwish.me/2022/11/13/Introduction-to-velox/

这里需要知道的是:

  1. CSE 识别之类的东西是在 Compiler 的地方发现的,但是会在执行的时候 aware 一些 CSE。具体来说,它会计算某个 child 是否被多次 ref,如果被多次 Ref 的话,可能会尝试缓存计算的结果,并且每次做增量计算(具体而言,对于一个 EvalCtx 和一组输入,一个共享表达式,多次计算的 Selection 可能有重合,也有不重合的地方。每缓存算过的地方,新算没算过的地方即可,这里也是当作计算本身的 cost 比较高?)
  2. Constant Folding 之类的也是 ExprSet 之类的地方发现处理的,会在 Expr::eval 之前,具体代码也在下面:

编译的时候 ( compileRewrittenExpression ) 这里会调用 Expr::computeMetadata. 之前的论文介绍,关于 Metadata 的部分我们跳过了很多,但是这节竟然要深入细节,就要用罗列 API 的气势讲讲这些。这个函数的开头会计算所有 children 的表达式,如果已经计算完成的话,不会重复计算:

1
2
3
4
5
6
7
8
9
10
11
void Expr::computeMetadata() {
if (metaDataComputed_) {
return;
}

// Compute metadata for all the inputs.
for (auto& input : inputs_) {
input->computeMetadata();
}
// ...
}

当然,Constant Folding 的逻辑也在 compileRewrittenExpression

1
2
3
4
5
6
7
8
result->computeMetadata();

// If the expression is constant folding it is redundant.
auto folded = enableConstantFolding && !isConstantExpr
? tryFoldIfConstant(result, scope)
: result;
scope->visited[expr.get()] = folded;
return folded;
distinctFields, multiplyReferencedFields, sameAsParentDistinctFields

img

上图是一张非常好的图,介绍了 distinctField 的计算逻辑(但是没有写 MultiReference)。简单的说,这里输入可以调整为各个 FieldReference,然后 Expr 会维护两个变量:distinctFields, multiReferencedFields,具体而言,这里认为所有的对 ExecCtx 中 Row Input 输入的 Ref 都是 FieldReference, 这个时候它会有如下规则:

对于一个表达式,它会 Merge 所有子表达式的 Input 的 FieldReference。FieldReference 表达式本身只会有自己的 distinctField,那么下游的 multiReferencedFields 会直接走到 mutliReferencedFields 中。

  1. 第一次遇到,并且没有在 mutliReferencedFields,会放到 distinctField
  2. 多次遇到会放到 multiReferncedFields

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void Expr::mergeFields(
std::vector<FieldReference*>& distinctFields,
std::unordered_set<FieldReference*>& multiplyReferencedFields,
const std::vector<FieldReference*>& moreFields) {
for (auto* newField : moreFields) {
if (isMember(distinctFields, *newField)) {
multiplyReferencedFields.insert(newField);
} else {
distinctFields.emplace_back(newField);
}
}
}

void Expr::computeDistinctFields() {
// 在调用这个之前这块( distinctFields_, multiplyReferencedFields_ )应该是 empty 的.
// 合并所有子表达式的 distinctFields_, 把出现多次的放到 multiplyReferencedFields_ 里面.
for (auto& input : inputs_) {
mergeFields(
distinctFields_, multiplyReferencedFields_, input->distinctFields_);
}
}
propagatesNulls

这里我们在 VectorFunction 那讲过类似的,即是否 null-in-null-out。

这里 Velox 的表达式系统拆分了两套接口(见: https://github.com/facebookincubator/velox/pull/5287 ):

1
2
bool propagateNulls() const;
void computePropagatesNulls();
  1. 对于 VectorFunction 的表达式, 从 VectorFuction 上直接拿到 propagateNull 就可以了. Expr 上也有 bool propagateNulls() const 的接口.
  2. 对于 SpecialForm, 额外从 !vectorFunction 来计算 propagateNulls.

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// (3) Compute propagatesNulls_.
// propagatesNulls_ is true iff a null in any of the columns this
// depends on makes the Expr null.
//
// Constant, FieldReference, CastExpr, SpecialForm 这些当成 Vector 来处理.
// Q: 为什么 SpecialCase 要这么处理呢?
// A: Vector 直接用里头的就可以了. Constant, FieldReference, CastExpr 这些
// 是零条/多条输入流 - 抛出 null 的, 和正常函数一起处理, if/switch/try 这种就
// 自己 compute 了
if (isSpecialForm() && !is<ConstantExpr>() && !is<FieldReference>() &&
!is<CastExpr>()) {
as<SpecialForm>()->computePropagatesNulls();
} else {
if (vectorFunction_ && !vectorFunction_->isDefaultNullBehavior()) {
propagatesNulls_ = false;
} else {
// vectorFunction 已经是 defaultNull. 或者是 Const, fieldRef, Cast 这样的.
// 现在要检查 inputs_ 的 NullPropagating.
// 规则:
// * 整个表达式的输入来源于所有子表达式的输入, 根输入是 `distinctFields_`.
// * input 要么是 PropagateNull, 要么不是
// * 那么, 如果自身是 PropagateNull, 然后如果 `!PropagateNull` 的子表达式中,
// 全部是 `distinctFields_` 的子集, 这里输入中如果有任何一个 Null, 那么
// 返回就是 Null.

// Logic for handling default-null vector functions.
// cast, constant and fieldReference expressions act as vector functions
// with default null behavior.

// If the function has default null behavior, the Expr propagates nulls if
// the set of fields null-propagating arguments depend on is a superset of
// the fields non null-propagating arguments depend on.
std::unordered_set<FieldReference*> nullPropagating, nonNullPropagating;
for (auto& input : inputs_) {
if (input->propagatesNulls_) {
nullPropagating.insert(
input->distinctFields_.begin(), input->distinctFields_.end());
} else {
nonNullPropagating.insert(
input->distinctFields_.begin(), input->distinctFields_.end());
}
}

// propagatesNulls_ is true if nonNullPropagating is subset of
// nullPropagating.
propagatesNulls_ = true;
for (auto* field : nonNullPropagating) {
if (!nullPropagating.count(field)) {
propagatesNulls_ = false;
break;
}
}
}
}

这里可以看到一个逻辑,即 vectorFunction 已经不是 nullPropagete 的情况下肯定不是 nullPropagetevectorFunction 是 nullPropagete 的情况下,需要检查所有的输入。这里有个性质比较好玩:

  • 抽取所有 nullProgate 的 Input Fields 和所有 !nullPropagate 的 Input Fields
  • 如果 !nullPropagate 是 nullPropagate 的子集,那么任何一个 child 的 !nullPropagate 列,都会产生一个 null input,然后目标是 null,所以是 nullPropagate 的
  • 反之,不是 nullPropagate 的
Determinstic

是否是确定性的表达式。这里逻辑很简单,我就不讲了…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// (1) Compute deterministic_.
// An expression is deterministic if it is a deterministic function call or a
// special form, and all its inputs are also deterministic.
//
// 注意到这个 desterminstic 好像不是递归的,只是单纯的看自己的 deterministic_.
// 所以 SpecialForm 里面的 inputs_ 本身可以是 deterministic 的, 然后后面再去
// 跟子表达式去处理 & desterminstic.
if (vectorFunction_) {
// 利用 vectorFunction_ 的 isDeterministic() 方法
deterministic_ = vectorFunction_->isDeterministic();
} else {
VELOX_CHECK(isSpecialForm());
deterministic_ = true;
}

for (auto& input : inputs_) {
deterministic_ &= input->deterministic_;
}
hasConditions

这个任何一个子表达式包含 condition 就会判断,逻辑很弱智:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Returns true if input expression or any sub-expression is an IF, AND or OR.
bool hasConditionals(Expr* expr) {
if (expr->isConditional()) {
return true;
}

for (const auto& child : expr->inputs()) {
if (hasConditionals(child.get())) {
return true;
}
}

return false;
}

Expr for VectorFunction

我们以下列简单的表达式来表示这里的逻辑:

  1. 1
  2. 1 + a
  3. (1 + a) * 2 + b

我们也补上官方的一张图,作为一个比较好的参考

img

(这张图少了 evalWithMemo: https://facebookincubator.github.io/velox/develop/expression-evaluation.html#memoizing-the-dictionaries )

我们可以在下图画出一个实际的路线图:

expr-eval-code

Flat no null fast path

这一段可以见下面的环节,官方文档说的很好。主要是在 ML 之类的场景,处理 encoding 和 Null 本身有一定开销。官方博客这段放在比较后头,但是比较早介绍这个能比较快的 introduce 一个不处理 Null、不处理 encoding 的简单模型,我们可以在这个模型上再扩展出后续的能力,所以我觉得这样讲也不坏。

https://facebookincubator.github.io/velox/develop/expression-evaluation.html#flat-no-nulls-fast-path

这段逻辑的入口在下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void Expr::eval(
const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result,
const ExprSet* parentExprSet) {
if (supportsFlatNoNullsFastPath_ && context.throwOnError() &&
context.inputFlatNoNulls() && rows.countSelected() < 1'000) {
// 满足了 ML 那种 FastPath 的条件, 执行 FlatNoNull 函数
// 主要是在 ML 之类的场景,处理 encoding 和 Null 本身有一定开销, 框架就不处理了.
// 具体见这个 patch: https://github.com/facebookincubator/velox/pull/1943
// 把细节说的比较清楚.
evalFlatNoNulls(rows, context, result, parentExprSet);
checkResultInternalState(result);
return;
}
/// ...
}

void Expr::evalFlatNoNulls(
const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result,
const ExprSet* parentExprSet) {
if (shouldEvaluateSharedSubexp()) {
evaluateSharedSubexpr(
rows,
context,
result,
[&](const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result) {
evalFlatNoNullsImpl(rows, context, result, parentExprSet);
});
} else {
evalFlatNoNullsImpl(rows, context, result, parentExprSet);
}
}

我们观测到有一段 shouldEvaluateSharedSubexpevaluateSharedSubexpr ,这些是 MultiRef 的 CSE 表达式求值的逻辑。我们下一节介绍。

我们直接来贴 evalFlatNoNullsImpl 的逻辑,关注几个点:

1
2
3
4
5
6
ExprExceptionContext exprExceptionContext{this, context.row(), parentExprSet};
// 内部用 ExceptionContextSetter 来包装 `ExprExceptionContext`,
// 添加异常的时候打印这个表达式的逻辑.
ExceptionContextSetter exceptionContext(
{parentExprSet ? onTopLevelException : onException,
parentExprSet ? (void*)&exprExceptionContext : this});

这里是异常处理的上下文。Velox 使用了一个 ThreadLocalExceptionContext, 在抛出异常的时候,这里会加入 ExceptionContext 里面的字符串。这里内容是加上了这个表达式的 ToString,来在抛异常的时候,能够定位到表达式的上下文。

我们再看下面具体的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
void Expr::evalFlatNoNullsImpl(
const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result,
const ExprSet* parentExprSet) {
ExprExceptionContext exprExceptionContext{this, context.row(), parentExprSet};
// 内部用 ExceptionContextSetter 来包装 `ExprExceptionContext`,
// 添加异常的时候打印这个表达式的逻辑.
ExceptionContextSetter exceptionContext(
{parentExprSet ? onTopLevelException : onException,
parentExprSet ? (void*)&exprExceptionContext : this});

if (!rows.hasSelections()) {
checkOrSetEmptyResult(type(), context.pool(), result);
return;
}

if (isSpecialForm()) {
// If, And, Or 之类的洞都是 special case.
evalSpecialFormWithStats(rows, context, result);
return;
}

// Prepare Input
// 这个地方 constantInput 还不是 inputValues_ 😅
// 想了一下, 应该本质是因为 eval 的时候需要 resize, 所以特判一下.
inputValues_.resize(inputs_.size());
for (int32_t i = 0; i < inputs_.size(); ++i) {
if (constantInputs_[i]) {
// No need to re-evaluate constant expression. Simply move constant values
// from constantInputs_.
inputValues_[i] = std::move(constantInputs_[i]);
// 这里是 constant 的时候, 需要 resize 到 `rows.end()`.
inputValues_[i]->resize(rows.end());
} else {
// 这个地方是说 inputValues_ 里面的值不是 constant 的.
// 这个时候需要通过 eval 来拿到值
inputs_[i]->evalFlatNoNulls(rows, context, inputValues_[i]);
}
}

// Apply VectorFunction
// 执行 Vector Function.
applyFunction(rows, context, result);

// Move constant values back to constantInputs_.
// 恢复 constantInputs_ 的值, 等待下一次继续 resize.
for (int32_t i = 0; i < inputs_.size(); ++i) {
if (inputIsConstant_[i]) {
constantInputs_[i] = std::move(inputValues_[i]);
VELOX_CHECK_NULL(inputValues_[i]);
}
}

// 处理掉非 Const 的 Input Value, 这些来自表达式的生成.
//
// Q: reuse input 会怎么处理这些?
// A: reuse input 要求输入和输出列类型相同, 然后是 unique 的.
// 结果它会 reuse 相同的内存.
// 重点是 `releaseInputValues` 下层 `VectorPool::release`
// 的时候, 如果 !unique, 就不会把这个内存释放掉.
releaseInputValues(context);
}

applyFunction 的逻辑很简单直接,就是调用我们上一节的 Expr。这里注意一下参数的 ConstantInput 处理,因为这里需要把 Constant 的 Size 调整到和 Row 的范围内。

最后有个 releaseInputValues, 这里需要注意的是,这里按照引用计数来释放。

这里介绍有点抽象,我们举例子:(1 + a) * 2, 这里的表达式是:

  1. Expr(input = ConstantExpr(1), input = FieldReference("a"), vectorFunction="add")
  2. Expr(input = ConstantExpr(2), input = Expr(1), vectorFunction="mul")

在 Expr2 求值的时候,逻辑如下:

  1. 假如输入有 500 行,对 Constant,resize 到 500
  2. 对 Expr1 递归的执行 evalFlatNoNulls, 拿到一个 Vector(是 FlatVector) 作为输出
  3. 执行的时候,发现 input reuse, 输出和输入是一个类型,选中 Expr(1) 结果存放表达式 _ * 2 的结果
  4. 还回去 constant
  5. releaseInputValues,这里因为有 reuse input,所以 Expr(1) 结果只是减少了 ref-count,没有背释放掉,这个结果被返回。

CSE 计算

shouldEvaluateSharedSubexpevaluateSharedSubexpr 这些和 CSE 的计算有关。后面字典求值一些逻辑也和这块很相似。我个人认为 CSE 是出于一种设计:表达式计算相对比较重,输入容易重复。

cse-velox

上面是一个 CSE 的典型场景。对于同一个 Row Input, 两个表达式计算的 Selector 可能相同也可能有不重合的地方.

这里逻辑大概如下:

  1. (显然的)必须要 deterministic 才能执行 CSE 缓存
  2. 必须是 MultiReferenced 的(也很显然…)
  3. inputs_.empty(),这里逻辑是 inputs_ 只包含子表达式,没有子表达式又是 deterministic,基本上就是 FieldRef Const 这类了,CSE 也没啥意义。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/// Returns true if this is a deterministic shared sub-expressions with at
/// least one input (i.e. not a constant or field access expression).
/// Evaluation of such expression is optimized by memoizing and reusing
/// the results of prior evaluations. That logic is implemented in
/// 'evaluateSharedSubexpr'.
///
/// Shared Sub Expr 在 comment 里面定义又叫 CSE.
bool shouldEvaluateSharedSubexp() const {
// 需要是:
// 1. deterministic (不 determinstic 感觉不能增量算, 要每次自己算了)
// 2. 被多个表达式依赖
// 3. 有 input --> i.e. not a constant or field access (并非指向表达式结果的 FieldRef)
// expression. 这些表达式都很轻, 感觉也没必要 CSE 这种搞法了.
//
// 这个还是看算子树的情况
return deterministic_ && isMultiplyReferenced_ && !inputs_.empty();
}

Expr 中,有 CSE 对应的成员:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct SharedResults {
// The rows for which 'sharedSubexprValues_' has a value.
//
// CSE 之已经计算过的 rows.
std::unique_ptr<SelectivityVector> sharedSubexprRows_ = nullptr;
// If multiply referenced or literal, these are the values.
//
// CSE 的结果 Vector.
VectorPtr sharedSubexprValues_ = nullptr;
};

// Maps the inputs referenced by distinctFields_ captuered when
// evaluateSharedSubexpr() is called to the cached shared results.
//
// CSE 计算的结果, 受到 `maxSharedSubexprResultsCached()` 的限制.
std::map<std::vector<const BaseVector*>, SharedResults> sharedSubexprResults_;

在计算的时候,逻辑如下:

  1. 如果是 CSE,找到上一次的 CSE 缓存(即上面的 sharedSubexprResults_)
  2. 如果没有找到,就地对 selection 进行计算,并且尝试加入 CSE 缓存
  3. 否则,比较这次的 selection 和计算缓存的 sharedSubexprRows_,然后加入结果集。

EvalAll: 相对 flat no null 多了什么

Expr::evalAll 定位有点类似 evalFlatNoNulls, 它是这里最泛用的一套东西了。

我们简单看 Expr::eval,它也有设置 Exception 上下文的逻辑,我们不再赘述。我们忽略掉一些 Lazy Eval 的逻辑,直接看下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Expr::eval(...) {
/// ...
if (inputs_.empty()) {
// 没有 input 直接快速 evalAll, 这个地方应该是常量表达式或者 FieldRef 之类的
// (或者甚至 non-determinstic 的生成表达式), 反正也不用管你 encoding 什么
// 的, 直接 evalAll 执行最内层逻辑就是.
evalAll(rows, context, result);
checkResultInternalState(result);
return;
}

evalEncodings(rows, context, result);
checkResultInternalState(result);
}

这里逻辑应该非常好理解,没 input 的话,那就 rows 本身有关的叼毛 Null 没有、Encoding 也没有了(当然,子表达式还是会产生这些逻辑的,我们延迟到 Expr::evalAll 处理。它为什么不先求值呢?我猜也是和 Expr::eval 的中间各种对 NULL 之类的优化有关)。

我们直接贴逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
void Expr::evalAllImpl(
const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result) {
VELOX_DCHECK(rows.hasSelections());

if (isSpecialForm()) {
// 开洞执行, 我也不知道咋搞的.
evalSpecialFormWithStats(rows, context, result);
return;
}
// tryPeelArgs 指的是内部执行的时候, 在计算了来自 input 表达式的输入之后, 是否可以尝试 Peel
// (这里输入的 rows 可能在 `evalEncoding` 的时候已经 Peel 过一轮了).
// 这里需要 deterministic_ 为 true, 且所有的 input 都是 Peelable 的, 然后才可以抽取公共
// 字典.
bool tryPeelArgs = deterministic_ ? true : false;
bool defaultNulls = vectorFunction_->isDefaultNullBehavior();

// Tracks what subset of rows shall un-evaluated inputs and current expression
// evaluates. Initially points to rows.
//
// 套一层本次执行的 Selector, 可以分清本次和增量的 Selector.
MutableRemainingRows remainingRows(rows, context);
if (defaultNulls) {
// 以 default null 方式展开子表达式(args), 这里需要按照 Selector 增量展开 input 设置到
// this->inputValues_ 里头.
// defaultNull 下, 一个为 null, 结果为 null.
if (!evalArgsDefaultNulls(
remainingRows,
[&](auto i) {
// 按照上一轮剩下的 Selector 去展开 input.
inputs_[i]->eval(remainingRows.rows(), context, inputValues_[i]);
// 设置是否要 peeling
tryPeelArgs =
tryPeelArgs && isPeelable(inputValues_[i]->encoding());
},
context,
result)) {
return;
}
} else {
// !defaultNulls 下, 一个为 null, 结果不一定为 Null
if (!evalArgsWithNulls(
remainingRows,
[&](auto i) {
inputs_[i]->eval(remainingRows.rows(), context, inputValues_[i]);
tryPeelArgs =
tryPeelArgs && isPeelable(inputValues_[i]->encoding());
},
context,
result)) {
return;
}
}

// 1. 如果有 tryPeel 的话, 尝试 applyFunctionWithPeeling.
// 2. 否则执行 applyFunction, 直接 apply. 里面在参数少的时候可能也会展开(即字典处理,但是不会减少输入参数)
if (!tryPeelArgs ||
!applyFunctionWithPeeling(remainingRows.rows(), context, result)) {
applyFunction(remainingRows.rows(), context, result);
}

// Write non-selected rows in remainingRows as nulls in the result if some
// rows have been skipped.
//
// 产生了新的 Null, 需要 Deselect.
if (remainingRows.hasChanged()) {
addNulls(rows, remainingRows.rows().asRange().bits(), context, result);
}
releaseInputValues(context);
}

这里我们观察到多的地方是:

  1. Null 的处理 ( evalArgsDefaultNulls, evalArgsWithNulls)
  2. Peeling 的处理 (这个我们可以放在下一 Part 介绍)

我们观察一下这里 Null 的处理,非 Default Null 的处理是最简单的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 这里不会按照 Null 来增量展开, 但是会按照 Error 来做增量展开.
template <typename EvalArg>
bool Expr::evalArgsWithNulls(
MutableRemainingRows& rows,
EvalArg evalArg,
EvalCtx& context,
VectorPtr& result) {
inputValues_.resize(inputs_.size());
for (int32_t i = 0; i < inputs_.size(); ++i) {
evalArg(i);
if (!rows.deselectErrors()) {
break;
}
}
if (!rows.rows().hasSelections()) {
releaseInputValues(context);
setAllNulls(rows.originalRows(), context, result);
return false;
}
return true;
}

这里只要处理错误,然后返回即可( deselectErrors )。

有 Null 的地方逻辑也比较简单,维护一个 MutableRemainingRows 对象,然后每次反选 Null 的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 以 default null 方式展开子表达式(args), 这里需要按照 Selector 增量展开 input 的表达式.
//
// return false 表示没有任何输入需要执行了.
template <typename EvalArg>
bool Expr::evalArgsDefaultNulls(
MutableRemainingRows& rows,
EvalArg evalArg,
EvalCtx& context,
VectorPtr& result) {
// ...
LocalDecodedVector decoded(context);
// Store pre-existing errors locally and clear them from
// 'context'. We distinguish between argument errors and
// pre-existing ones.
if (context.errors()) {
context.swapErrors(originalErrors);
}

inputValues_.resize(inputs_.size());
{
ScopedVarSetter throwErrors(
context.mutableThrowOnError(), throwArgumentErrors(context));

for (int32_t i = 0; i < inputs_.size(); ++i) {
// 展开 this->inputs_ 的 Arg
evalArg(i);
// 拿到一轮 flatNull.
const uint64_t* flatNulls = nullptr;
auto& arg = inputValues_[i];
if (arg->mayHaveNulls()) {
// default eval. mayHaveNull 的时候, 会把 null 的位置标记出来.
// 这里的 rows.rows() 是一句 filter 过一轮的, 即前面的参数标注为 null 的
// 后面也不用解析了. 这个感觉可以减轻一些比较重的地方的计算, 这块见:
// https://github.com/facebookincubator/velox/pull/3189
//
// 这个地方通过 DecodedVector 只是为了拿到 Null, 和 Peel 其实关系不大.
decoded.get()->decode(*arg, rows.rows());
flatNulls = decoded.get()->nulls();
}
// A null with no error deselects the row.
// An error adds itself to argument errors.
if (context.errors()) {
/// ...
} else if (flatNulls) {
rows.deselectNulls(flatNulls);
}

if (!rows.rows().hasSelections()) {
break;
}
}
}

mergeOrThrowArgumentErrors(
rows.rows(), originalErrors, argumentErrors, context);

// 如果所有行都被 deselect 了, 那就可以不用执行后面的逻辑了.
if (!rows.deselectErrors()) {
releaseInputValues(context);
setAllNulls(rows.originalRows(), context, result);
return false;
}

return true;
}

你会看到这里有个相对 Hack 的地方是借助了 LocalDecodedVector。我们之前没介绍过 DecodedVector 的逻辑,这玩意在官方链接 https://facebookincubator.github.io/velox/develop/dictionary-encoding.html#decodedvector 提到。Velox 的 Vector 对象可以是多层的,比如:Dict(DICT(DICT(T), 而每层都可能包含一些 Null。DecodedVector 逻辑是拍平成一层,即 DICT(DICT(DICT(Flat))) -> DICT(Flat),然后也获取真正的内层 Nulls。这里通过 DecodedVector 来拿到真正的 nulls,然后反选这些 Nulls。从而避免这些行的表达式执行。

Encoding - Peeling

Peeling 的逻辑在 evalEncoding 内部,应该是这里比较难读懂的一块逻辑,而且实际上在代码里,Peeling 分为了两部分:

  1. evalEncoding 里面处理 rows 的 Peeling
  2. evalAll 里面 applyFunctionWithPeeling,处理子表达式输入的 Peeling。

我们先简单介绍一下 Peeling, Peeling 实际上是一种 单个或者多个表达式的 DICT 剥离。举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/// Few important examples of how vectors are peeled:
/// 1. Common dictionary layers are peeled
/// Input Vectors: Dict1(Dict2(Flat1)), Dict1(Dict2(Const1)),
/// Dict1(Dict2(Dict3(Flat2)))
/// Peeled Vectors: Flat, Const1, Dict3(Flat2)
/// peel: Dict1(Dict2) => collapsed into one dictionary
///
/// 2. Common dictionary layers are peeled
/// Input Vectors: Dict1(Dict2(Flat1)), Dict1(Const1)),
/// Dict1(Dict2(Dict3(Flat2)))
/// Peeled Vectors: Dict2(Flat), Const1, Dict2(Dict3(Flat2))
/// peel: Dict1
///
/// 3. Common dictionary layers are peeled while constant is ignored
/// (since all valid rows translated via the common dictionary layers would
/// point to the same constant index)
/// Input Vectors: Dict1(Dict2(Flat1)), Const1,
/// Dict1(Dict2(Dict3(Flat2)))
/// Peeled Vectors: Flat, Const1, Dict3(Flat2)
/// peel: Dict1(Dict2) => collapsed into one dictionary

简单的说,Peel 就是把外部的东西抠出来,然后改变 ndv,用内部的 cols 来执行。希望有更小的开销。然后执行完后,结果也可能会 Cache 起来,再包装给外侧的输出。我们举几个例子:

  1. 某列 ndv 很小,返回的是一个字典,那么只要在字典上执行表达式,不用再值上执行
  2. Ndv 不小,可以缓存一下,每次做增量 eval

(不过我理解对于超大字典,而且 ndv 很大重复值很少,实际上这块可能是个负优化。)

我们简单看一下 evalEncodings 里的条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/// NOTE(mwish): 这里 Peeling 的逻辑其实比较 Hack, 实际上分为两层:
/// 1. `evalEncodings`: 这里根据输入的 rows 尝试去做 Peeling.
/// 2. `evalAll`: 这里是真正的执行逻辑, 但是这里的输入 rows 已经是 Peeling 之后的结果了.
/// 但还有一部分输入来自于子表达式的计算. 这里会根据 Peeling 的 rows 计算完子表达式结果
/// 后, 根据返回值的 rows 来决定是否再做一次 Peeling.
void Expr::evalEncodings(
const SelectivityVector& rows,
EvalCtx& context,
VectorPtr& result) {
// 如果不是 determinstic 的, 那么就不会尝试去做 Encoding 的处理. 因为 Peeling
// 本身会改变 input(可能会减少输入 size), 所以不是 determinstic 的话, 这个优化
// 本身也不合法.
if (deterministic_ && !skipFieldDependentOptimizations()) {
bool hasFlat = false;
for (auto* field : distinctFields_) {
if (isFlat(*context.getField(field->index(context)))) {
hasFlat = true;
break;
}
}

// 一旦有 Flat, 这里本身都无法 Peel, 避免一轮框架开销了.
if (!hasFlat) {
/// 真正的逻辑
}
// 公共 Encoding 匹配失败, fallback 到直接 evalWithNulls.
evalWithNulls(rows, context, result);
}

// No need to peel encoding or remove sure nulls for default null propagating
// expressions when the expression has single parent(the expression that
// reference it) and have the same distinct fields as its parent.
// The reason is because such optimizations would be redundant in that case,
// since they would have been performed identically on the parent.
//
// 如果没有distinctField(什么字段都不 Ref), 或者 Parent 和自身的 distinctField
// 一样(sameAsParent 需要 !isMultiplyReferenced_ 才有意义), 就不 Peeling.
// 这里含义其实比较简单, Parent 传来的 Vector 已经是 Peeled 了,
// Null 也已经处理了
//
// 这里本质就是父亲算过 Null / Distinct 的优化, 自己就不用做了.
bool skipFieldDependentOptimizations() const {
if (!isMultiplyReferenced_ && sameAsParentDistinctFields_) {
return true;
}
if (distinctFields_.empty()) {
return true;
}
return false;
}

这里需要:

  1. Deterministic
  2. 满足 skipFieldDependentOptimizations,这个地方逻辑比较有意思,比如和父亲输入一样,那么显然父表达式就会做了这个逻辑(呵呵万一你爹不 deterministic 呢 2333)
  3. 输入都不是 Flat。

Peeling 会把行和 Selector 换成内部 Const / 字典,所以显然,它会修改 EvalCtx, 实际逻辑如下:

1
2
3
4
5
6
7
/// Typical usage pattern for peeling includes:
/// (See Expr::applyFunctionWithPeeling() for example usage)
/// 1. peeling a set of input vectors
/// 2. converting relevant rows (top level rows and final selection rows)
/// 3. Saving the current context and setting the peel
/// 4. Applying the function or moving forward with expression eval
/// 5. wrapping the result vector with the peel

换成代码即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
VectorPtr wrappedResult;
// Attempt peeling and bound the scope of the context used for it.
//
// ContextSaver: 用来保存当前的 Context, 主要是执行 Peeling 的时候(`peelEncodings`),
// 会修改 EvalCtx 和 Row 的数量, 所以需要在执行完之后恢复.
withContextSaver([&](ContextSaver& saveContext) {
LocalSelectivityVector newRowsHolder(context);
LocalSelectivityVector finalRowsHolder(context);
LocalDecodedVector decodedHolder(context);
auto peelEncodingsResult = peelEncodings(
context,
saveContext,
rows,
decodedHolder,
newRowsHolder,
finalRowsHolder);
// 尝试给输入做 Peel, 抽取公共的 Rows, 抽取成功的话就去执行.
auto* newRows = peelEncodingsResult.newRows;
if (newRows) {
VectorPtr peeledResult;
// peelEncodings() can potentially produce an empty selectivity
// vector if all selected values we are waiting for are nulls. So,
// here we check for such a case.
if (newRows->hasSelections()) {
if (peelEncodingsResult.mayCache) {
// 对于 Constant / Dictionary Peeling, 尝试增量执行, 缓存 Eval 的结果.
evalWithMemo(*newRows, context, peeledResult);
} else {
evalWithNulls(*newRows, context, peeledResult);
}
}
// 根据 Peeling 的结果恢复外层的结果.
// 这里就相当于内层执行完了, 然后反向给外层处理.
wrappedResult = context.getPeeledEncoding()->wrap(
this->type(), context.pool(), peeledResult, rows);
}
});

// 如果能生成 newRows, 那么就可以直接返回了.
// 否则上面 ContextSaver 会恢复 ctx.
if (wrappedResult != nullptr) {
context.moveOrCopyResult(wrappedResult, rows, result);
return;
}

注意这里 ContextSaver 是在 Peel 失败的时候恢复原来的 Row 信息的。

EvalWithNulls

evalWithNulls 逻辑非常简单,当表达式是 propagateNull 的时候,deselect Null input 即可。

Dictionary Memo

我们假设场景是一个 TableScan,Parquet 可能会有 Row-Group 级别的 Dict。这个时候,每次 Scan 产生的 Dict 大概都是同一个。我们之前介绍的 CSE 是缓存一个 row 的输出,但是 Memo 则是跨 Row Vector 缓存输出。这里对输入的要求是,表达式输入必须只能有一个,而且必须是 Dictionary。

这里逻辑见:https://facebookincubator.github.io/velox/develop/expression-evaluation.html#memoizing-the-dictionaries

Wrapup

expr-eval-code