Arrow Compute Vector Function: Framework

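In Arrow Compute, Vector functions are a rather odd category. For background, see the previous post (https://blog.mwish.me/2024/06/02/Arrow-Compute-Scalar-Function-Framework/).

Arrow distinguishes several kinds of Function, but oddly enough it also files Sort under Vector Function, which still strikes me as strange.

Velox draws this line somewhat more strictly: https://facebookincubator.github.io/velox/develop/aggregate-functions.html

Arrow has the following kinds of Function here:

  • ScalarAggregate: consumes many rows of input and produces a single output (a scalar summary), e.g. COUNT(*)
  • HashAggregate: consumes many rows of input and produces multiple outputs (one scalar summary per group), e.g. COUNT(*) GROUP BY b
  • Vector: a batch of data in, a batch of data out, where the output depends on the whole input

(My feeling is that, in a sense, ScalarAgg is a special case of HashAgg, and HashAgg is a special case of Vector, each with dedicated hooks that enable specific optimizations.)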
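To make the "ScalarAgg is a special HashAgg" intuition concrete, here is a minimal standalone sketch. It does not use Arrow at all; GroupedCount and ScalarCount are hypothetical names invented for illustration. A scalar count is expressed as a grouped count in which every row falls into group 0.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Grouped count: one counter per group id (a toy "HashAggregate").
// Assumes every id in group_ids is smaller than num_groups.
std::vector<int64_t> GroupedCount(const std::vector<uint32_t>& group_ids,
                                  uint32_t num_groups) {
  std::vector<int64_t> counts(num_groups, 0);
  for (uint32_t g : group_ids) counts[g] += 1;
  return counts;
}

// Scalar count expressed as a grouped count with a single group:
// every row is assigned group id 0.
int64_t ScalarCount(int64_t num_rows) {
  std::vector<uint32_t> all_zero(static_cast<size_t>(num_rows), 0);
  return GroupedCount(all_zero, 1)[0];
}
```

A real engine would of course not materialize the all-zero id column; skipping it is the kind of optimization the dedicated ScalarAggregate path allows.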

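The kernel registration code can be found by searching registry_internal.h (or you can hand-write LLVM, or bind things into the framework yourself, Orz).

While going through the code below I suddenly realized that I had not really understood Nullary before. It stands for something that has a row count but no columns, for example random and count_all (the implementation of count(*)). Such a function only receives the length of the input, and the input has no nulls (a row by itself has no notion of null, although some systems may carry a selection vector or something similar).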

ScalarAggregate

The ScalarAggregator type looks like this:

struct ScalarAggregator : public KernelState {
  virtual Status Consume(KernelContext* ctx, const ExecSpan& batch) = 0;
  virtual Status MergeFrom(KernelContext* ctx, KernelState&& src) = 0;
  virtual Status Finalize(KernelContext* ctx, Datum* out) = 0;
};

It is also worth looking at the actual signature of ScalarAggregateKernel:

/// \brief Kernel data structure for implementations of
/// ScalarAggregateFunction. The four necessary components of an aggregation
/// kernel are the init, consume, merge, and finalize functions.
///
/// * init: creates a new KernelState for a kernel.
/// * consume: processes an ExecSpan and updates the KernelState found in the
///   KernelContext.
/// * merge: combines one KernelState with another.
/// * finalize: produces the end result of the aggregation using the
///   KernelState in the KernelContext.
struct ARROW_EXPORT ScalarAggregateKernel : public Kernel {
  ScalarAggregateConsume consume;
  ScalarAggregateMerge merge;
  ScalarAggregateFinalize finalize;
  /// \brief Whether this kernel requires ordering
  /// Some aggregations, such as, "first", requires some kind of input order. The
  /// order can be implicit, e.g., the order of the input data, or explicit, e.g.
  /// the ordering specified with a window aggregation.
  /// The caller of the aggregate kernel is responsible for passing data in some
  /// defined order to the kernel. The flag here is a way for the kernel to tell
  /// the caller that data passed to the kernel must be defined in some order.
  bool ordered = false;
};

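(The ordered flag here is more of a placeholder for future work: it only guards the case where the pipeline has to deal with ordering, and is not a thorough treatment of order.)

From this we can see that the logic is roughly as follows:

  1. At initialization, a ScalarAggregator may be created and bound to the KernelContext, serving as the data and logic that actually perform the computation
  2. If the user does not supply custom consume, merge and finalize logic, these simply delegate to the ScalarAggregator stored on the KernelContext:
    1. Consume takes the ExecSpan passed down from upstream and accumulates internal state
    2. Merge can merge aggregators of the same kind that live on several KernelContexts
    3. Finalize produces the final result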
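The init / consume / merge / finalize lifecycle above can be sketched without Arrow. The snippet below is only a toy model: SumState, Init and RunLifecycle are hypothetical names, and a plain std::vector stands in for ExecSpan.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a KernelState that is also the aggregator.
struct SumState {
  int64_t sum = 0;

  // consume: fold one batch into the accumulated state.
  void Consume(const std::vector<int64_t>& batch) {
    for (int64_t v : batch) sum += v;
  }
  // merge: fold another partial state into this one.
  void MergeFrom(SumState&& other) { sum += other.sum; }
  // finalize: turn the accumulated state into the result.
  int64_t Finalize() const { return sum; }
};

// init: create a fresh state.
std::unique_ptr<SumState> Init() { return std::make_unique<SumState>(); }

// Two independent states consume different batches, then get merged.
int64_t RunLifecycle(const std::vector<std::vector<int64_t>>& part_a,
                     const std::vector<std::vector<int64_t>>& part_b) {
  auto a = Init();
  auto b = Init();
  for (const auto& batch : part_a) a->Consume(batch);
  for (const auto& batch : part_b) b->Consume(batch);
  a->MergeFrom(std::move(*b));
  return a->Finalize();
}
```

The point of the split is that Consume never needs to know about other states; only MergeFrom does.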

Let's look at a relatively simple example: CountDistinct.

Below is the registration code. The implementation is delegated to CountDistinctImpl:

template <typename Type, typename VisitorArgType = typename Type::c_type>
void AddCountDistinctKernel(InputType type, ScalarAggregateFunction* func) {
  AddAggKernel(KernelSignature::Make({type}, int64()),
               CountDistinctInit<Type, VisitorArgType>, func);
}

void AddCountDistinctKernels(ScalarAggregateFunction* func) {
  // Boolean
  AddCountDistinctKernel<BooleanType>(boolean(), func);
  // Number
  AddCountDistinctKernel<Int8Type>(int8(), func);
  AddCountDistinctKernel<Int16Type>(int16(), func);
  AddCountDistinctKernel<Int32Type>(int32(), func);
  // ...
}

In an implementation such as CountDistinctImpl, which carries the actual aggregation logic, every operation below is single-threaded.

template <typename Type, typename VisitorArgType>
struct CountDistinctImpl : public ScalarAggregator {
  using MemoTable = typename arrow::internal::HashTraits<Type>::MemoTableType;

  explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options)
      : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {}

  Status Consume(KernelContext*, const ExecSpan& batch) override {
    if (batch[0].is_array()) {
      // ...
    } else {
      const Scalar& input = *batch[0].scalar;
      this->has_nulls = !input.is_valid;

      if (input.is_valid) {
        int32_t unused;
        RETURN_NOT_OK(memo_table_->GetOrInsert(UnboxScalar<Type>::Unbox(input), &unused));
      }
    }

    this->non_nulls = memo_table_->size();
    return Status::OK();
  }

  Status MergeFrom(KernelContext*, KernelState&& src) override {
    const auto& other_state = checked_cast<const CountDistinctImpl&>(src);
    RETURN_NOT_OK(this->memo_table_->MergeTable(*(other_state.memo_table_)));
    this->non_nulls = this->memo_table_->size();
    this->has_nulls = this->has_nulls || other_state.has_nulls;
    return Status::OK();
  }

  Status Finalize(KernelContext* ctx, Datum* out) override {
    const auto& state = checked_cast<const CountDistinctImpl&>(*ctx->state());
    const int64_t nulls = state.has_nulls ? 1 : 0;
    switch (state.options.mode) {
      case CountOptions::ONLY_VALID:
        *out = Datum(state.non_nulls);
        break;
      case CountOptions::ALL:
        *out = Datum(state.non_nulls + nulls);
        break;
      case CountOptions::ONLY_NULL:
        *out = Datum(nulls);
        break;
      default:
        DCHECK(false) << "unreachable";
    }
    return Status::OK();
  }

  const CountOptions options;
  int64_t non_nulls = 0;
  bool has_nulls = false;
  std::unique_ptr<MemoTable> memo_table_;
};
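To see the semantics of the three count modes in isolation, here is a small standalone model of the same idea. It is not the Arrow code: DistinctCounter and CountMode are hypothetical names, std::unordered_set stands in for the memo table, and std::optional stands in for nullable values.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <unordered_set>
#include <vector>

enum class CountMode { ONLY_VALID, ONLY_NULL, ALL };

struct DistinctCounter {
  std::unordered_set<int32_t> seen;  // stand-in for the memo table
  bool has_nulls = false;

  // Record every distinct non-null value and remember whether a null was seen.
  void Consume(const std::vector<std::optional<int32_t>>& batch) {
    for (const auto& v : batch) {
      if (v.has_value()) {
        seen.insert(*v);
      } else {
        has_nulls = true;
      }
    }
  }

  // Union the two value sets and OR the null flags.
  void MergeFrom(const DistinctCounter& other) {
    seen.insert(other.seen.begin(), other.seen.end());
    has_nulls = has_nulls || other.has_nulls;
  }

  // Null counts as at most one distinct value.
  int64_t Finalize(CountMode mode) const {
    const int64_t non_nulls = static_cast<int64_t>(seen.size());
    const int64_t nulls = has_nulls ? 1 : 0;
    switch (mode) {
      case CountMode::ONLY_VALID:
        return non_nulls;
      case CountMode::ONLY_NULL:
        return nulls;
      case CountMode::ALL:
        return non_nulls + nulls;
    }
    return 0;  // not reached for valid modes
  }
};
```

Merging is just a set union, which is why two partial states built from different batches give the same answer as one state that saw everything.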

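Next we should look at the Executor logic: how does the framework code actually get executed, and can it run in parallel?

There are a few places worth reading. Start with ScalarAggExecutor. Its execution logic is very simple: a single thread keeps folding spans into the same state, and there is no parallel merge step (note the TODO):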

Status Execute(const ExecBatch& batch, ExecListener* listener) override {
  RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize(),
                                    /*promote_if_all_scalars=*/false));

  ExecSpan span;
  while (span_iterator_.Next(&span)) {
    // TODO: implement parallelism
    if (span.length > 0) {
      RETURN_NOT_OK(Consume(span));
    }
  }

  Datum out;
  RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &out));
  RETURN_NOT_OK(listener->OnResult(std::move(out)));
  return Status::OK();
}

The implementation of acero::ScalarAggregateNode is clearly more interesting. It initializes the corresponding set of kernels:

class ScalarAggregateNode : public ExecNode, public TracedNode {
  const std::vector<const ScalarAggregateKernel*> kernels_;

  // Input type holders for each kernel, used for state initialization
  std::vector<std::vector<TypeHolder>> kernel_intypes_;
  std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
};

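The logic here is quite fun:

  1. Each kernel has one KernelState per thread
  2. When a thread comes in, it only updates its own KernelState
  3. At the end everything is merged together and emitted

This behaves much like pipeline execution (the upstream decides the degree of parallelism, and the results are then aggregated together).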
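The three steps above can be modeled in a few lines. This is a sketch under assumptions, not the Acero code: ScalarAggNodeModel, InputReceived and Finish are hypothetical names, every kernel is a plain sum, and the "threads" are represented only by a thread_index argument so that the example stays deterministic.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct SumState {
  int64_t sum = 0;
};

class ScalarAggNodeModel {
 public:
  // Assumes num_threads >= 1. Layout mirrors states_[kernel][thread].
  ScalarAggNodeModel(size_t num_kernels, size_t num_threads)
      : states_(num_kernels, std::vector<SumState>(num_threads)) {}

  // Called by worker `thread_index`. It touches only that worker's slot
  // of every kernel, so workers never write to the same state.
  void InputReceived(size_t thread_index, const std::vector<int64_t>& batch) {
    for (auto& per_thread : states_) {
      for (int64_t v : batch) per_thread[thread_index].sum += v;
    }
  }

  // Called once after all input has arrived: merge every per-thread
  // state into slot 0, then finalize one result per kernel.
  std::vector<int64_t> Finish() {
    std::vector<int64_t> out;
    for (auto& per_thread : states_) {
      for (size_t t = 1; t < per_thread.size(); ++t) {
        per_thread[0].sum += per_thread[t].sum;
      }
      out.push_back(per_thread[0].sum);
    }
    return out;
  }

 private:
  std::vector<std::vector<SumState>> states_;  // [kernel][thread]
};
```

Because each worker owns its slot, the consume path needs no locking; the only sequential part is the final merge.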

Grouped Aggregations (“group by”)

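Hash (grouped) aggregate functions are only used inside dedicated operators. Compared with a Scalar Aggregate, which "takes all the input and outputs one value", a grouped aggregate has to partition the input by "key" and emit both the key and the aggregate value on output.

It is worth noting that a HashAggregateKernel cannot be invoked directly; it is used as part of a group-by operation.

We can see that there is one extra member here, resize: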

/// \brief Kernel data structure for implementations of
/// HashAggregateFunction. The four necessary components of an aggregation
/// kernel are the init, consume, merge, and finalize functions.
///
/// * init: creates a new KernelState for a kernel.
/// * resize: ensure that the KernelState can accommodate the specified number of groups.
/// * consume: processes an ExecSpan (which includes the argument as well
///   as an array of group identifiers) and updates the KernelState found in the
///   KernelContext.
/// * merge: combines one KernelState with another.
/// * finalize: produces the end result of the aggregation using the
///   KernelState in the KernelContext.
struct ARROW_EXPORT HashAggregateKernel : public Kernel {
  HashAggregateResize resize;
  HashAggregateConsume consume;
  HashAggregateMerge merge;
  HashAggregateFinalize finalize;
  /// @brief whether the summarizer requires ordering
  /// This is similar to ScalarAggregateKernel. See ScalarAggregateKernel
  /// for detailed doc of this variable.
  bool ordered = false;
};

We can also look at the shape of GroupedAggregator:

/// C++ abstract base class for the HashAggregateKernel interface.
/// Implementations should be default constructible and perform initialization in
/// Init().
struct GroupedAggregator : KernelState {
  virtual Status Init(ExecContext*, const KernelInitArgs& args) = 0;
  virtual Status Resize(int64_t new_num_groups) = 0;
  virtual Status Consume(const ExecSpan& batch) = 0;
  virtual Status Merge(GroupedAggregator&& other, const ArrayData& group_id_mapping) = 0;
  virtual Result<Datum> Finalize() = 0;
  virtual std::shared_ptr<DataType> out_type() const = 0;
};

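As we can see, the number of groups is managed explicitly from the outside.

Generally speaking, there are two ways to group the input for aggregation:

  • Sorting
  • Hashing

Both have been covered at an introductory level in earlier posts. Arrow's GroupedAggregator uses the name Hash in some places, but it actually has nothing to do with hashing itself. The GroupedAggregator abstraction is:

  1. The basic abstraction is the same as ScalarAggregate
  2. It maintains groups. The number of groups is set through Resize, and each Resize grows the number of groups
  3. The incoming ExecSpan carries a UInt32Array as its last column, holding the group ids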
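A toy grouped sum makes the three points above concrete. This is a sketch only: GroupedSum is a hypothetical type, plain vectors stand in for ExecSpan and ArrayData, and the mapping argument of Merge plays the role of group_id_mapping (group g of the other aggregator corresponds to group mapping[g] of this one).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct GroupedSum {
  std::vector<int64_t> sums;  // one accumulator per group id

  // Resize: make room for new_num_groups groups; new groups start at 0.
  void Resize(size_t new_num_groups) {
    if (new_num_groups > sums.size()) sums.resize(new_num_groups, 0);
  }

  // Consume: values[i] belongs to group group_ids[i]. The caller must have
  // called Resize so that every id is in range.
  void Consume(const std::vector<int64_t>& values,
               const std::vector<uint32_t>& group_ids) {
    for (size_t i = 0; i < values.size(); ++i) sums[group_ids[i]] += values[i];
  }

  // Merge: fold group g of `other` into group mapping[g] of this aggregator.
  void Merge(const GroupedSum& other, const std::vector<uint32_t>& mapping) {
    for (size_t g = 0; g < other.sums.size(); ++g) {
      sums[mapping[g]] += other.sums[g];
    }
  }
};
```

Note that the aggregator never sees a key. It only sees dense ids, which is why the abstraction is independent of whether the ids came from hashing or sorting.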

Let's see how the upper layer calls this. A concrete example is acero::GroupByNode:

Status GroupByNode::Consume(ExecSpan batch) {
  /**
   * Initialization logic omitted
   */

  // Create a batch with key columns
  std::vector<ExecValue> keys(key_field_ids_.size());
  for (size_t i = 0; i < key_field_ids_.size(); ++i) {
    keys[i] = batch[key_field_ids_[i]];
  }
  ExecSpan key_batch(std::move(keys), batch.length);

  // Create a batch with group ids
  ARROW_ASSIGN_OR_RAISE(Datum id_batch, state->grouper->Consume(key_batch));

  // Execute aggregate kernels
  for (size_t i = 0; i < agg_kernels_.size(); ++i) {
    auto ctx = plan_->query_context()->exec_context();
    KernelContext kernel_ctx{ctx};
    kernel_ctx.SetState(state->agg_states[i].get());

    std::vector<ExecValue> column_values;
    for (const int field : agg_src_fieldsets_[i]) {
      column_values.push_back(batch[field]);
    }
    column_values.emplace_back(*id_batch.array());
    ExecSpan agg_batch(std::move(column_values), batch.length);
    RETURN_NOT_OK(agg_kernels_[i]->resize(&kernel_ctx, state->grouper->num_groups()));
    RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch));
  }

  return Status::OK();
}

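Here we can see that the Grouper is what fills in the id_batch. How is Grouper implemented? See the code at https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/row/grouper.cc . Internally the Grouper holds a SwissTable that can batch-probe a set of keys to their slots and insert them, assigning each previously unseen key its own, monotonically increasing GroupId. The group ids produced this way are then handed to the aggregators for processing. The corresponding logic is also described in: https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/doc/key_map.md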
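The contract of the Grouper (keys in, dense increasing group ids out) can be sketched as follows. This is only a model of the behavior described above: SimpleGrouper is a hypothetical class, std::unordered_map stands in for the SwissTable, and keys are single strings rather than multi-column rows.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

class SimpleGrouper {
 public:
  // Returns one group id per input key. A key seen for the first time
  // gets the next unused id; a known key gets its existing id.
  std::vector<uint32_t> Consume(const std::vector<std::string>& keys) {
    std::vector<uint32_t> ids;
    ids.reserve(keys.size());
    for (const auto& key : keys) {
      // emplace is a no-op when the key already exists, so the candidate
      // id (the current size) is only used for unseen keys.
      auto it = map_.emplace(key, static_cast<uint32_t>(map_.size())).first;
      ids.push_back(it->second);
    }
    return ids;
  }

  uint32_t num_groups() const { return static_cast<uint32_t>(map_.size()); }

 private:
  std::unordered_map<std::string, uint32_t> map_;
};
```

Since ids are dense and only ever grow, the caller can pass num_groups() straight to the aggregator's resize before each consume, as GroupByNode::Consume does.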

“Vector” Function

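We have already covered the definition of Vector functions: a batch of input produces a batch of new output: https://arrow.apache.org/docs/cpp/compute.html#array-wise-vector-functions . The relatively tricky part is the handling of ChunkedArray, which makes this kernel look considerably more complicated: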

/// \brief Kernel data structure for implementations of VectorFunction. In
/// contains an optional finalizer function, the null handling and memory
/// pre-allocation preferences (which have different defaults from
/// ScalarKernel), and some other execution-related options.
struct ARROW_EXPORT VectorKernel : public Kernel {
  /// \brief See VectorKernel::finalize member for usage
  using FinalizeFunc = std::function<Status(KernelContext*, std::vector<Datum>*)>;

  /// \brief Function for executing a stateful VectorKernel against a
  /// ChunkedArray input. Does not need to be defined for all VectorKernels
  using ChunkedExec = Status (*)(KernelContext*, const ExecBatch&, Datum* out);

  VectorKernel() = default;

  VectorKernel(std::vector<InputType> in_types, OutputType out_type, ArrayKernelExec exec,
               KernelInit init = NULLPTR, FinalizeFunc finalize = NULLPTR)
      : Kernel(std::move(in_types), std::move(out_type), std::move(init)),
        exec(exec),
        finalize(std::move(finalize)) {}

  VectorKernel(std::shared_ptr<KernelSignature> sig, ArrayKernelExec exec,
               KernelInit init = NULLPTR, FinalizeFunc finalize = NULLPTR)
      : Kernel(std::move(sig), std::move(init)),
        exec(exec),
        finalize(std::move(finalize)) {}

  /// \brief Perform a single invocation of this kernel. Any required state is
  /// managed through the KernelContext.
  ArrayKernelExec exec;

  /// \brief Execute the kernel on a ChunkedArray. Does not need to be defined
  ChunkedExec exec_chunked = NULLPTR;

  /// \brief For VectorKernel, convert intermediate results into finalized
  /// results. Mutates input argument. Some kernels may accumulate state
  /// (example: hashing-related functions) through processing chunked inputs, and
  /// then need to attach some accumulated state to each of the outputs of
  /// processing each chunk of data.
  FinalizeFunc finalize;

  /// Since vector kernels generally are implemented rather differently from
  /// scalar/elementwise kernels (and they may not even yield arrays of the same
  /// size), so we make the developer opt-in to any memory preallocation rather
  /// than having to turn it off.
  NullHandling::type null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
  MemAllocation::type mem_allocation = MemAllocation::NO_PREALLOCATE;

  /// \brief Writing execution results into larger contiguous allocations
  /// requires that the kernel be able to write into sliced output ArrayData*,
  /// including sliced output validity bitmaps. Some kernel implementations may
  /// not be able to do this, so setting this to false disables this
  /// functionality.
  bool can_write_into_slices = true;

  /// Some vector kernels can do chunkwise execution using ExecSpanIterator,
  /// in some cases accumulating some state. Other kernels (like Take) need to
  /// be passed whole arrays and don't work on ChunkedArray inputs
  bool can_execute_chunkwise = true;

  /// Some kernels (like unique and value_counts) yield non-chunked output from
  /// chunked-array inputs. This option controls how the results are boxed when
  /// returned from ExecVectorFunction
  ///
  /// true -> ChunkedArray
  /// false -> Array
  bool output_chunked = true;
};

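This struct is rather opaque on its own, so let's go straight to the VectorExecutor code, which is quite interesting: https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/exec.cc#L1024

  1. There are exec and exec_chunked, as well as can_execute_chunkwise and output_chunked. If the input contains no ChunkedArray, only exec runs. Beyond that, the actual behavior depends on:
    1. can_execute_chunkwise: when the input contains chunks, whether each chunk can be processed independently
    2. can_write_into_slices: whether the kernel can write its output into slices of a preallocated array
    3. output_chunked: whether the result is returned as a ChunkedArray
  2. The rest is similar to Scalar, for example null handling and PrepareOutput allocating the ArrayData up front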
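The dispatch between exec and exec_chunked can be modeled roughly as below. This is my simplified reading rather than the Arrow implementation: VectorKernelModel, Execute and MakeSortKernel are hypothetical names, a plain Array is modeled as a ChunkedArray with a single chunk, "input has more than one chunk" stands in for "input contains a ChunkedArray", and finalize, null handling and preallocation are left out entirely.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <stdexcept>
#include <vector>

using Array = std::vector<int>;
using ChunkedArray = std::vector<Array>;  // a plain Array is modeled as one chunk

// Hypothetical, heavily reduced stand-in for VectorKernel.
struct VectorKernelModel {
  std::function<Array(const Array&)> exec;                        // one contiguous array
  std::function<ChunkedArray(const ChunkedArray&)> exec_chunked;  // may be empty
  bool can_execute_chunkwise = true;
};

ChunkedArray Execute(const VectorKernelModel& kernel, const ChunkedArray& input) {
  ChunkedArray out;
  if (kernel.can_execute_chunkwise) {
    // Each chunk is handed to exec on its own.
    for (const Array& chunk : input) out.push_back(kernel.exec(chunk));
    return out;
  }
  if (input.size() > 1) {
    // A whole-input kernel needs a dedicated chunked entry point.
    if (!kernel.exec_chunked) {
      throw std::runtime_error("kernel cannot execute chunkwise and has no exec_chunked");
    }
    return kernel.exec_chunked(input);
  }
  // Not chunked: the regular exec path sees the whole array.
  out.push_back(kernel.exec(input.empty() ? Array{} : input.front()));
  return out;
}

// A sort-like kernel: sorting chunk by chunk would not sort the whole input,
// so it opts out of chunkwise execution and provides exec_chunked instead.
VectorKernelModel MakeSortKernel() {
  VectorKernelModel k;
  k.can_execute_chunkwise = false;
  k.exec = [](const Array& a) {
    Array sorted = a;
    std::sort(sorted.begin(), sorted.end());
    return sorted;
  };
  k.exec_chunked = [](const ChunkedArray& chunks) {
    Array all;
    for (const Array& c : chunks) all.insert(all.end(), c.begin(), c.end());
    std::sort(all.begin(), all.end());
    return ChunkedArray{all};
  };
  return k;
}
```

The sort example is also a reminder of why Sort lives among the Vector functions: its output depends on the entire input, not on any single chunk.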

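It is more direct to just read the code here. Let's take an implementation like "Unique" as the example. There are many Vector kernels, but we have to start somewhere; for this one, first look at the Action code: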

class ActionBase {
 public:
  ActionBase(const std::shared_ptr<DataType>& type, MemoryPool* pool)
      : type_(type), pool_(pool) {}

 protected:
  std::shared_ptr<DataType> type_;
  MemoryPool* pool_;
};

// ----------------------------------------------------------------------
// Unique

class UniqueAction final : public ActionBase {
 public:
  using ActionBase::ActionBase;

  static constexpr bool with_error_status = false;

  UniqueAction(const std::shared_ptr<DataType>& type, const FunctionOptions* options,
               MemoryPool* pool)
      : ActionBase(type, pool) {}

  Status Reset() { return Status::OK(); }
  Status Reserve(const int64_t length) { return Status::OK(); }
  template <class Index>
  void ObserveNullFound(Index index) {}
  template <class Index>
  void ObserveNullNotFound(Index index) {}
  template <class Index>
  void ObserveFound(Index index) {}
  template <class Index>
  void ObserveNotFound(Index index) {}
  bool ShouldEncodeNulls() { return true; }
  Status Flush(ExecResult* out) { return Status::OK(); }
  Status FlushFinal(ExecResult* out) { return Status::OK(); }
};

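Different Actions are implemented here for unique, value_counts and so on, and dictionaries are even handled as well. This code lives in vector_hash.cc and nearby files.

vector_hash.cc is not where hash operators are computed. It is where functions like "unique" and "value_counts" are handled. The upper-level logic is:

  1. A hash table is needed
  2. The logic applied to each hash table entry lives in the Action (derived from ActionBase), while the key handling above it is abstracted into an index

The Dictionary and regular kernel implementations are also separated here: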

// ----------------------------------------------------------------------
template <typename HashKernel>
Result<std::unique_ptr<KernelState>> HashInit(KernelContext* ctx,
                                              const KernelInitArgs& args) {
  auto result = std::make_unique<HashKernel>(args.inputs[0].GetSharedPtr(), args.options,
                                             ctx->memory_pool());
  RETURN_NOT_OK(result->Reset());
  // R build with openSUSE155 requires an explicit unique_ptr construction
  return std::unique_ptr<KernelState>(std::move(result));
}

template <typename Action>
KernelInit GetHashInit(Type::type type_id) {
  // ARROW-8933: Generate only a single hash kernel per physical data
  // representation
  switch (type_id) {
    case Type::NA:
      return HashInit<NullHashKernel<Action>>;
    case Type::BOOL:
      return HashInit<RegularHashKernel<BooleanType, Action>>;
    case Type::INT8:
    case Type::UINT8:
      return HashInit<RegularHashKernel<UInt8Type, Action>>;
    case Type::INT16:
    case Type::UINT16:
      return HashInit<RegularHashKernel<UInt16Type, Action>>;
    case Type::INT32:
    case Type::UINT32:
    case Type::FLOAT:
    case Type::DATE32:
    case Type::TIME32:
    case Type::INTERVAL_MONTHS:
      return HashInit<RegularHashKernel<UInt32Type, Action>>;
    case Type::INT64:
    case Type::UINT64:
    case Type::DOUBLE:
    case Type::DATE64:
    case Type::TIME64:
    case Type::TIMESTAMP:
    case Type::DURATION:
    case Type::INTERVAL_DAY_TIME:
      return HashInit<RegularHashKernel<UInt64Type, Action>>;
    case Type::BINARY:
    case Type::STRING:
      return HashInit<RegularHashKernel<BinaryType, Action, std::string_view>>;
    case Type::LARGE_BINARY:
    case Type::LARGE_STRING:
      return HashInit<RegularHashKernel<LargeBinaryType, Action, std::string_view>>;
    case Type::BINARY_VIEW:
    case Type::STRING_VIEW:
      return HashInit<RegularHashKernel<BinaryViewType, Action, std::string_view>>;
    case Type::FIXED_SIZE_BINARY:
    case Type::DECIMAL128:
    case Type::DECIMAL256:
      return HashInit<RegularHashKernel<FixedSizeBinaryType, Action, std::string_view>>;
    case Type::INTERVAL_MONTH_DAY_NANO:
      return HashInit<RegularHashKernel<MonthDayNanoIntervalType, Action>>;
    default:
      Unreachable("non hashable type");
  }
}

using DictionaryEncodeState = OptionsWrapper<DictionaryEncodeOptions>;

template <typename Action>
Result<std::unique_ptr<KernelState>> DictionaryHashInit(KernelContext* ctx,
                                                        const KernelInitArgs& args) {
  const auto& dict_type = checked_cast<const DictionaryType&>(*args.inputs[0].type);
  ARROW_ASSIGN_OR_RAISE(auto indices_hasher,
                        GetHashInit<Action>(dict_type.index_type()->id())(ctx, args));
  return std::make_unique<DictionaryHashKernel>(
      checked_pointer_cast<HashKernel>(std::move(indices_hasher)),
      dict_type.value_type());
}

As we can see, both dictionary and non-dictionary inputs are handled here, and both are unified so that they are handled through the index.
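The "one hash kernel per physical representation" idea in GetHashInit can be illustrated with a small standalone sketch. The names AsUInt32Bits, CountUniqueBits and CountUnique are hypothetical, and this is only a model of the idea: every 4-byte type is viewed as its raw uint32_t bits, so a single non-template routine serves int32_t and float alike.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <unordered_set>
#include <vector>

// View any 4-byte value as its raw bits.
template <typename T>
uint32_t AsUInt32Bits(T value) {
  static_assert(sizeof(T) == sizeof(uint32_t), "needs a 4-byte type");
  uint32_t bits;
  std::memcpy(&bits, &value, sizeof(bits));
  return bits;
}

// The single "kernel" shared by every 4-byte type.
size_t CountUniqueBits(const std::vector<uint32_t>& bits) {
  std::unordered_set<uint32_t> seen(bits.begin(), bits.end());
  return seen.size();
}

// Thin typed front end: convert to bits, then reuse the shared kernel.
template <typename T>
size_t CountUnique(const std::vector<T>& values) {
  std::vector<uint32_t> bits;
  bits.reserve(values.size());
  for (T v : values) bits.push_back(AsUInt32Bits(v));
  return CountUniqueBits(bits);
}
```

One consequence of comparing raw bits in this sketch is that 0.0f and -0.0f count as different values. The benefit is fewer template instantiations for the hash table code.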

The important part of a kernel is its processing function. Ignoring error handling and the Dictionary implementation, the internal logic looks like this:

template <bool HasError = with_error_status>
enable_if_t<!HasError, Status> DoAppend(const ArraySpan& arr) {
  return VisitArraySpanInline<Type>(
      arr,
      [this](Scalar v) {
        auto on_found = [this](int32_t memo_index) {
          action_.ObserveFound(memo_index);
        };
        auto on_not_found = [this](int32_t memo_index) {
          action_.ObserveNotFound(memo_index);
        };

        int32_t unused_memo_index;
        return memo_table_->GetOrInsert(v, std::move(on_found), std::move(on_not_found),
                                        &unused_memo_index);
      },
      [this]() {
        if (action_.ShouldEncodeNulls()) {
          auto on_found = [this](int32_t memo_index) {
            action_.ObserveNullFound(memo_index);
          };
          auto on_not_found = [this](int32_t memo_index) {
            action_.ObserveNullNotFound(memo_index);
          };
          memo_table_->GetOrInsertNull(std::move(on_found), std::move(on_not_found));
        } else {
          action_.ObserveNullNotFound(-1);
        }
        return Status::OK();
      });
}

And that's it: different behaviors are implemented through Actions, and implementations are registered for all of the types.
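As a recap of the Action mechanism, here is a self-contained sketch of a hash kernel parameterized by an Action. It is a simplified model, not the vector_hash.cc code: HashKernelModel, UniqueActionModel and ValueCountsActionModel are hypothetical names, std::unordered_map stands in for the memo table, and nulls are not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// "unique" only needs the memo table itself, so its callbacks do nothing.
struct UniqueActionModel {
  void ObserveFound(int32_t) {}
  void ObserveNotFound(int32_t) {}
};

// "value_counts" keeps one counter per memo index.
struct ValueCountsActionModel {
  std::vector<int64_t> counts;
  void ObserveFound(int32_t index) { counts[index] += 1; }
  void ObserveNotFound(int32_t index) {
    counts.resize(static_cast<size_t>(index) + 1, 0);
    counts[index] = 1;
  }
};

template <typename Action>
class HashKernelModel {
 public:
  // For each value: look it up, insert it if unseen, and tell the Action
  // which memo index was hit and whether it already existed.
  void Append(const std::vector<int32_t>& values) {
    for (int32_t v : values) {
      auto it = memo_.find(v);
      if (it != memo_.end()) {
        action_.ObserveFound(it->second);
      } else {
        const int32_t index = static_cast<int32_t>(keys_.size());
        memo_.emplace(v, index);
        keys_.push_back(v);
        action_.ObserveNotFound(index);
      }
    }
  }

  const std::vector<int32_t>& keys() const { return keys_; }  // in first-seen order
  const Action& action() const { return action_; }

 private:
  std::unordered_map<int32_t, int32_t> memo_;  // value -> memo index
  std::vector<int32_t> keys_;
  Action action_;
};
```

The kernel owns the lookup loop once, and each function only contributes the small piece of per-entry behavior it cares about.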