Arrow Compute Scalar Function: Framework
前面的文章自己最近一细看发现还全是槽点:https://blog.mwish.me/2023/05/27/Arrow-Compute/ 怎么特么的就过去一年了,时间过的也太快了=_=。第一次过的时候我们大概介绍了一下:
- Compute 模块大概的逻辑,比如 Function Kernel 等逻辑的划分
- 外部 Call Compute 的基本 stack
基本上之前是自上而下的介绍的,下面我们自下而上的介绍一下这里具体的内容。(这篇文章写完之后,回到开头,感觉我应该可以翻翻一些 type system 之类的入门材料了,哎,时间少活多想看的东西多,不知道该高兴还是该烦恼)
一个 “函数” 的多个实现
- 定义的 UDF “add” —> 这种可能需要依赖 Function 的 Scope 解决,自己定义的 name-lookup 完之后再去寻找
add(1, 2)
和add(1.2, 2.4)
,这里上层都是 Add,但是实际上下层调用的都是不同的实现,这里就要涉及一个根据类型分发实现的问题
关于下面的 “分发实现”,方案大概类似,我们分类讨论一下:
- PostgreSQL 这类绑定,在查询编译的时候,把外部,带查询编译的库,会实现不同版本的函数,然后用不同的函数名处理,这里也会完成一些 cast 的流程,最后走 Function 之类的处理 jit codegen 之类的
- 例如:https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_operator.dat 和 https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_cast.dat 绑定不同类型相关的操作
- Fmgr 模块处理 function 相关的执行,也会有一些注册、执行相关的逻辑 https://github.com/postgres/postgres/tree/master/src/backend/utils/fmgr
- Velox 这边提供了
FunctionRegistry
,然后对不同名字的函数,都会有一个对应的 Registry. Cast 可能会预期上层来插入 - Arrow 这里可能会接受 Runtime 分发来的类型,所以自己可能要提供不同的 Kernel,处理不同类型的函数。但是同一种 Kernel 可以处理 `` 这样多种的输入类型
这边其实区别还是,因为:
- PG 这类引擎的计算和自己执行可能是直接绑定的,执行引擎可能不需要额外的分发逻辑?
- Velox/Arrow 没有一个 Query Compiler,所以在计算的时候动态绑定一些类型相关的判定和处理,举个不恰当的比方,这个感觉有点像 x86 汇编和 micro-arch,前面会产生自己可以做的优化,到通用的 substrait 之类的通用的抽象,然后 Velox/Arrow 计算层再抽出一些自己的东西和特异的优化。不知道 Arrow Datafusion 那边是怎么搞得,或许可以看看。
那么,在 Arrow 系统中,我们回到之前的文章:
- “name” -> Function,在一个 scope 内,一个 name 可能可以映射到一个 Function 上
- 多个接受不同类型的实现:一个 Function 下面会有多个 Kernel
这里有个显而易见的点是,查询或者计算层也需要某种程度上的对输入来匹配对应的输入,然后拿到对应的输出类型。这里就是说,Kernel
可能会有对应的 Signature
,包含了 <输入类型(可能是变参), 输出>
等。
好,那我们再回顾上一节中的结构:
Function: 由一个或者数个 Kernel 组成,每个 Kernel 有操作和对应的输入/输出类型,Function 按照 Function name 注册在 FunctionRegistry 中. 在类型匹配的时候,Function 支持「挑选最佳的匹配」,并且,Function 支持 implicit cast,将一些类型完成对应的转型.
这几个结构是不是其实清晰了很多。在 Dispatch
的时候,会有一些类型分发的逻辑。Arrow Compute 虽然支持自己转型(尝试对所有输入进行 type promote),但只能说这块做的很简陋
函数的类型
无论是 PostgreSQL ,还是 Velox,对于函数都有一定的区分,Velox 函数会有自己的元信息,在执行的时候通过框架层统一绑定了 UDF
的处理。它分为:
- SimpleFunction: 我们之前花比较大篇幅介绍的内容
- VectorFunction: 一批输入 -> 一批输出,可以访问自身的 meta,然后也可以用来实现 lambda function
- Aggregate Function: Agg 对应的处理
这里各种功能还是比较复杂的,pg 也区分了 Agg 呀,window 呀之类的处理,感觉这里是一个这样的逻辑:尝试用几种抽象来描述函数,这些有 Vector
, Scalar
, Agg
用的函数之类的内容。相信这块其实很好
此外,需要注意,函数是一种 Row
-> Row
的映射。Arrow 里面区分了 Function 的类型,但比较古怪的一点是,它把 Sort 也搞成了一种 Vector Function,在我看来还是蛮怪的
UDF / Table Function / … ?
Spark 之类的地方都会有这些东西的定义,我们只是贴出来
https://cloud.google.com/bigquery/docs/table-functions?hl=zh-cn
Special Function?
对于 CaseWhen, if/else 之类的函数,相当于处理逻辑是「类型没那么大关系」(有联系)且内置的,各家实现是不一样的:
- PostgreSQL: 在 exec 的时候直接处理, 这个地方没有走 fmgr:https://github.com/postgres/postgres/blob/8fea1bd5411b793697a4c9087c403887e050c4ac/src/backend/executor/execExpr.c#L1723
- Velox: velox 定义了 SpecialFunction 来解决这些问题, switch, cast 之类的都被分发到
SpecialFunction
而不是通用框架下,来解决这一问题 - Arrow: Arrow 目前 switch 之类的实现比较简陋,靠手工编码了一些「泛型」的匹配规则(即任意输入 -> 输出的匹配,我们之后可以看到代码。)。此外,有一些函数被注册为 “MetaFunction”,比如 “cast”,这算是一种比较 naive 的 special function。
其实不考虑 PostgreSQL 的设计,这里有一点问题是,如果一些执行流程(比如列式 Selector)不是 day 1 就开始考虑的话,这里的后头的实现就会给前面的实现填坑
Kernel 的逻辑
函数的计算要走到 Kernel 里面,这里也是具体实现函数逻辑的地方。我们会和 Velox Expression 计算一样介绍。Arrow 这块感觉作者其实 C++ 功底要好一些,但是完成度比 Velox 低。感觉还是这块本身没有做的那么好加上用户比较少导致的。我想这种原因一般都是正反馈的吧,(在热门领域中)功能完成度高,那么用户就会多,反过来 push 功能更多的完成。
不过话又说回来,感觉这块代码也是大家比较频繁会做的,虽然有 Velox 之类的插件化计算引擎,但我个人觉得至少在今年(2024)年,这块只是对于 developer 来说是一个 “趋势”,很多关键逻辑还是要懂哥自己去手写或者改的,不过这块感觉也都是 open problem 了,感觉相对于未知的秘密,很多都是工程实现的问题(虽然我觉得 Velox 有些代码是真 jb 恶心,不过感觉人家该做的事情蛮多也都做了)
Kernel 的选择和 Binding
在执行的时候,这里它实现还是比较花哨的,不仅需要考虑我们之前提到的内容(即各种函数的匹配和对应的实现),还有不同 simd-level 下的实现,显得这里面花头比较多
签名: KernelSignature
KernelSignature 包含 , OutputType>
,注意这里也包含 va-args 或者无参数的处理。这里实际上感觉也是个类型系统和分类的问题了,感觉在程序或者 pl 业界应该有比较成熟的解决方案,我虽然不是很懂,但还是死记硬背一下,之后遇到好的再想想这里抽象能不能改更好。
- Inputs:
- va_arg: 最后一个 input match 输入列表里面剩下的 input,允许参数数量和 input list 不一样,否则必须完全相同才 match
- Input: 可以是 any (match 任意类型), exact ( 完全一致的类型 ), typematcher ( 用手工编码的类型匹配 )
- TypeMatcher 允许手动匹配 Match 单个类型
- Output:
- 允许写死(FIXED) 或者根据输入计算 (COMPUTED)
Dispatch 的逻辑
我们先看一个简单的 Dispatch, 这里其实设计上我们之前说过,和类型系统相关,而且某种程度上应该是查询编译的时候处理好的内容,这里可能只是一层比较简单的逻辑:你会发现这里在匹配的时候,尝试抽取了公共的类型(尝试类型提升),在完成后,再去 DispatchExact。当然不同 Kernel 可能会有自己的 Dispatch 逻辑
1 | struct CompareFunction : ScalarFunction { |
这里大概逻辑就是抽取公共类型然后做类型提升,然后预期执行的时候做 Cast
WIP: Dispatch && Decimal?
时间戳和 Decimal 都会有相同的处理逻辑,Decimal 的问题是多个 Decimal32,Decimal64 不一定是同一个类型,可能要考虑这种不同的 Decimal 的处理。感觉这块有上层编译处理会方便很多,看最近一些讨论,现在 Decimal 计算还是个坑
Binding
我们在之前介绍过 Binding 的逻辑,可以看到:https://blog.mwish.me/2023/07/06/arrow-expression/#Bind
这里这段日子虽然进了 patch,但大概的逻辑还是一样的,即抽取公共表达式,然后再在 function 中尝试 Binding。这里需要注意一个地方,这里某个输入的来源可能是别的表达式(这很好假设),这里最终类型靠 bind 完 input 之后的 OutputType
去 Resolve 得到。
另一点是,Cast
这里会在 Bind
的时候,插入到表达式树中,这点差不多逻辑如下:
- 尝试对输入 DispatchExact
- 尝试抽取公共类型,去 DispatchBest ( 注意我感觉这里面有的是外层 Bind 做的,有的是内层 DispatchBest 做的)
- 最后输入如果和这个不一样,尝试和输入之间去插入一层 Cast
为了给 Call 插入内容,所以这里会插入一层 KernelState
和 Kernel
1 | struct Call { |
执行和 Context
在 Bind 之后,Scalar 表达式可以交给 ExecuteScalarExpression
来执行。
Kernel
的执行由 KernelExecutor
和 FunctionExecutor
来处理. Kernel
本身是无状态的,外部( 执行上下文保留 KernelState
)保留这个状态,我们往前翻几行就可以在 Call
的成员中找到这个 KernelState
。这部分还是通用的表达式信息,即别的类型表达式也可以带上这样的信息,每个 Kernel 可以负责:生成 KernelState,初始化,执行,finalize
FunctionExecutor
在上层去驱动 KernelExecutor
的执行,因为 Function 本身类型的不同(我们之前说的 Agg 之类的),所以 Kernel Executor 的类型也会不一样,但是也有一些逻辑,比如申请内存、Null Handling 是可以 unify 的。这里我们可以回顾 Velox 的表达式计算,它提供了 UDF 包装,然后执行框架抽取了 Null 处理之类的公共逻辑,来加速执行。Arrow 这块做的没有那么开放,它在 KernelExecutorImpl
和 Scalar
的 Kernel 执行中抽取了一些公共逻辑,但是完成度远远没有 Velox 那么高,大概只有 Null 和 Buffer 分配的一些逻辑在 Scalar 处理中(想必也是因为这部分逻辑不支持 Executor)
这里框架会尝试:
- 判断框架信息,尝试准备输出,输出包含 data 和 Null
- 对于 data,这里如果数据类型是 Fixed-Sized 的(StringView, 普通类型等)可以预先申请一部分空间
- 对于 Null,可能可以申请空间,或者允许内层自己判断
- 执行的时候,对每个输入 batch,初始化完上述信息后,调用内层的执行函数(这里就可以参考之前的逻辑了: https://blog.mwish.me/2023/05/27/Arrow-Compute/ )
MetaFunction 的执行
这里是个比较怪异的地方,MetaFunction 会自己分发执行给别的 function (用 CallFunction),更古怪的是,cast 本身有一些 Kernel,但是没有 Register 给外面,而是分发给 MetaFunction 来做执行了。反正我觉得 MetaFunction
Wrap up
- 在
FunctionRegistry
中,lookup by name & scope,然后检查GetFunction
函数名符合预期 - 对捞出来的函数判断类型(函数类型,即 Vector, Scalar, …),创建对应的 Executor
- 根据类型,尝试拿到对应的 kernel,进行 binding,然后尝试做一些简陋的优化
- context 的初始化。
- 对于输入的类型,抽出 Batch,然后尝试在框架层处理 Null、申请 Batch 内存
- 执行和输出