Arrow Acero Framework

Acero is a "streaming" processing engine, where "streaming" is meant in the MonetDB/X100 sense:

  1. Early MonetDB fully materialized every intermediate result, with each operator executing over its own large batch
  2. Full materialization turned out to be too expensive, so this later evolved into batched execution

Acero plays the same role inside Arrow: it is a push-based executor, and it does not yet support anything like a pipeline executor. It also lacks sort-merge join and sort-based aggregation; essentially all of the supported operators are listed below. One consequence of this:

  • The SortBy and TopK operators are implemented in Acero in combination with the Sink operator

[figure: the operators supported by Acero]

Acero is essentially a tool for chaining Dataset (reads/writes) and Functions together to produce the data or Table you need. Like Velox, it has no optimizer of its own; it is driven from the outside by interfaces such as Substrait [1] and dplyr [2].

We will introduce Acero's structure starting from the Declaration, Plan, and Node layers.

Overview

This part of the documentation describes Acero's architecture (https://arrow.apache.org/docs/cpp/streaming_execution.html#architecture-overview).


  1. An ExecPlan can be created from a Declaration, which acts as syntactic sugar. An ExecPlan is not just a container of nodes; it also carries the execution context.
  2. ExecNode is the basic control unit in Acero, and it can be configured with ExecNodeOptions. The constructors of the concrete ExecNode types are not exposed, but nodes can be created through an ExecFactoryRegistry.
  3. Data is pushed around as ExecBatch, which is itself a fairly generic type.

Most of the actual computation lives in Compute; Acero itself mainly wires Dataset and Compute together. There is a nice diagram for this as well:

[figure: how Acero connects Dataset and Compute]

When creating an ExecNode, much like CallFunction, you can also create one by name, e.g. https://arrow.apache.org/docs/cpp/streaming_execution.html#constructing-execnode-using-options

A Plan is a composition of ExecNodes. It represents the entire executing plan, rather than the per-node structure (like a PhysicalPlanNode) that corresponds to a single ExecNode. It also carries some execution context (ExecContext / QueryOptions).

ExecBatch is the unit of data that gets passed around, and its code is fairly simple. The code does introduce a SelectionVector, but the implementation never actually uses selection vectors:

/// The values representing positional arguments to be passed to a kernel's
/// exec function for processing.
std::vector<Datum> values;

/// Selection vector; in practice it is never actually used.
std::shared_ptr<SelectionVector> selection_vector;

/// A guarantee that holds within this batch. This abstraction is interesting:
/// the data probably comes from the same RowGroup or similar, so whoever
/// produces the batch can attach a guarantee to the data it emits.
Expression guarantee = literal(true);

/// The length of each entry in `values` (there may be constants; this is
/// abstracted similarly to Velox).
int64_t length = 0;

/// Multiple files may be read in parallel and their batches emitted
/// unordered; this index carries the ordering-related information.
int64_t index = kUnsequencedIndex;
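
To make the length/constant semantics above concrete, here is a small sketch (mine, not from the Acero sources) that builds an ExecBatch mixing an ordinary array column with a scalar "constant" column:

#include <arrow/api.h>
#include <arrow/compute/exec.h>  // arrow::compute::ExecBatch

arrow::Result<arrow::compute::ExecBatch> MakeExampleBatch() {
  // An ordinary array column of length 3.
  arrow::Int64Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> values, builder.Finish());

  // A Scalar Datum acts as a "constant" column: it carries no length of its
  // own, so the batch-level `length` (3 here) applies to it.
  std::shared_ptr<arrow::Scalar> constant = arrow::MakeScalar(int64_t{42});

  return arrow::compute::ExecBatch(
      {arrow::Datum(std::move(values)), arrow::Datum(std::move(constant))},
      /*length=*/3);
}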

[figure: the official ExecBatch diagram]

The figure above is the official ExecBatch diagram. Acero's node factory is shown in the code below:

/// \brief An extensible registry for factories of ExecNodes
class ARROW_ACERO_EXPORT ExecFactoryRegistry {
 public:
  using Factory = std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>,
                                                  const ExecNodeOptions&)>;

  virtual ~ExecFactoryRegistry() = default;

  /// \brief Get the named factory from this registry
  ///
  /// will raise if factory_name is not found
  virtual Result<Factory> GetFactory(const std::string& factory_name) = 0;

  /// \brief Add a factory to this registry with the provided name
  ///
  /// will raise if factory_name is already in the registry
  virtual Status AddFactory(std::string factory_name, Factory factory) = 0;
};

/// The default registry, which includes built-in factories.
ARROW_ACERO_EXPORT
ExecFactoryRegistry* default_exec_factory_registry();

/// \brief Construct an ExecNode using the named factory
inline Result<ExecNode*> MakeExecNode(
    const std::string& factory_name, ExecPlan* plan, std::vector<ExecNode*> inputs,
    const ExecNodeOptions& options,
    ExecFactoryRegistry* registry = default_exec_factory_registry()) {
  ARROW_ASSIGN_OR_RAISE(auto factory, registry->GetFactory(factory_name));
  return factory(plan, std::move(inputs), options);
}

Note that the relationships between nodes are largely maintained by the ExecNodes themselves; the Plan just drives the overall execution. MakeExecNode emplace_backs the corresponding node into the plan, and the nodes are then topologically sorted during the build phase.
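
As a small illustration of this registry path, the sketch below (mine, not from the sources) looks up the "filter" factory described in the list that follows and attaches it downstream of an assumed pre-existing source node:

namespace acero = arrow::acero;
namespace cp = arrow::compute;

// `plan` is an existing std::shared_ptr<acero::ExecPlan>, `source` an
// already-created upstream ExecNode*.
ARROW_ASSIGN_OR_RAISE(
    acero::ExecNode* filter_node,
    acero::MakeExecNode("filter", plan.get(), {source},
                        acero::FilterNodeOptions{
                            cp::greater(cp::field_ref("x"), cp::literal(3))}));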

Nodes come in different kinds; roughly abstracted:

  • TableSource / Scan: "scan"-like interfaces that provide the data source; part of this code is implemented under the Dataset interfaces
  • Project: a perfectly ordinary projection. In some engines projection is not a standalone node but an attribute handed to each operator instead of a dedicated Project node; the overhead of a dedicated node does not seem large, though
  • Filter: applies the given filter logic via compute::Filter
  • Sink: collects the results of the computation. Operators like OrderBy and TopK are currently implemented on the Sink, which is a bit odd. (A small example combining these node kinds follows this list.)
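
A minimal sketch (assuming an existing arrow::Table; the helper name RunExample is made up) that strings several of these node kinds together through a Declaration:

namespace acero = arrow::acero;
namespace cp = arrow::compute;

arrow::Result<std::shared_ptr<arrow::Table>> RunExample(
    std::shared_ptr<arrow::Table> table) {
  acero::Declaration plan = acero::Declaration::Sequence(
      {{"table_source", acero::TableSourceNodeOptions{table}},
       {"filter", acero::FilterNodeOptions{
                      cp::greater(cp::field_ref("x"), cp::literal(3))}},
       {"project", acero::ProjectNodeOptions{{cp::field_ref("x")}, {"x"}}}});
  // DeclarationToTable attaches an implicit sink and drives the plan
  // to completion.
  return acero::DeclarationToTable(std::move(plan));
}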

Once the nodes are wired together, we need to care about the pipeline's control flow. Internally Acero has two schedulers:

  1. AsyncScheduler
  2. TaskScheduler

The AsyncScheduler is a bit like the root of the pipeline: it pushes in the bottom-most nodes such as "scan", and a ScanNode may then notify the node one level up to pass information along. The TaskScheduler exists specifically for operators like Join; similar to fork/join, it runs multiple tasks over a thread group. (I have not fully understood the TaskScheduler code.)

Framework

Here we introduce the ExecPlan and ExecNode layers, covering as much as we can.

Future

First we need to introduce arrow's Future. Arrow implements a simple Future of its own; it is effectively a shared_future that supports attaching callbacks, and Arrow uses this shared future to power its push-based core. See Future and Future<Result<T>>; it has good support for Status and related types.

This is implemented at https://arrow.apache.org/docs/cpp/api/async.html . The code is fairly conventional Future code, so just focus on the interfaces; I will chew through the implementation once I have worked through C++ Templates XD

Since the Future itself acts as a shared_future, you can do things like:

Result<Table> blockingOp() {
  auto future = plan->GetFuture();
  future.Wait();  // <-- turns this into a blocking call, without affecting execution inside
  return future.result();
}

A Future supports adding a Callback and Then.

  1. AddCallback adds a Callback: one or more functions that run after this Future completes. If you add a callback to an already-finished Future, the function may be invoked directly, in place (surprise: an inline callback). To address this, a TryAddCallback interface is provided: if the future has already finished, it will not run the callback.
    1. A Callback can be bound with options, which may carry an Executor; the implementation then lets the callback run there. There is also some logic deciding whether to schedule the corresponding Future, similar to via in Folly.
  2. Then is like a Callback but itself produces a new future. It is currently implemented as a Callback: it is not scheduled after the callbacks finish, but treated as an ordinary Callback.
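
Before the larger cases below, a minimal sketch of the two styles (assuming an existing arrow::Future<int> fut):

// AddCallback: observe completion. Note it may run inline if `fut` has
// already finished.
fut.AddCallback([](const arrow::Result<int>& res) {
  // inspect res.ok() / *res here
});

// Then: chain a continuation and get a new future for its result.
arrow::Future<int> doubled = fut.Then([](const int& v) { return v * 2; });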

Let's walk through a few examples of how Future is used (rather than reading the implementation):

Case 1: All

The key points here: implement it with callbacks, handle errors, and return a new Future; once all input futures have completed, MarkFinished the new Future.

template <typename T>
Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
  struct State {
    explicit State(std::vector<Future<T>> f)
        : futures(std::move(f)), n_remaining(futures.size()) {}

    std::vector<Future<T>> futures;
    std::atomic<size_t> n_remaining;
  };

  if (futures.size() == 0) {
    return {std::vector<Result<T>>{}};
  }

  auto state = std::make_shared<State>(std::move(futures));

  auto out = Future<std::vector<Result<T>>>::Make();
  for (const Future<T>& future : state->futures) {
    future.AddCallback([state, out](const Result<T>&) mutable {
      if (state->n_remaining.fetch_sub(1) != 1) return;

      std::vector<Result<T>> results(state->futures.size());
      for (size_t i = 0; i < results.size(); ++i) {
        results[i] = state->futures[i].result();
      }
      out.MarkFinished(std::move(results));
    });
  }
  return out;
}

Case 2: CountRows:

using FragmentGenerator = AsyncGenerator<std::shared_ptr<Fragment>>;

Result<FragmentGenerator> AsyncScanner::GetFragments() const {
  // TODO(ARROW-8163): Async fragment scanning will return AsyncGenerator<Fragment>
  // here. Current iterator based versions are all fast & sync so we will just ToVector
  // it
  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset_->GetFragments(scan_options_->filter));
  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector());
  return MakeVectorGenerator(std::move(fragments_vec));
}

Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());

  compute::ExecContext exec_context(scan_options_->pool, executor);

  ARROW_ASSIGN_OR_RAISE(auto plan, acero::ExecPlan::Make(exec_context));
  // Drop projection since we only need to count rows
  const auto options = std::make_shared<ScanOptions>(*scan_options_);
  ARROW_ASSIGN_OR_RAISE(auto empty_projection,
                        ProjectionDescr::FromNames(std::vector<std::string>(),
                                                   *scan_options_->dataset_schema));
  SetProjection(options.get(), empty_projection);

  auto total = std::make_shared<std::atomic<int64_t>>(0);

  fragment_gen = MakeMappedGenerator(
      std::move(fragment_gen),
      [options, total](const std::shared_ptr<Fragment>& fragment) {
        return fragment->CountRows(options->filter, options)
            .Then([options, total, fragment](std::optional<int64_t> fast_count) mutable
                      -> std::shared_ptr<Fragment> {
              if (fast_count) {
                // fast path: got row count directly; skip scanning this fragment
                (*total) += *fast_count;
                return std::make_shared<InMemoryFragment>(options->dataset_schema,
                                                          RecordBatchVector{});
              }

              // slow path: actually filter this fragment's batches
              return std::move(fragment);
            });
      });

  acero::Declaration count_plan = acero::Declaration::Sequence(
      {{"scan",
        ScanNodeOptions{std::make_shared<FragmentDataset>(scan_options_->dataset_schema,
                                                          std::move(fragment_gen)),
                        options}},
       {"project", acero::ProjectNodeOptions{{options->filter}, {"mask"}}},
       {"aggregate", acero::AggregateNodeOptions{{compute::Aggregate{
                         "sum", nullptr, "mask", "selected_count"}}}}});

  return acero::DeclarationToBatchesAsync(std::move(count_plan), exec_context)
      .Then([total](const RecordBatchVector& batches) -> Result<int64_t> {
        DCHECK_EQ(1, batches.size());
        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> count_scalar,
                              batches[0]->column(0)->GetScalar(0));
        return total->load() +
               static_cast<int64_t>(
                   ::arrow::internal::checked_pointer_cast<UInt64Scalar>(count_scalar)
                       ->value);
      });
}

This use of Then is pretty neat.

Plan

class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
 public:
  // This allows operators to rely on signed 16-bit indices
  static const uint32_t kMaxBatchSize = 1 << 15;
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecPlan() = default;

  QueryContext* query_context();

  /// \brief retrieve the nodes in the plan
  const NodeVector& nodes() const;

  ExecNode* AddNode(std::unique_ptr<ExecNode> node);

  template <typename Node, typename... Args>
  Node* EmplaceNode(Args&&... args) {
    std::unique_ptr<Node> node{new Node{std::forward<Args>(args)...}};
    auto out = node.get();
    AddNode(std::move(node));
    return out;
  }

  Status Validate();

  /// \brief Start producing on all nodes
  ///
  /// Nodes are started in reverse topological order, such that any node
  /// is started before all of its inputs.
  void StartProducing();

  /// \brief Stop producing on all nodes
  ///
  /// Triggers all sources to stop producing new data. In order to cleanly stop the plan
  /// will continue to run any tasks that are already in progress. The caller should
  /// still wait for `finished` to complete before destroying the plan.
  void StopProducing();

  /// \brief A future which will be marked finished when all tasks have finished.
  Future<> finished();

  /// \brief Return whether the plan has non-empty metadata
  bool HasMetadata() const;

  /// \brief Return the plan's attached metadata
  std::shared_ptr<const KeyValueMetadata> metadata() const;

  std::string ToString() const;
};

The lifecycle management in the code above is quite easy to follow (a small driving sketch follows the list):

  1. StartProducing: start producing data
  2. StopProducing: stop producing source data
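
A minimal sketch of driving a plan by hand, which the DeclarationTo* helpers wrap for you (error handling elided; assumes nodes are added elsewhere):

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<acero::ExecPlan> plan,
                      acero::ExecPlan::Make());
// ... add nodes to `plan` via MakeExecNode / Declaration::AddToPlan ...
ARROW_RETURN_NOT_OK(plan->Validate());
plan->StartProducing();   // kicks off all source nodes
plan->finished().Wait();  // block until every task has completed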

Node

class ARROW_ACERO_EXPORT ExecNode {
 public:
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecNode() = default;

  virtual const char* kind_name() const = 0;

  // The number of inputs expected by this node
  int num_inputs() const { return static_cast<int>(inputs_.size()); }

  /// This node's predecessors in the exec plan
  const NodeVector& inputs() const { return inputs_; }

  /// True if the plan has no output schema (is a sink)
  bool is_sink() const { return !output_schema_; }

  /// \brief Labels identifying the function of each input.
  const std::vector<std::string>& input_labels() const { return input_labels_; }

  /// This node's successor in the exec plan
  const ExecNode* output() const { return output_; }

  /// The datatypes for batches produced by this node
  const std::shared_ptr<Schema>& output_schema() const { return output_schema_; }

  /// This node's exec plan
  ExecPlan* plan() { return plan_; }

  /// \brief An optional label, for display and debugging
  ///
  /// There is no guarantee that this value is non-empty or unique.
  const std::string& label() const { return label_; }
  void SetLabel(std::string label) { label_ = std::move(label); }

  virtual Status Validate() const;

  /// \brief the ordering of the output batches
  virtual const Ordering& ordering() const;

  /// Transfer input batch to ExecNode
  ///
  /// A node will typically perform some kind of operation on the batch
  /// and then call InputReceived on its outputs with the result.
  ///
  /// Other nodes may need to accumulate some number of inputs before any
  /// output can be produced. These nodes will add the batch to some kind
  /// of in-memory accumulation queue and return.
  virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0;

  /// Mark the inputs finished after the given number of batches.
  ///
  /// This may be called before all inputs are received. This simply fixes
  /// the total number of incoming batches for an input, so that the ExecNode
  /// knows when it has received all input, regardless of order.
  virtual Status InputFinished(ExecNode* input, int total_batches) = 0;

  /// \brief Perform any needed initialization
  ///
  /// This hook performs any actions in between creation of ExecPlan and the call to
  /// StartProducing. An example could be Bloom filter pushdown. The order of ExecNodes
  /// that executes this method is undefined, but the calls are made synchronously.
  ///
  /// At this point a node can rely on all inputs & outputs (and the input schemas)
  /// being well defined.
  virtual Status Init();

  /// \brief Start producing
  ///
  /// This must only be called once.
  ///
  /// This is typically called automatically by ExecPlan::StartProducing().
  virtual Status StartProducing() = 0;

  /// \brief Pause producing temporarily
  ///
  /// \param output Pointer to the output that is full
  /// \param counter Counter used to sequence calls to pause/resume
  ///
  /// This call is a hint that an output node is currently not willing
  /// to receive data.
  ///
  /// This may be called any number of times.
  /// However, the node is still free to produce data (which may be difficult
  /// to prevent anyway if data is produced using multiple threads).
  virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;

  /// \brief Resume producing after a temporary pause
  ///
  /// \param output Pointer to the output that is now free
  /// \param counter Counter used to sequence calls to pause/resume
  ///
  /// This call is a hint that an output node is willing to receive data again.
  ///
  /// This may be called any number of times.
  virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;

  /// \brief Stop producing new data
  ///
  /// If this node is a source then the source should stop generating data
  /// as quickly as possible. If this node is not a source then there is typically
  /// nothing that needs to be done although a node may choose to start ignoring incoming
  /// data.
  ///
  /// This method will be called when an error occurs in the plan
  /// This method may also be called by the user if they wish to end a plan early
  /// Finally, this method may be called if a node determines it no longer needs any more
  /// input (for example, a limit node).
  ///
  /// This method may be called multiple times.
  ///
  /// This is not a pause. There will be no way to start the source again after this has
  /// been called.
  virtual Status StopProducing();

  std::string ToString(int indent = 0) const;

 protected:
  ExecNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
           std::shared_ptr<Schema> output_schema);

  virtual Status StopProducingImpl() = 0;

  /// Provide extra info to include in the string representation.
  virtual std::string ToStringExtra(int indent = 0) const;

  std::atomic<bool> stopped_;
  ExecPlan* plan_;
  std::string label_;

  NodeVector inputs_;
  std::vector<std::string> input_labels_;

  std::shared_ptr<Schema> output_schema_;
  ExecNode* output_ = NULLPTR;
};

This code is quite clear, and it even accounts for the concurrency of Pause/Resume operations (each call carries an externally maintained counter that sequences the operations; there is no idempotency requirement). I honestly find this code very clean.
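
To make the push-based contract concrete, here is a hypothetical pass-through node sketched purely against the interface above (PassThroughNode is made up, not part of Acero; wiring of output_ is left to the plan):

class PassThroughNode : public arrow::acero::ExecNode {
 public:
  PassThroughNode(arrow::acero::ExecPlan* plan, ExecNode* input,
                  std::shared_ptr<arrow::Schema> schema)
      : ExecNode(plan, {input}, {"input"}, std::move(schema)) {}

  const char* kind_name() const override { return "PassThroughNode"; }

  arrow::Status InputReceived(ExecNode* input,
                              arrow::compute::ExecBatch batch) override {
    // A real node would transform `batch` before pushing it downstream.
    return output_->InputReceived(this, std::move(batch));
  }

  arrow::Status InputFinished(ExecNode* input, int total_batches) override {
    // No buffering here, so the output sees exactly the same batch count.
    return output_->InputFinished(this, total_batches);
  }

  arrow::Status StartProducing() override { return arrow::Status::OK(); }

  void PauseProducing(ExecNode* output, int32_t counter) override {
    inputs_[0]->PauseProducing(this, counter);  // propagate backpressure upstream
  }
  void ResumeProducing(ExecNode* output, int32_t counter) override {
    inputs_[0]->ResumeProducing(this, counter);
  }

 protected:
  arrow::Status StopProducingImpl() override { return arrow::Status::OK(); }
};

In practice such a node would be registered with an ExecFactoryRegistry so it can be created by name like the built-in ones.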


Scheduler

I could not make sense of the TaskScheduler code, so we will only look at the AsyncScheduler.

This code is rather idiosyncratic. AsyncTaskScheduler comes with several related types:

  1. AsyncTaskScheduler
  2. AsyncTaskSchedulerImpl: the concrete implementation
  3. ThrottledAsyncTaskScheduler: a throttled task scheduler
  4. AsyncTaskGroup: a somewhat odd one. During a scan, for example, the original file needs to stay alive, so the tasks form a small scheduling group. AsyncTaskGroupImpl is an implementation of it.

It is enough to take a quick look at how one gets created:

/// Construct a scheduler
///
/// \param initial_task The initial task which is responsible for adding
/// the first subtasks to the scheduler.
/// \param abort_callback A callback that will be triggered immediately after a task
/// fails while other tasks may still be running. Nothing needs to be done here,
/// when a task fails the scheduler will stop accepting new tasks and eventually
/// return the error. However, this callback can be used to more quickly end
/// long running tasks that have already been submitted. Defaults to doing
/// nothing.
/// \param stop_token An optional stop token that will allow cancellation of the
/// scheduler. This will be checked before each task is submitted and, in the
/// event of a cancellation, the scheduler will enter an aborted state. This is
/// a graceful cancellation and submitted tasks will still complete.
/// \return A future that will be completed when the initial task and all subtasks have
/// finished.
static Future<> Make(
    FnOnce<Status(AsyncTaskScheduler*)> initial_task,
    FnOnce<void(const Status&)> abort_callback = [](const Status&) {},
    StopToken stop_token = StopToken::Unstoppable());
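
A minimal sketch of this contract (the initial task here spawns no subtasks, so the returned future completes as soon as it returns):

arrow::Future<> done = arrow::util::AsyncTaskScheduler::Make(
    [](arrow::util::AsyncTaskScheduler* scheduler) -> arrow::Status {
      // Real users register subtasks against `scheduler` here, as the scan
      // snippet below does through an AsyncTaskGroup.
      return arrow::Status::OK();
    });
done.Wait();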

Note how this gets used during a scan:

Future<> AddScanTasks(const std::shared_ptr<FragmentScanner>& fragment_scanner) {
  scan_state->fragment_scanner = fragment_scanner;
  ScanState* state_view = scan_state.get();
  Future<> list_and_scan_done = Future<>::Make();
  // Finish callback keeps the scan state alive until all scan tasks done
  struct StateHolder {
    Status operator()() {
      list_and_scan_done.MarkFinished();
      return Status::OK();
    }
    Future<> list_and_scan_done;
    std::unique_ptr<ScanState> scan_state;
  };

  std::unique_ptr<util::AsyncTaskGroup> scan_tasks = util::AsyncTaskGroup::Make(
      node->batches_throttle_.get(),
      StateHolder{list_and_scan_done, std::move(scan_state)});
  for (int i = 0; i < fragment_scanner->NumBatches(); i++) {
    node->num_batches_.fetch_add(1);
    scan_tasks->AddTask(std::make_unique<ScanBatchTask>(node, state_view, i));
  }
  // The "list fragments" task doesn't actually end until the fragments are
  // all scanned. This allows us to enforce fragment readahead.
  return list_and_scan_done;
}

Here scan_state is kept alive inside the StateHolder, which effectively acts as a destructor callback.

Node Implementations

Scan

The ScanNode takes a push strategy, with some backpressure possible. This code can be found in arrow/dataset/scanner.cc.

The rough logic: turn Parquet FileFragments into a Scanner, then into an AsyncRecordBatchGenerator that emits RecordBatches.
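
For reference, the consumer-facing version of this path looks roughly like the sketch below (assuming an existing std::shared_ptr<arrow::dataset::Dataset> dataset):

// Dataset -> ScannerBuilder -> Scanner; ScanBatchesAsync then yields an
// async generator of record batches, which is what the ScanNode pushes on.
ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
ARROW_ASSIGN_OR_RAISE(auto batch_gen, scanner->ScanBatchesAsync());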

Unordered Generator

Here each RecordBatch produced from the files is tagged with a sequence number, but no explicit ordering is enforced.

Ordered Generator

Built on top of the unordered generator: a batch generator that orders its output.

References

  1. https://substrait.io/
  2. https://dplyr.tidyverse.org/
  3. https://arrow.apache.org/docs/cpp/streaming_execution.html
  4. https://arrow.apache.org/docs/dev/cpp/streaming_execution.html