Expression Execution in Velox Part 1: Basics and UDF

Velox 最初是一个 Facebook 为 Presto 做的一个 C++ Runtime,而为了扩大这套东西的使用价值,FB 开源了这套东西,现在这套东西可以给 Spark(Gluten), Presto 这样使用。Meta 对其的定义是个单机的计算库。详见论文 (1) (2) 和之前的 blog:

这里希望介绍 Velox 的表达式计算 Expr Evaluation。可能一篇文章介绍不完,所以本文应该只尝试介绍 Expr::eval 的链路。篇幅所限,本文应该也不会直接介绍 SpecialForm 的 Expr 的代码(if, cast, try, switch),但是会涉及它们里面的一些概念,要不然这里其实很多地方没法讲下去了。

这篇博文大概内容如下:

  1. 简短介绍 Velox 这几块涉及的类型。
  2. 介绍 UDF 定义和怎么扩展到 VectorFunction 的,以及 SimpleFunction UDF 执行链路的优化
  3. 介绍 Expr 的基本执行链路

(3)会在 Part 2 中介绍。

数据类型和基本概念

这一部分的概念出自

  1. https://blog.mwish.me/2023/05/04/Type-and-Array-in-Columnar-System/#Velox
  2. https://facebookincubator.github.io/velox/develop/types.html
  3. https://facebookincubator.github.io/velox/develop/vectors.html

Type

Velox 中每个数据都有对应的类型. 列表如下:

img

此外还有一个 Function 类型。为什么 Function 需要一个类型呢?因为在执行 Lambda 的时候可能需要把函数当成一个参数传进去。

上述的类型是物理类型,这里还有 LogicalType

img

这里有个比较有趣的类型是 TypeParameter,比如说 Decimal 肯定得有两个参数 Decimal(precision, scale), 然后时间也有对应的单位或者是否 adjustToUtc。这些封装成了 TypeParameter,便于存储访问。e.g.:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/// This class represents the fixed-point numbers.
/// The parameter "precision" represents the number of digits the
/// Decimal Type can support and "scale" represents the number of digits to the
/// right of the decimal point.
template <TypeKind KIND>
class DecimalType : public ScalarType<KIND> {
public:
inline uint8_t precision() const {
return parameters_[0].longLiteral.value();
}

inline uint8_t scale() const {
return parameters_[1].longLiteral.value();
}

private:
const std::vector<TypeParameter> parameters_;
};

class ShortDecimalType : public DecimalType<TypeKind::BIGINT> {
public:
ShortDecimalType(int precision, int scale)
: DecimalType<TypeKind::BIGINT>(precision, scale) {}
};

class LongDecimalType : public DecimalType<TypeKind::HUGEINT> {
public:
LongDecimalType(int precision, int scale)
: DecimalType<TypeKind::HUGEINT>(precision, scale) {}
};

TypePtr DECIMAL(uint8_t precision, uint8_t scale);

这里还有对应的复杂类型,ARRAY, MAP, ROW (Row 可以当成是 STRUCT)。

不同于 Arrow,字典之类的并不作为一种「类型」,而是作为一种 encoding。

From Buffer to FlatVector

Velox 的 Buffer 和 Arrow 的 Buffer 类似,区别是:

  1. Arrow 的 Buffer 有 MemoryManager 和 Device,用来管理不同数据源的 Buffer;Velox 的 Buffer 是没有的。
  2. Arrow 的 Buffer 大量依赖 std::shared_ptr;Velox 的 Buffer 依赖 boost::intrusive_ptr,可能 Velox 的性能会好点
  3. 两者都能绑定 MemoryTracker 类似的东西
  4. Velox 的 Buffer 有一些工具函数管理 non-POD type

Velox 的 Vector 分成好几部分,大量依赖了这个 Buffer 类型:

img

  1. nulls_ 是一个 bits 级别的是否是 null,实现成一个 BufferPtr
  2. encoding: 关于 encoding 我还是直接贴代码为好,如下。我们暂时只会介绍复杂类型、Flat、Constant、Dictionary
  3. 元信息:Vector 上有很多标注这个 Vector 存储信息的元信息,包括并不限于:
    1. type_
    2. nullCount_
    3. distinctValueCount_
    4. 还有一些执行的时候的上下文,比如:memoDisabled_, isCodegenOutput_
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Provides an enumeration of vector encoding types.
*/
enum class Simple {
BIASED,
CONSTANT,
DICTIONARY,
FLAT,
SEQUENCE,
ROW,
MAP,
ARRAY,
LAZY,
FUNCTION
};

那么下面我们应该介绍 Vector 的实现了。它有一个大概如下的继承链:

1
2
3
4
5
6
7
8
BaseVector ( 包含 nulls, type, encoding 和元信息 )
- LazyVector
- FunctionVector
- RowVector
- SimpleVector<T> 基本的 vector 类型, 包含元素 T 类型和一些保障, 但是没有 element buffer.也有一些 statistics.
- DictionaryVector<T>: 字典 Vector, 成员见下代码
- FlatVector<T>: 常见布局的平坦元素 Vector, 用一个 BufferPtr 来承担 Values. 这里对 String 的处理比较有意思, 见下代码
- ConstantVector<T>: 对于 ConstantVector 来说, 它的语义是 "常量的 Null 或者是一个 constant value", 因此它不会用到父级别的 nulls Buffer. 它指向 Null 或者 Vector 中的某个位置

DictionaryVector:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename T>
class DictionaryVector : public SimpleVector<T> {
BufferPtr indices_;
const vector_size_t* rawIndices_ = nullptr;

VectorPtr dictionaryValues_;
// Caches 'dictionaryValues_.get()' if scalar type.
SimpleVector<T>* scalarDictionaryValues_ = nullptr;
// Caches 'scalarDictionaryValues_->getRawValues()' if 'dictionaryValues_'
// is a FlatVector<T>.
const T* rawDictionaryValues_ = nullptr;

// Indicates whether internal state has been set. Can also be false if there
// is an unloaded lazy vector under the encoding layers.
bool initialized_{false};
};

img

需要特殊注意的两个地方是:

  1. Velox 的 Dictionary 是可以嵌套的。即:VectorPtr -> Dictionary -> VectorPtr -> Dictionary -> ... 这样套娃下去
  2. Velox 的 Dictionary 可能包含 NULL,这里图上没有 Sample,但是 Dictionary 的 VectorPtr 可以包含 NULL
  3. Dictionary 通过 wrappedVector wrappedIndex 返回内部的 BaseVector 和 index

FlatVector:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// FlatVector is marked final to allow for inlining on virtual methods called
// on a pointer that has the static type FlatVector<T>; this can be a
// significant performance win when these methods are called in loops.
template <typename T>
class FlatVector final : public SimpleVector<T> {
// Contiguous values.
// If strings, these are velox::StringViews into memory held by
// 'stringBuffers_'
BufferPtr values_;

// Caches 'values->as<T>()'
T* rawValues_;

// If T is velox::StringView, the referenced is held by
// one of these.
std::vector<BufferPtr> stringBuffers_;

// Used by 'acquireSharedStringBuffers()' to fast check if a buffer to share
// has already been referenced by 'stringBuffers_'.
//
// NOTE: we need to ensure 'stringBuffers_' and 'stringBufferSet_' are always
// consistent.
folly::F14FastSet<const Buffer*> stringBufferSet_;
};

FlatVector 通过 BufferPtr 来存 value,然后弄了个 rawValues_ 作为缓存。对于 slice,它允许用一个 sliceBuffer 拿到 bias 的 Buffer (arrow 有个额外的机制是 offset,即 base + offset 等于真实的数据,没有走 sliceBuffer 这样额外的机制。)

还有一个比较好玩的就是 string 的处理。stringBuffers_stringBufferSet_ 是为了给 string 有关的东西弄所有权使用的。与 Arrow 不同,Velox 支持 out-of-order write。所以这里会需要一些额外机制处理 BufferPtr 的所有权。

ConstantVector:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename T>
class ConstantVector final : public SimpleVector<T> {
// 'valueVector_' element 'index_' represents a complex constant
// value. 'valueVector_' is nullptr if the constant is scalar.
VectorPtr valueVector_;
// The index of the represented value in 'valueVector_'.
vector_size_t index_ = 0;
// Holds the memory for backing non-inlined values represented by StringView.
BufferPtr stringBuffer_;
T value_;
bool isNull_ = false;
bool initialized_{false};

// This must be at end to avoid memory corruption.
std::conditional_t<can_simd, xsimd::batch<T>, char> valueBuffer_;
};

ConstantVector 指向:

  1. valueVector_ 中的一个位置. 这里内部 Vector 可以是个 Flat 或者 Lazy,我其实感觉是 LazyVector 还比较有意义,是 Flat 的话感觉直接取值就行
  2. 直接指向 T, T 中的 string 所有权在 stringBuffer_
  3. isNull_ 指向 Null

Selection and Rows

img

上述一个 RowVector 的结构。我们没有介绍 MAP 和 ARRAY,因为我们这篇文章不会用到它们。执行的时候,我们会有一个 Row 的输入。

此外,这里还会有个 SelectivityVector. 它的语义类似 NULL 但不完全相同。可以把它理解成向量化论文中的 Selection,没有被 Select / Filter 掉的数据不一定是 NULL。它的逻辑结构也是个 bits,类似 Nulls,但是(重复性的)做了一些工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// A selectivityVector is used to logically filter / select data in place.
// The goal here is to be able to pass this vector between filter stages on
// different vectors while only maintaining a single copy of state and more
// importantly not ever having to re-layout the physical data. Further the
// SelectivityVector can be used to optimize filtering by skipping elements
// that where previously filtered by another filter / column
class SelectivityVector {
private:
// The vector of bits for what is selected vs not (1 is selected).
// 这里的需求还是 bits 按照 8Byte 来对齐
std::vector<uint64_t> bits_;

// The number of leading bits used in 'bits_'.
vector_size_t size_ = 0;

// The minimum index of a selected value, if there are any selected.
vector_size_t begin_ = 0;

// One past the last selected value, if there are any selected.
vector_size_t end_ = 0;

mutable std::optional<bool> allSelected_;

friend class SelectivityIterator;
};

一般行执行的时候,输入结构可以当成下列结构:

  1. RowVector
  2. SelectivityVector

SelectivityVector 来表示 RowVector 的一些状态。

DecodedVector

关于 DecodedVector,官方描述见:https://facebookincubator.github.io/velox/develop/dictionary-encoding.html

DecodedVector 可以当作和 Dictionary / Constant 逻辑强相关:简单的说,DecodedVector 接受一个 Vector,展开成一个”最多嵌套一层” 的 Vector, Eg:

  1. Flat -> Flat
  2. DICT(DICT( … Flat(T))) -> <indices, nulls, dictionaryValues>
  3. Dict(Dict(… Constant( T )) -> <constantT, nulls, ...>

然后可以用 nulls 或者 valueAt 类似的方法访问。官方的 DecodedVector 例子比较充足,看看应该就会了。

错误处理

错误处理参考:https://facebookincubator.github.io/velox/develop/expression-evaluation.html#error-handling-in-and-or-try

这部分我们举几个例子:

1
a != 0 and b / a > 0

这个如果去做 Filter Reorder,那么 b / a > 0 的时候,实际上是会需要处理错误的。此外,系统中还有 TRY(...) 这样的表达式,如果抛出错误会需要特殊处理。

Velox 里面一定程度上用异常来处理错误,但是会有一些直接产生 exception_ptr 或者包装错误的地方,来尽量减少开销。在执行的时候可能会有一个 Vector 来存放错误的状态,记录执行期的错误上下文,方便处理。

EvalCtx

我们终于来到了这里,本来想把 EvalCtx 放在介绍后头表达式执行的地方,不过思来想去挪到前头也不错,因为 Function 执行、表达式执行都有个 EvalCtx 的上下文。放后面介绍其实不太好。

初步下一个定义:EvalCtx 是对 <RowVector, ExprSet> 这个输入和对应执行上下文的包装, 因为和 rows 有关, 所以也写了很多数据本身的属性, 比如 flatNoNulls 什么的. 我们可以先看成员:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class EvalCtx {
public:
memory::MemoryPool* FOLLY_NONNULL pool() const {
return execCtx_->pool();
}

// Returns the index-th column of the base row. If we have peeled off
// wrappers like dictionaries, then this provides access only to the
// peeled off fields.
const VectorPtr& getField(int32_t index) const;

VectorPtr ensureFieldLoaded(int32_t index, const SelectivityVector& rows);

void setPeeled(int32_t index, const VectorPtr& vector) {
if (peeledFields_.size() <= index) {
peeledFields_.resize(index + 1);
}
peeledFields_[index] = vector;
}

const std::vector<VectorPtr>& peeledFields() {
return peeledFields_;
}

/// Used by peelEncodings.
void saveAndReset(ContextSaver& saver, const SelectivityVector& rows);

void restore(ContextSaver& saver);

// If exceptionPtr is known to be a VeloxException use setVeloxExceptionError
// instead.
//
// 设置执行的 Error.
void setError(vector_size_t index, const std::exception_ptr& exceptionPtr);

// Similar to setError but more performant, should be used when the user knows
// for sure that exception_ptr is a velox exception.
void setVeloxExceptionError(
vector_size_t index,
const std::exception_ptr& exceptionPtr);
private:
core::ExecCtx* const FOLLY_NONNULL execCtx_;
ExprSet* FOLLY_NULLABLE const exprSet_;
const RowVector* FOLLY_NULLABLE row_;
const bool cacheEnabled_;
const uint32_t maxSharedSubexprResultsCached_;
bool inputFlatNoNulls_;

// Corresponds 1:1 to children of 'row_'. Set to an inner vector
// after removing dictionary/sequence wrappers.
std::vector<VectorPtr> peeledFields_;

// Set if peeling was successful, that is, common encodings from inputs were
// peeled off.
std::shared_ptr<PeeledEncoding> peeledEncoding_;

// True if nulls in the input vectors were pruned (removed from the current
// selectivity vector). Only possible is all expressions have default null
// behavior.
//
// nullsPruned_ 和
bool nullsPruned_{false};
// Error Handling 的上下文, 对 Try(T) 这样的肯定要处理一层, 此外 Velox 还会 Filter Reorder,
// 这里应该都会影响 Error 的处理.
bool throwOnError_{true};

// True if the current set of rows will not grow, e.g. not under and IF or OR.
bool isFinalSelection_{true};

// If isFinalSelection_ is false, the set of rows for the upper-most IF or
// OR. Used to determine the set of rows for loading lazy vectors.
const SelectivityVector* FOLLY_NULLABLE finalSelection_;

// Stores exception found during expression evaluation. Exceptions are stored
// in a opaque flat vector, which will translate to a
// std::shared_ptr<std::exception_ptr>.
ErrorVectorPtr errors_;
};

上面这里有贴代码之嫌疑了,但是这个要嘴巴介绍确实比较抽象,通俗说这块还是 「输入的行 + Selection 的包装 + 输出的错误的包装」,Peeling 这个我们之后会讲,也和这块有关。

VectorFunction and SimpleFunctionAdapter

这一节有个很好的总结:

  1. 开发文档中 SimpleFunction 的部分 https://facebookincubator.github.io/velox/develop/scalar-functions.html
  2. 官方博客关于 SimpleFunction 的介绍 https://velox-lib.io/blog/simple-functions-2

Velox 有一个接口是 VectorFunction。这个 VectorFunction 不等价于 Arrow 的 VectorFunction ( Arrow 定义了 ScalarFunction 和 VectorFunction 等,Scalar 是参数之间互相无关,例如 Add 之类的;Vector 是可能有关,例如 Sort, Agg 等)。Velox 这些 Function 都是 “VectorFunction”,其接口如下(原本这里有一些字符串逻辑的定义,比如关于 ASCII 编码的属性,本文暂时移除这些便于阅读):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Superclass for functions which are written to run on whole vectors.
//
// VectorFunction 的实现, 它不同于 arrow 的 Scalar 和 Vector. 任何 arrow 中的
// Scalar 对他来说都是 Vector function.
class VectorFunction {
public:
virtual ~VectorFunction() = default;
virtual void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args, // Not using const ref so we can reuse args
const TypePtr& outputType,
EvalCtx& context,
VectorPtr& result) const = 0;

virtual bool isDeterministic() const {
return true;
}

// NULL 输入 -> 输出 NULL 就是 defaultNull behavior.
virtual bool isDefaultNullBehavior() const {
return true;
}

// 给 ML 之类的负载用,输入都是 flat + NoNull 可以定义一个 fastPath
virtual bool supportsFlatNoNullsFastPath() const {
return false;
}
};

SimpleFunctionAdapter 则是 Velox 内部的一个工具,用户写了一个 C++ class 后,可以生成一个上述的 VectorFunction,便于快速执行。

可选的优化参数

这部分可以参考:

  1. https://facebookincubator.github.io/velox/develop/scalar-functions.html
  2. https://facebookincubator.github.io/velox/develop/view-and-writer-types.html

用户可以写简单的函数,然后便于执行,最简单例子如下 ( velox/functions/prestosql/Arithmetic.h )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template <typename T>
struct CeilFunction {
template <typename TOutput, typename TInput = TOutput>
FOLLY_ALWAYS_INLINE void call(TOutput& result, const TInput& a) {
if constexpr (std::is_integral_v<TInput>) {
result = a;
} else {
result = ceil(a);
}
}
};

template <typename T>
struct FloorFunction {
template <typename TOutput, typename TInput = TOutput>
FOLLY_ALWAYS_INLINE void call(TOutput& result, const TInput& a) {
if constexpr (std::is_integral_v<TInput>) {
result = a;
} else {
result = floor(a);
}
}
};

String 相关的大概知道文档里 ascii 有关的、生命周期优化就行,比如说:

  1. callAscii 在输入发现自己全是 ascii 的时候可以用,作为 ascii fast path,比如求 utf8 char count 或者 truncate,如果都是 ASCII 那快速处理就完事了,不然还要扫一下 codepoint。
  2. Reuse input: Velox 的字符串列会有 BufferPtr,作为侵入式 shared_ptr 拿到列的字符串的所有权。正常的话一个 str 有关的算子会走 copy,然后开一块新的空间存放所有权。但比如 substr 或者 truncate 就可以在输入列 copy 一份 BufferPtr (而不是数据),来拿到对应的数据,避免一轮开销(zero-copy)

我们这里主要关注 NULL 相关的几个逻辑:

  1. 函数 callNullable: 当定义这个函数的时候,行为不是 defaultNull,即不是输入 Null -> 输出 Null 的形式了。
  2. 函数 callNullFree : 当定义这个函数的时候,在输入没有 NULL 的时候,定义了一个 FastPath。这个一般是给嵌套类型(Array / Map ) 使用的。

思考:Arrow 的一些执行,比如 Add 执行的时候,会分开 Null 和 Add。即 Int32Array + Int32Array 的时候,做一轮 bitAnd,再直接 ignoreNull 做一轮整数加法。这种估计最好有一些渐进式实现,比如对 Selector 返回 RLE/Bitpack 的模式,如果是连续的 Null / Non-Null (RLE Mode),就执行 / 不执行。

那么下面介绍具体的执行。

Apply 执行的路径

这里执行的路径其实比较简单,但是因为有一些依赖,加上他们写的模版代码还不咋好看,其实看起来还是蛮恶心的。

  1. 尝试找到 output vector,或者进行 input reuse。Input reuse 是个特定场景还比较有用的优化,比如 (a + 1) * 2 + b。这个东西编译成表达式(不考虑 codegen 连接整个表达式的话),可能 a 作为输入列,+1 执行一下 Function, *2 又是一次,+b 又是一次,就很离谱。虽然算下来可能最后 profile 的时候,大部分计算瓶颈真不是这里,而且内存可以池化,但是这块可能会把 CPU 开销弄出一些内存访问开销。这里比较好的方式就是 codegen 。而 input reuse 就是用输入看看能不能一起用在输出上(即读写同一个 Vector)
  2. 根据函数定义和输入,判断 NULL 的分发。这里如果是 defaultNull 的(即没有定义 callNullable),下发给 VectorFunction 的输入甚至不会含 NULL(这里的不包含指的是 Selection 选中的不包含,没选中的就不管了)。
  3. 分发 Decode,比如输入都是 VectorPtr,但是 Vector 会有不同的 encoding。这里最好的方式是根据 encoding 的类型去分发执行,比如两个都是 Constant 就怎么执行(实际上由于 Peeling 这种情况不多);一个 Constant 一个 Vector 怎么执行;两个都是 Dictionary 怎么执行。这里见下图。当然这个情况下执行可能参数多的时候展开就很废。Velox 限定了展开条件,输入小于 2 才去做展开

img

上面贴图执行相关的参数是:

1
2
3
/// https://velox-lib.io/blog/simple-functions-1
/// 参考: `To avoid code size blowing` 这一段.
static constexpr bool specializeForAllEncodings = FUNC::num_args <= 3;

原理介绍完了,看代码吧(我删掉了一些字符串和初始化有关的代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
EvalCtx& context,
VectorPtr& result/*result 可以是 nullptr*/) const override {

auto* reusableResult = &result;
// If result is null, check if one of the arguments can be re-used for
// storing the result. This is possible if all the following conditions are
// met:
// - result type is a fixed-width type,
// - function doesn't produce nulls,
// - an argument has the same type as result,
// - the argument has flat encoding,
// - the argument is singly-referenced and has singly-referenced values
// and nulls buffers.
//
// 这个要输出不是 Null 才行,为什么呢? 这块在计算的时候, 对于 producing Null
// 的结果, 会 clearAllNulls:
// 见 https://github.com/facebookincubator/velox/pull/1521
bool isResultReused = false;
if constexpr (
!FUNC::can_produce_null_output && !FUNC::udf_has_callNullFree &&
return_type_traits::isPrimitiveType &&
return_type_traits::isFixedWidth) {
// 输入是 NULL 的时候才能使用.
if (!reusableResult->get())
if (auto* arg = findReusableArg<0>(args)) {
reusableResult = arg;
isResultReused = true;
}
}
}

ApplyContext applyContext{
&rows, outputType, context, *reusableResult, isResultReused};

// If this UDF can take the fast path iteration, we set all active rows as
// non-nulls in the result vector. The assumption is that the majority of
// rows will return non-null values (and hence won't have to touch the
// null buffer during iteration).

if constexpr (fastPathIteration) {
// If result is resuing one of the inputs we do not clear nulls, instead
// we do that after the the input is read. It is safe because reuse only
// happens when the function does not generate null.
//
// 如果 result 是重用的,那么就不用 clearNulls 了.
if (!isResultReused) {
(*reusableResult)->clearNulls(rows);
}
}

// 系统的 DecodedVector
std::vector<std::optional<LocalDecodedVector>> decoded;
// 允许执行并且参数数量小于一定限制的话, 就根据类型去 unpack.
// 见: https://velox-lib.io/blog/simple-functions-1
// 参考: `To avoid code size blowing` 这一段.
if (allPrimitiveArgsFlatConstant(args)) {
if constexpr (
allArgsFlatConstantFastPathEligible() && specializeForAllEncodings) {
// 根据类型做展开然后分发
unpackSpecializeForAllEncodings<0>(applyContext, args);
} else {
decoded.resize(args.size());
unpack<0, true>(applyContext, decoded, args);
}
} else {
decoded.resize(args.size());
unpack<0, false>(applyContext, decoded, args);
}

if constexpr (fastPathIteration) {
// If result is resued and function is is_default_null_behavior then we do
// not need to clear nulls.
if constexpr (!FUNC::is_default_null_behavior) {
if (isResultReused) {
(*reusableResult)->clearNulls(rows);
}
}
}

if (isResultReused) {
result = std::move(*reusableResult);
}
}

这里 Decode 一层我们关注 unpack* 函数,以 flat-no-null 执行为例(即输入 Vector 都是 flat / constant,且函数参数小于3个). 这里会把所有输入展开成 VectorReader,然后最内层的时候执行 iterate。这里我们会看到,Vector 被转换成了 ConstantVectorReaderFlatVectorReader,我们可以与后面的代码进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// This is called only when we know that all args are flat or constant and are
// eligible for the optimization and the optimization is enabled.
template <int32_t POSITION, typename... TReader>
void unpackSpecializeForAllEncodings(
ApplyContext& applyContext,
const std::vector<VectorPtr>& rawArgs,
TReader&... readers) const {
if constexpr (POSITION == FUNC::num_args) {
iterate(applyContext, readers...);
} else {
auto& arg = rawArgs[POSITION];
using type =
typename VectorExec::template resolver<arg_at<POSITION>>::in_type;
if (arg->isConstantEncoding()) {
// 展开成 constant reader, 然后继续 delegate 下去
auto reader = ConstantVectorReader<arg_at<POSITION>>(
*(arg->asUnchecked<ConstantVector<type>>()));
unpackSpecializeForAllEncodings<POSITION + 1>(
applyContext, rawArgs, readers..., reader);

} else {
DCHECK(arg->isFlatEncoding());
// Should be flat if not constant.
auto reader = FlatVectorReader<arg_at<POSITION>>(
*arg->asUnchecked<FlatVector<type>>());
unpackSpecializeForAllEncodings<POSITION + 1>(
applyContext, rawArgs, readers..., reader);
}
}
}

iterate 的代码很长,我们贴一下:

这里分为好几步:

  1. 判断输入是否有 callNullFree,有的话,传给 VectorFunction 的参数可能含 Null。需要做一轮 fast check
  2. 按需求循环展开 Readers 的 Null 信息(和 ASCII 之类的信息),然后调用函数

这个函数我看几遍都觉得丑,刚开始看的时候觉得是我水平的问题,现在咋看还是丑,妈的就是它写的丑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// 具体执行的函数,手握所有的 ColumnReader. 然后去生成(行式)执行的代码.
template <typename... TReader>
void iterate(ApplyContext& applyContext, TReader&... readers) const {
// Null 的行为这里涉及好几个定义:
// 1. `callNullable`: 没有定义这个函数为 default, 即输入为 NULL -> 输出为 NULL.
// callNullable 处理
// 2. `callNullFree`: 定义了这个函数, 即没有任何 NULL 的时候进行快速 path.
//
// If udf_has_callNullFree is true compute mayHaveNullsRecursive.
// 开洞, 不过这玩意好像没啥意思, 感觉有的 NULL 在 Expr 执行的时候甚至不会分发下来
if constexpr (FUNC::udf_has_callNullFree) {
(
[&]() {
readers.setChildrenMayHaveNulls();
applyContext.mayHaveNullsRecursive |=
readers.mayHaveNullsRecursive();
}(),
...);
}

// If is_default_contains_nulls_behavior we return null if the inputs
// contain any nulls.
// If !is_default_contains_nulls_behavior we don't invoke callNullFree
// if the inputs contain any nulls, but rather invoke call or
// callNullable as usual.
//
// 1. defaultNull: Expr 系统不会传 Null 下来
// 2. has_callNullFree: 有 nullFree 的函数的定义
// 3. mayHaveNullsRecursive: 有 Null 啊,得执行了
//
// 这里是: "Expr 系统不会传 Null 下来" || ("有 nullFree 的函数的定义" && "输入不包含任何 null")
bool callNullFree = FUNC::is_default_contains_nulls_behavior ||
(FUNC::udf_has_callNullFree && !applyContext.mayHaveNullsRecursive);

// Compute allNotNull.
// 计算是否所有输入都是 NotNull, 和上面 callNullFree 有一定重合部分.
bool allNotNull;
if constexpr (FUNC::is_default_null_behavior) {
allNotNull = true;
} else {
allNotNull = (!readers.mayHaveNulls() && ...);
}

// Iterate the rows.
if constexpr (fastPathIteration) {
uint64_t* nullBuffer = nullptr;
// 干脆叫 getRawResultData 吧,你们起名字没人 review 吗?
auto getRawData = [&]() {
if constexpr (return_type_traits::typeKind == TypeKind::BOOLEAN) {
return applyContext.result->mutableRawValues();

} else {
return applyContext.resultWriter.data_;
}
};

auto* data = getRawData();
// 写入单个 Result.
// row: rowId
// notNull: out isNull
// out: 输出的类型.
auto writeResult = [&applyContext, &nullBuffer, &data](
auto row, bool notNull, auto out) INLINE_LAMBDA {
// For fast path iteration, all active rows were already set as
// non-null beforehand, so we only need to update the null buffer if
// the function returned null (which is not the common case).
if (notNull) {
if constexpr (return_type_traits::typeKind == TypeKind::BOOLEAN) {
bits::setBit(data, row, out);
} else {
data[row] = out;
}
} else {
if (!nullBuffer) {
nullBuffer = applyContext.result->mutableRawNulls();
}
bits::setNull(nullBuffer, row);
}
};
if (callNullFree) {
if (applyContext.mayHaveNullsRecursive) {
applyContext.applyToSelectedNoThrow([&](auto row) INLINE_LAMBDA {
typename return_type_traits::NativeType out{};
auto containsNull = (readers.containsNull(row) || ...);
bool notNull;
if (containsNull) {
// Result is NULL because the input contains NULL.
notNull = false;
} else {
notNull = doApplyNullFree<0>(row, out, readers...);
}

writeResult(row, notNull, out);
});
} else {
// 没有 Null, 按照 Selector 执行.
applyContext.applyToSelectedNoThrow([&](auto row) INLINE_LAMBDA {
typename return_type_traits::NativeType out{};
bool notNull = doApplyNullFree<0>(row, out, readers...);

writeResult(row, notNull, out);
});
}
} else if (allNotNull) {
// ... 下面都是面条代码,别看了,逻辑差不多的。
}
}

简单介绍一下 doApply 的逻辑,就是展开可变参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
template <
size_t POSITION,
typename... Values,
std::enable_if_t<
POSITION == FUNC::num_args && FUNC::is_default_null_behavior,
int32_t> = 0>
FOLLY_ALWAYS_INLINE bool
doApply(size_t /*idx*/, T& target, const Values&... values) const {
return (*fn_).call(target, values...);
}

// For NOT default null behavior, terminate with UDFHolder::callNullable.
template <
size_t POSITION,
typename... Values,
std::enable_if_t<
POSITION == FUNC::num_args && !FUNC::is_default_null_behavior,
int32_t> = 0>
FOLLY_ALWAYS_INLINE bool
doApply(size_t /*idx*/, T& target, const Values*... values) const {
return (*fn_).callNullable(target, values...);
}

上面我们过的是参数小于3的时候的 unpack,下面过一下参数多的时候,这里会走 DecodedVector 相关的 Reader 来读,这里关注下面的 LocalDecodedVector 代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 不用类型分发, 只是用 DecodedVector 来处理这块的逻辑,可能有一定的性能损失.
template <
int32_t POSITION,
bool allPrimitiveArgsFlatConstant,
typename... TReader>
void unpack(
ApplyContext& applyContext,
std::vector<std::optional<LocalDecodedVector>>& decodedArgs,
const std::vector<VectorPtr>& rawArgs,
TReader&... readers) const {
if constexpr (POSITION == FUNC::num_args) {
iterate(applyContext, readers...);
} else if constexpr (isVariadicType<arg_at<POSITION>>::value) {
// This should already be statically checked by the UDFHolder used to
// wrap the simple function, but checking again here just in case.
static_assert(
POSITION == FUNC::num_args - 1,
"Variadic args can only be used as the last argument to a function.");

for (auto i = POSITION; i < rawArgs.size(); ++i) {
decodedArgs[i] = LocalDecodedVector(
applyContext.context, *rawArgs[i], *applyContext.rows);
}
auto variadicReader =
VectorReader<arg_at<POSITION>>(decodedArgs, POSITION);
iterate(applyContext, readers..., variadicReader);
} else {
// Use ConstantFlatVectorReader as optimization when applicable.
if constexpr (
allPrimitiveArgsFlatConstant &&
isArgFlatConstantFastPathEligible<POSITION>) {
using value_t =
typename ConstantFlatVectorReader<arg_at<POSITION>>::exec_in_t;
auto& arg = rawArgs[POSITION];
auto reader = arg->encoding() == VectorEncoding::Simple::FLAT
? ConstantFlatVectorReader<arg_at<POSITION>>(
static_cast<FlatVector<value_t>*>(arg.get()))
: ConstantFlatVectorReader<arg_at<POSITION>>(
static_cast<ConstantVector<value_t>*>(arg.get()));

unpack<POSITION + 1, allPrimitiveArgsFlatConstant>(
applyContext, decodedArgs, rawArgs, readers..., reader);
} else {
decodedArgs[POSITION] = LocalDecodedVector(
applyContext.context, *rawArgs[POSITION], *applyContext.rows);

auto* oneUnpacked = decodedArgs.at(POSITION).value().get();
auto reader = VectorReader<arg_at<POSITION>>(oneUnpacked);
unpack<POSITION + 1, allPrimitiveArgsFlatConstant>(
applyContext, decodedArgs, rawArgs, readers..., reader);
}
}
}

References

  1. P. Pedreira, et al., Velox: Meta’s Unified Execution Engine, in VLDB, 2022
  2. The Composable Data Management System Manifesto (P. Pedreira, et al., VLDB 2023)
  3. Intro to Velox: https://blog.mwish.me/2022/11/13/Introduction-to-velox/