Arrow Compute Scalar Function: Framework

前面的文章自己最近一细看发现还全是槽点:https://blog.mwish.me/2023/05/27/Arrow-Compute/ 怎么特么的就过去一年了,时间过的也太快了=_=。第一次过的时候我们大概介绍了一下:

  • Compute 模块大概的逻辑,比如 Function Kernel 等逻辑的划分
  • 外部 Call Compute 的基本 stack

基本上之前是自上而下的介绍的,下面我们自下而上的介绍一下这里具体的内容。(这篇文章写完之后,回到开头,感觉我应该可以翻翻一些 type system 之类的入门材料了,哎,时间少活多想看的东西多,不知道该高兴还是该烦恼)

一个 “函数” 的多个实现

  • 定义的 UDF “add” —> 这种可能需要依赖 Function 的 Scope 解决,自己定义的 name-lookup 完之后再去寻找
  • add(1, 2)add(1.2, 2.4),这里上层都是 Add,但是实际上下层调用的都是不同的实现,这里就要涉及一个根据类型分发实现的问题

关于下面的 “分发实现”,方案大概类似,我们分类讨论一下:

这边其实区别还是,因为:

  • PG 这类引擎的计算和自己执行可能是直接绑定的,执行引擎可能不需要额外的分发逻辑?
  • Velox/Arrow 没有一个 Query Compiler,所以在计算的时候动态绑定一些类型相关的判定和处理,举个不恰当的比方,这个感觉有点像 x86 汇编和 micro-arch,前面会产生自己可以做的优化,到通用的 substrait 之类的通用的抽象,然后 Velox/Arrow 计算层再抽出一些自己的东西和特异的优化。不知道 Arrow Datafusion 那边是怎么搞得,或许可以看看。

那么,在 Arrow 系统中,我们回到之前的文章:

  • “name” -> Function,在一个 scope 内,一个 name 可能可以映射到一个 Function 上
  • 多个接受不同类型的实现:一个 Function 下面会有多个 Kernel

这里有个显而易见的点是,查询或者计算层也需要某种程度上的对输入来匹配对应的输入,然后拿到对应的输出类型。这里就是说,Kernel 可能会有对应的 Signature,包含了 <输入类型(可能是变参), 输出> 等。

好,那我们再回顾上一节中的结构:

Function: 由一个或者数个 Kernel 组成,每个 Kernel 有操作和对应的输入/输出类型,Function 按照 Function name 注册在 FunctionRegistry 中. 在类型匹配的时候,Function 支持「挑选最佳的匹配」,并且,Function 支持 implicit cast,将一些类型完成对应的转型.

这几个结构是不是其实清晰了很多。在 Dispatch 的时候,会有一些类型分发的逻辑。Arrow Compute 虽然支持自己转型(尝试对所有输入进行 type promote),但只能说这块做的很简陋

函数的类型

无论是 PostgreSQL ,还是 Velox,对于函数都有一定的区分,Velox 函数会有自己的元信息,在执行的时候通过框架层统一绑定了 UDF 的处理。它分为:

  • SimpleFunction: 我们之前花比较大篇幅介绍的内容
  • VectorFunction: 一批输入 -> 一批输出,可以访问自身的 meta,然后也可以用来实现 lambda function
  • Aggregate Function: Agg 对应的处理

这里各种功能还是比较复杂的,pg 也区分了 Agg 呀,window 呀之类的处理,感觉这里是一个这样的逻辑:尝试用几种抽象来描述函数,这些有 Vector, Scalar, Agg 用的函数之类的内容。相信这块其实很好

此外,需要注意,函数是一种 Row -> Row 的映射。Arrow 里面区分了 Function 的类型,但比较古怪的一点是,它把 Sort 也搞成了一种 Vector Function,在我看来还是蛮怪的

UDF / Table Function / … ?

Spark 之类的地方都会有这些东西的定义,我们只是贴出来

https://cloud.google.com/bigquery/docs/table-functions?hl=zh-cn

Special Function?

对于 CaseWhen, if/else 之类的函数,相当于处理逻辑是「类型没那么大关系」(有联系)且内置的,各家实现是不一样的:

  • PostgreSQL: 在 exec 的时候直接处理, 这个地方没有走 fmgr:https://github.com/postgres/postgres/blob/8fea1bd5411b793697a4c9087c403887e050c4ac/src/backend/executor/execExpr.c#L1723
  • Velox: velox 定义了 SpecialFunction 来解决这些问题, switch, cast 之类的都被分发到 SpecialFunction 而不是通用框架下,来解决这一问题
  • Arrow: Arrow 目前 switch 之类的实现比较简陋,靠手工编码了一些「泛型」的匹配规则(即任意输入 -> 输出的匹配,我们之后可以看到代码。)。此外,有一些函数被注册为 “MetaFunction”,比如 “cast”,这算是一种比较 naive 的 special function。

其实不考虑 PostgreSQL 的设计,这里有一点问题是,如果一些执行流程(比如列式 Selector)不是 day 1 就开始考虑的话,这里的后头的实现就会给前面的实现填坑

Kernel 的逻辑

函数的计算要走到 Kernel 里面,这里也是具体实现函数逻辑的地方。我们会和 Velox Expression 计算一样介绍。Arrow 这块感觉作者其实 C++ 功底要好一些,但是完成度比 Velox 低。感觉还是这块本身没有做的那么好加上用户比较少导致的。我想这种原因一般都是正反馈的吧,(在热门领域中)功能完成度高,那么用户就会多,反过来 push 功能更多的完成。

不过话又说回来,感觉这块代码也是大家比较频繁会做的,虽然有 Velox 之类的插件化计算引擎,但我个人觉得至少在今年(2024)年,这块只是对于 developer 来说是一个 “趋势”,很多关键逻辑还是要懂哥自己去手写或者改的,不过这块感觉也都是 open problem 了,感觉相对于未知的秘密,很多都是工程实现的问题(虽然我觉得 Velox 有些代码是真 jb 恶心,不过感觉人家该做的事情蛮多也都做了)

Kernel 的选择和 Binding

在执行的时候,这里它实现还是比较花哨的,不仅需要考虑我们之前提到的内容(即各种函数的匹配和对应的实现),还有不同 simd-level 下的实现,显得这里面花头比较多

签名: KernelSignature

KernelSignature 包含 , OutputType>,注意这里也包含 va-args 或者无参数的处理。这里实际上感觉也是个类型系统和分类的问题了,感觉在程序或者 pl 业界应该有比较成熟的解决方案,我虽然不是很懂,但还是死记硬背一下,之后遇到好的再想想这里抽象能不能改更好。

  • Inputs:
    • va_arg: 最后一个 input match 输入列表里面剩下的 input,允许参数数量和 input list 不一样,否则必须完全相同才 match
    • Input: 可以是 any (match 任意类型), exact ( 完全一致的类型 ), typematcher ( 用手工编码的类型匹配 )
      • TypeMatcher 允许手动匹配 Match 单个类型
  • Output:
    • 允许写死(FIXED) 或者根据输入计算 (COMPUTED)

Dispatch 的逻辑

我们先看一个简单的 Dispatch, 这里其实设计上我们之前说过,和类型系统相关,而且某种程度上应该是查询编译的时候处理好的内容,这里可能只是一层比较简单的逻辑:你会发现这里在匹配的时候,尝试抽取了公共的类型(尝试类型提升),在完成后,再去 DispatchExact。当然不同 Kernel 可能会有自己的 Dispatch 逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct CompareFunction : ScalarFunction {
using ScalarFunction::ScalarFunction;

Result<const Kernel*> DispatchBest(std::vector<TypeHolder>* types) const override {
RETURN_NOT_OK(CheckArity(types->size()));
if (HasDecimal(*types)) {
RETURN_NOT_OK(CastBinaryDecimalArgs(DecimalPromotion::kAdd, types));
}

using arrow::compute::detail::DispatchExactImpl;
if (auto kernel = DispatchExactImpl(this, *types)) return kernel;

EnsureDictionaryDecoded(types);
ReplaceNullWithOtherType(types);

if (auto type = CommonNumeric(*types)) {
ReplaceTypes(type, types);
} else if (auto type = CommonTemporal(types->data(), types->size())) {
ReplaceTypes(type, types);
} else if (auto type = CommonBinary(types->data(), types->size())) {
ReplaceTypes(type, types);
}

if (auto kernel = DispatchExactImpl(this, *types)) return kernel;
return arrow::compute::detail::NoMatchingKernel(this, *types);
}
};

这里大概逻辑就是抽取公共类型然后做类型提升,然后预期执行的时候做 Cast

WIP: Dispatch && Decimal?

时间戳和 Decimal 都会有相同的处理逻辑,Decimal 的问题是多个 Decimal32,Decimal64 不一定是同一个类型,可能要考虑这种不同的 Decimal 的处理。感觉这块有上层编译处理会方便很多,看最近一些讨论,现在 Decimal 计算还是个坑

Binding

我们在之前介绍过 Binding 的逻辑,可以看到:https://blog.mwish.me/2023/07/06/arrow-expression/#Bind

这里这段日子虽然进了 patch,但大概的逻辑还是一样的,即抽取公共表达式,然后再在 function 中尝试 Binding。这里需要注意一个地方,这里某个输入的来源可能是别的表达式(这很好假设),这里最终类型靠 bind 完 input 之后的 OutputType 去 Resolve 得到。

另一点是,Cast 这里会在 Bind 的时候,插入到表达式树中,这点差不多逻辑如下:

  • 尝试对输入 DispatchExact
  • 尝试抽取公共类型,去 DispatchBest ( 注意我感觉这里面有的是外层 Bind 做的,有的是内层 DispatchBest 做的)
  • 最后输入如果和这个不一样,尝试和输入之间去插入一层 Cast

为了给 Call 插入内容,所以这里会插入一层 KernelStateKernel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct Call {
std::string function_name;
std::vector<Expression> arguments;
std::shared_ptr<FunctionOptions> options;
// Cached hash value
size_t hash;

// post-Bind properties:
std::shared_ptr<Function> function;
const Kernel* kernel = NULLPTR;
std::shared_ptr<KernelState> kernel_state;
TypeHolder type;

void ComputeHash();
};

执行和 Context

在 Bind 之后,Scalar 表达式可以交给 ExecuteScalarExpression 来执行。

Kernel 的执行由 KernelExecutorFunctionExecutor 来处理. Kernel 本身是无状态的,外部( 执行上下文保留 KernelState)保留这个状态,我们往前翻几行就可以在 Call 的成员中找到这个 KernelState。这部分还是通用的表达式信息,即别的类型表达式也可以带上这样的信息,每个 Kernel 可以负责:生成 KernelState,初始化,执行,finalize

FunctionExecutor 在上层去驱动 KernelExecutor 的执行,因为 Function 本身类型的不同(我们之前说的 Agg 之类的),所以 Kernel Executor 的类型也会不一样,但是也有一些逻辑,比如申请内存、Null Handling 是可以 unify 的。这里我们可以回顾 Velox 的表达式计算,它提供了 UDF 包装,然后执行框架抽取了 Null 处理之类的公共逻辑,来加速执行。Arrow 这块做的没有那么开放,它在 KernelExecutorImplScalar 的 Kernel 执行中抽取了一些公共逻辑,但是完成度远远没有 Velox 那么高,大概只有 Null 和 Buffer 分配的一些逻辑在 Scalar 处理中(想必也是因为这部分逻辑不支持 Executor)

这里框架会尝试:

  1. 判断框架信息,尝试准备输出,输出包含 data 和 Null
    1. 对于 data,这里如果数据类型是 Fixed-Sized 的(StringView, 普通类型等)可以预先申请一部分空间
    2. 对于 Null,可能可以申请空间,或者允许内层自己判断
  2. 执行的时候,对每个输入 batch,初始化完上述信息后,调用内层的执行函数(这里就可以参考之前的逻辑了: https://blog.mwish.me/2023/05/27/Arrow-Compute/

MetaFunction 的执行

这里是个比较怪异的地方,MetaFunction 会自己分发执行给别的 function (用 CallFunction),更古怪的是,cast 本身有一些 Kernel,但是没有 Register 给外面,而是分发给 MetaFunction 来做执行了。反正我觉得 MetaFunction

Wrap up

  • FunctionRegistry 中,lookup by name & scope,然后检查 GetFunction 函数名符合预期
  • 对捞出来的函数判断类型(函数类型,即 Vector, Scalar, …),创建对应的 Executor
  • 根据类型,尝试拿到对应的 kernel,进行 binding,然后尝试做一些简陋的优化
  • context 的初始化。
  • 对于输入的类型,抽出 Batch,然后尝试在框架层处理 Null、申请 Batch 内存
  • 执行和输出