Parquet Part2: arrow Parquet and levels

arrow 代码库包含了 Parquet 的 C++ 官方实现,尽管 impala 之类的第三方实现处理的功能要多一些,但是 back to basics,看看 parquet 这个也不是一件坏事。

需要声明的是,如 Velox 所说,使用 Parquet 来访问 IO 本身也需要一些复杂的项目。这些部分可能包括:

  1. 谓词下推
  2. IO 访问
  3. Lazy Decoding
  4. 全局字典

具体可以参考 abadi 的论文,其实没什么难的,工程上基本上也没啥花头,嗯写就是了。这些可能之后介绍的时候会讲,先就跳过吧。我们直接进入正题:Parquet 的 arrow 官方实现。

这一版本实现有一些须知:

  1. 数据来源是 arrow (通常是 arrow::Array),schema 也来自 arrow。
  2. parquet 官方的 thrift 在:https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift 。arrow 包装了一层访问层,位于 src/parquet/metadata.h。内部实现放在 parquet/thrift_internal.h
  3. 对文件的访问包含在了 src/parquet/platform.h,其中,::arrow::io::InputStream 是对输入流的抽象、::arrow::io::RandomAccessFile 是对输入文件的抽象。关于这些,相关的内容实现在 arrow/io 下,部分实现者比如 mmap 文件、hdfs 文件。而 arrow 还有一套文件系统的抽象,比如 arrow/filesystem,实现了 s3local 的文件系统抽象(不知道为啥 HadoopFileSystem 在目前版本没有放到 filesystem 下)。
  4. 输出被包装在 ::arrow::io::BufferOutputStream 中,这个也支持 mmap, s3, hdfs 等
  5. 加密、默认值、配置值相关的配置在 properties.h 里面

上面都是配置的逻辑。arrow 的代码实现大量采用了 pimpl 的逻辑,有一堆:

1
2
3
4
5
class XXX {
class Contents {...};
private:
std::unique_ptr<Contents> contents_;
};

上面这样的 sb 逻辑

Arrow cheatsheet

虽然我们主题不是介绍 Arrow,不过可以简单介绍一下 arrow 的类型系统,提供一些简单的 cheatsheet。具体对 arrow 介绍估计也不多。

Array

Array 是其中最基本的类型,大概是「列式数组」的概念,支持 null、嵌套、变长、数组格式等。另外,可以从代码层面注意一下,这里的字符串 Buffer 可能并不需要 owned,但是需要是连续的,几个部分需要在同一个 buffer 里,之间没有 padding。数据因为需要 $O(1)$ 找到位置,所以是 null 的地方会是一个 undefined 的值,但是要占着内存。

下面的内容有两个来源:

  1. 官方的 format,很快可以读完,大概30分钟:https://arrow.apache.org/docs/format/Columnar.html#fixed-size-list-layout
  2. In-Memory Analytics with Apache Arrow 第一章,这里的图片非常好。

D113AFDFF7CD13071A3A407B9CED5CC3

Build Blocks:

  1. Array lengths: 标准建议使用 64 bytes 的 i64,但是 32 bytes (i32) 也符合标准。文章提到如果元素特别多,建议拆分成多个 i32 可以表示的数组
  2. Buffer
    1. 官方建议按照 64 bytes 来对齐 Buffer,这方便使用 SIMD 和开启一些 SIMD 上的优化。目前 AVX512 寄存器宽度是 512 bits。
  3. null count / Validity bitmaps: null count 表示元素是 null 的个数,是 0 的时候可以不需要 validity bitmaps。如果非 0,可能有一个 64 bytes padding 的 bitmap。bitmap 按照 LSB 定义,大于 64B 的内容

具体例子可以看看下图:

83EE405A-49D0-4A4A-89E7-9BB222B2505E

3139E767-4789-4593-B147-B2618FE224F7

需要特别注意的是一些有「嵌套」性质的结构,举例子:List<T>, List<List<T>>Struct

嵌套逻辑中,我们需要小小的关注一下「父级是 null」是怎么处理的,在 Array 中,这个靠父级别 null 来解决。在 struct 中,这个靠父级别 null — 子级别 null 来区分。可以看到,这个比 dremel 那种还是简单了很多的。

C865616E-E6CF-47FD-8440-A0F5E5EF8F47

0A39F9FA-54F7-4E5A-8FBC-FAD7AB5FE42B

额外需要提的是字典,它们会有指向字典的逻辑:

49EB35E6-7864-49CA-B433-FB985F598909

Rep Level and Def Levels

理论讲了还是没意思,看代码吧

代码入口:

这是一个 Level Builder,外层的使用入口在:

  1. ArrowColumnWriterV2::Make,创建对应的 MultipathLevelBuilder. ArrowColumnWriterV2 是某个具体的列的 writer。这个地方,Writer 会分 chunk 拿到不同的 Array,构建对应的 MultipathLevelBuilder
  2. MultipathLevelBuilder 写入会返回一个 MultipathLevelBuilderResult,上层根据这个拿到 rep-leveldef-level,然后写入

下面简单看一下这些接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/// \brief Result for a single leaf array when running the builder on the
/// its root.
struct MultipathLevelBuilderResult {
/// \brief The Array containing only the values to write (after all nesting has
/// been processed.
///
/// No additional processing is done on this array (it is copied as is when
/// visited via a DFS).
///
/// leaf_array 是写入页面的整个 array, 即整列的数据
std::shared_ptr<::arrow::Array> leaf_array;

/// \brief Might be null.
const int16_t* def_levels = nullptr;

/// \brief Might be null.
const int16_t* rep_levels = nullptr;

/// \brief Number of items (int16_t) contained in def/rep_levels when present.
/// def-level 和 rep-level 长度是一样的, 所以需要一个 level 来提示
int64_t def_rep_level_count = 0;

/// \brief Contains element ranges of the required visiting on the
/// descendants of the final list ancestor for any leaf node.
///
/// The algorithm will attempt to consolidate visited ranges into
/// the smallest number possible.
///
/// This data is necessary to pass along because after producing
/// def-rep levels for each leaf array it is impossible to determine
/// which values have to be sent to parquet when a null list value
/// in a nullable ListArray is non-empty.
///
/// This allows for the parquet writing to determine which values ultimately
/// needs to be written.
///
/// 写入的内容在 `leaf_array` 中的范围
std::vector<ElementRange> post_list_visited_elements;

/// Whether the leaf array is nullable.
///
/// leaf 是否是 nullable 的
bool leaf_is_nullable;
};

然后用 Write 接口去走 callback,来往 writer 写入数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
int64_t size) override {
if (arrow_properties_->engine_version() == ArrowWriterProperties::V2 ||
arrow_properties_->engine_version() == ArrowWriterProperties::V1) {
ARROW_ASSIGN_OR_RAISE(
std::unique_ptr<ArrowColumnWriterV2> writer,
ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_,
row_group_writer_));
return writer->Write(&column_write_context_);
}
return Status::NotImplemented("Unknown engine version.");
}

// Writes out all leaf parquet columns to the RowGroupWriter that this
// object was constructed with. Each leaf column is written fully before
// the next column is written (i.e. no buffering is assumed).
//
// Columns are written in DFS order.
Status Write(ArrowWriteContext* ctx) {
for (int leaf_idx = 0; leaf_idx < leaf_count_; leaf_idx++) {
ColumnWriter* column_writer;
PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
for (auto& level_builder : level_builders_) {
RETURN_NOT_OK(level_builder->Write(
leaf_idx, ctx, [&](const MultipathLevelBuilderResult& result) {
size_t visited_component_size = result.post_list_visited_elements.size();
DCHECK_GT(visited_component_size, 0);
if (visited_component_size != 1) {
return Status::NotImplemented(
"Lists with non-zero length null components are not supported");
}
const ElementRange& range = result.post_list_visited_elements[0];
std::shared_ptr<Array> values_array =
result.leaf_array->Slice(range.start, range.Size());

return column_writer->WriteArrow(result.def_levels, result.rep_levels,
result.def_rep_level_count, *values_array,
ctx, result.leaf_is_nullable);
}));
}

PARQUET_CATCH_NOT_OK(column_writer->Close());
}
return Status::OK();
}

我们可以看到,这里走了 MultipathLevelBuilder::Write,然后里面走了回调,回调函数的参数就是之前提到的 MultipathLevelBuilderResult。这个函数给没有 rep-level 和 def-level 的输入加上了这两个 level,之后 ColumnWriter 就可以拿到 def-level, rep-level 和值去写了。那么,这里的关键应该就是 rep-level 和 def-level 是怎么构建的了。

这部分关键的内容在 parquet/arrow/path_internal.cc 下面。我们简单介绍一下对应的算法:

  1. 输入是对应的 arrow Array 和相关的 schema
  2. dfs 的方式推断出,「这个地方最大的 def-level 是什么,rep-level 是什么」
  3. 拿到用户的输入 array,进行计算

在步骤 (2),arrow 会根据输入数据构建出树,有两种节点:

  1. ListNode / StructNode / MapNode: 有子节点且非终止的节点
  2. TerminalNode: 终止的节点。这里包括 AllNullsTerminalNode(所有成员都是 null)、AllPresentTerminalNode(全都有,且父级别没有 null 的节点),NullableTerminalNode 可能有 null 但不去为 null

一般来说,可以理解 TerminalNode 为叶子结点,而其它节点为非叶子结点,但是也有一些特殊情况,比如父级就发现全部是 null 了,这里也可能是个 TerminalNode

每个 Node 有一个 Run 方法,作为 (3) 需要的函数,但是参数并不相同,Node 之间也不是继承关系,而是组合关系,由 variant 包装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Contains static information derived from traversing the schema.
struct PathInfo {
// The vectors are expected to the same length info.

// Note index order matters here.
using Node =
std::variant<NullableTerminalNode, ListNode, LargeListNode, FixedSizeListNode,
NullableNode, AllPresentTerminalNode, AllNullsTerminalNode>;

std::vector<Node> path;
std::shared_ptr<Array> primitive_array;
int16_t max_def_level = 0;
int16_t max_rep_level = 0;
bool has_dictionary = false;
bool leaf_is_nullable = false;
};

我们可以看看上面的 PathInfo,这里的目标就是构建一个这样的 PathInfo.

PathBuilder: 构建 Node 阶段

1
2
3
4
5
6
7
8
9
// static
::arrow::Result<std::unique_ptr<MultipathLevelBuilder>> MultipathLevelBuilder::Make(
const ::arrow::Array& array, bool array_field_nullable) {
auto constructor = std::make_unique<PathBuilder>(array_field_nullable);
// 递归分发给 PathBuilder 访问 arrow array, 捞到对应的 schema. 这里 leaf count 可能不止 1.
RETURN_NOT_OK(VisitArrayInline(array, constructor.get()));
return std::make_unique<MultipathLevelBuilderImpl>(array.data(),
std::move(constructor));
}

这里可以看到,这里创建了一个 PathBuilder,然后访问了 VisitArrayInline,这里相当于自己定义了一个访问 array 的方法

PathBuilder 提供了 Visit(不同的 Array) 的方法,能够完成动态分发。不过进入具体访问方式前,先来看看它的成员:

1
2
3
4
5
6
7
8
class PathBuilder {
public:
explicit PathBuilder(bool start_nullable) : nullable_in_parent_(start_nullable) {}
private:
PathInfo info_;
std::vector<PathInfo> paths_;
bool nullable_in_parent_;
};

nullable_in_parent_ 这个是表示,任何父亲的这个字段是否可能是 null。在代码里,丢给 PathBuilder 这玩意是这样的:

1
2
3
4
bool nullable_root = HasNullableRoot(schema_manifest, schema_field);
if (leaf_offset == 0) {
is_nullable = nullable_root;
}

这个看上去是 HasNullableRoot,但是丢给 path 的一般都是 root 的直接儿子,所以这里表示的就是 parent 的这个字段可能不可能是 null。

处理父级别 null

有一个很常见的函数是 MaybeAddNullable,我们来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 添加对应的 def-level,
void MaybeAddNullable(const Array& array) {
if (!nullable_in_parent_) {
return;
}
info_.max_def_level++;
// We don't use null_count() because if the null_count isn't known
// and the array does in fact contain nulls, we will end up
// traversing the null bitmap twice (once here and once when calculating
// rep/def levels). Because this isn't terminal this might not be
// the right decision for structs that share the same nullable
// parents.
if (LazyNoNulls(array)) {
// Don't add anything because there won't be any point checking
// null values for the array. There will always be at least
// one more array to handle nullability.
return;
}
// 反正都没了,不如 terminal 了
if (LazyNullCount(array) == array.length()) {
info_.path.emplace_back(AllNullsTerminalNode(info_.max_def_level - 1));
return;
}
info_.path.emplace_back(
NullableNode(array.null_bitmap_data(), array.offset(),
/* def_level_if_null = */ info_.max_def_level - 1));
}

这里如果是父级 null,就会增加 max_def_level,然后添加对应的节点项。

访问 struct

1
2
3
4
5
6
7
8
9
10
Status Visit(const ::arrow::StructArray& array) {
MaybeAddNullable(array);
PathInfo info_backup = info_;
for (int x = 0; x < array.num_fields(); x++) {
nullable_in_parent_ = array.type()->field(x)->nullable();
RETURN_NOT_OK(VisitInline(*array.field(x)));
info_ = info_backup;
}
return Status::OK();
}

这里代码应该非常好理解,就是递归访问。

访问 ListArray

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template <typename T>
::arrow::enable_if_t<std::is_same<::arrow::ListArray, T>::value ||
std::is_same<::arrow::LargeListArray, T>::value,
Status>
Visit(const T& array) {
// 处理父级别带来的 def, 和自身场景
// 对于 parquet 的 list, 这里相当于两组标记:
// List {
// repeatable member
// }
// 这两层都是 rep/def 哦~
MaybeAddNullable(array);

// Increment necessary due to empty lists.
info_.max_def_level++;
info_.max_rep_level++;
// raw_value_offsets() accounts for any slice offset.
ListPathNode<VarRangeSelector<typename T::offset_type>> node(
VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
info_.max_rep_level, info_.max_def_level - 1);
info_.path.emplace_back(std::move(node));
nullable_in_parent_ = array.list_type()->value_field()->nullable();
return VisitInline(*array.values());
}

这里 max_rep_levelmax_def_level - 1 得回顾一下上一篇博客了:

1
2
3
4
5
6
7
8
9
template <typename RangeSelector>
class ListPathNode {
public:
ListPathNode(RangeSelector selector, int16_t rep_lev, int16_t def_level_if_empty)
: selector_(std::move(selector)),
prev_rep_level_(rep_lev - 1),
rep_level_(rep_lev),
def_level_if_empty_(def_level_if_empty) {}
};

VarRangeSelector 则持有了 array 元素的 offsets,还记得前面的 ListArray 的图吗?

访问叶子结点

1
2
3
4
5
6
7
8
//! 是平坦的 array, 可以当作找到了某个叶子.
template <typename T>
::arrow::enable_if_t<std::is_base_of<::arrow::FlatArray, T>::value, Status> Visit(
const T& array) {
// 在叶子层面加上 terminal info.
AddTerminalInfo(array);
return Status::OK();
}

这里的重点是 AddTerminalInfo,添加了对应的 TerminalNode:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename T>
void AddTerminalInfo(const T& array) {
// 这里, leaf_is_nullable 经由 parent 实现, 表示「父级别的这个字段可能是 nullable」的。
info_.leaf_is_nullable = nullable_in_parent_;
if (nullable_in_parent_) {
info_.max_def_level++; // (加上)这一层的 def-level
}

// 注意: rep-level 在底层是不会处理的,都是上层处理的。

// We don't use null_count() because if the null_count isn't known
// and the array does in fact contain nulls, we will end up
// traversing the null bitmap twice (once here and once when calculating
// rep/def levels).
//
// 处理: 没有 null, 都是 null, 可以为 null 的 case. 把这个加入 info_.path 中.
if (LazyNoNulls(array)) {
info_.path.emplace_back(AllPresentTerminalNode{info_.max_def_level});
} else if (LazyNullCount(array) == array.length()) {
info_.path.emplace_back(AllNullsTerminalNode(info_.max_def_level - 1));
} else {
info_.path.emplace_back(NullableTerminalNode(array.null_bitmap_data(),
array.offset(), info_.max_def_level));
}
info_.primitive_array = std::make_shared<T>(array.data());
// 真正处理这个 dfs path, 把 fixup 的结果加入 paths. 这里面会处理 rep-level.
paths_.push_back(Fixup(info_));
}

这个 Fixup 是处理 rep-level 的函数,我们再看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// FIXUP 会处理 rep-level.
PathInfo Fixup(PathInfo info) {
// We only need to fixup the path if there were repeated
// elements on it.
if (info.max_rep_level == 0) {
return info;
}
FixupVisitor visitor;
visitor.max_rep_level = info.max_rep_level;
if (visitor.max_rep_level > 0) {
visitor.rep_level_if_null = 0;
}
// 从 0-最后的顺序访问, 变更 visitor, 这个地方会把最上层的 rep-level 带下来.
for (size_t x = 0; x < info.path.size(); x++) {
std::visit(visitor, info.path[x]);
}
return info;
}

struct FixupVisitor {
int max_rep_level = -1;
int16_t rep_level_if_null = kLevelNotSet; // (第一个) 为 null 的时候, 需要处理 rep-level 的.

// 遇到了中间的会找到 rep-level
template <typename T>
void HandleListNode(T& arg) {
if (arg.rep_level() == max_rep_level) {
arg.SetLast();
// after the last list node we don't need to fill
// rep levels on null.
rep_level_if_null = kLevelNotSet;
} else {
rep_level_if_null = arg.rep_level();
}
}
void operator()(ListNode& node) { HandleListNode(node); }
void operator()(LargeListNode& node) { HandleListNode(node); }
void operator()(FixedSizeListNode& node) { HandleListNode(node); }

// For non-list intermediate nodes.
template <typename T>
void HandleIntermediateNode(T& arg) {
if (rep_level_if_null != kLevelNotSet) {
// 只有 AllNullsTerminalNode 和 NullableTerminalNode 可能有这个
arg.SetRepLevelIfNull(rep_level_if_null); // 如果这层是空的时候,rep-level 是什么,就是找到一个父级的 rep-level
}
}

void operator()(NullableNode& arg) { HandleIntermediateNode(arg); }

void operator()(AllNullsTerminalNode& arg) {
// Even though no processing happens past this point we
// still need to adjust it if a list occurred after an
// all null array.
HandleIntermediateNode(arg);
}

void operator()(NullableTerminalNode&) {}
void operator()(AllPresentTerminalNode&) {}
};

这个 visitor 会把 rep-level-if-null 给带下去。

Write 阶段

Write 阶段核心函数在 WritePath 上:

1
2
3
4
5
6
7
::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
CallbackFunction write_leaf_callback) override {
DCHECK_GE(leaf_index, 0);
DCHECK_LT(leaf_index, GetLeafCount());
return WritePath(root_range_, &path_builder_->paths()[leaf_index], context,
std::move(write_leaf_callback));
}

WritePath 比较复杂,我们看之前先介绍一下算法:

WritePath 的算法

现在我们有了一组 node 栈,然后我们假设一个场景:List<List<int>>[[1, 2, null], null, [], [null, 1, 2]]。这个本身不难,但是这个 def 和 rep 都是会变动的. 这个在 arrow 中大概会被实现成:

1
2
3
ListArray: [[]], valid bitmap, offsets
ListArray: [], valid bitmap, offsets
IntArray: buffer, valid bitmap, offsets

这里会构造一个 range 栈,大概是这样的:

  1. 拿到根结点
  2. Nullable -> List -> Nullable -> List -> Int ,写入 1,2,null 和对应的 rep-level , def-level
  3. 回溯到上层,发现成员是 null,写入一个对应的 rep-level , def-level
  4. 发现栈有元素,到下层继续写入

具体实现

这里需要介绍 Iterator 类型,每一层会有一个 Run,拿到对应的范围。有可能:

  1. kDone 做完了,应该向上层走
  2. kNext 应该走进下一层
1
2
3
4
5
6
7
8
9
10
/// \brief Simple result of a iterating over a column to determine values.
enum IterationResult {
/// Processing is done at this node. Move back up the path
/// to continue processing.
kDone = -1,
/// Move down towards the leaf for processing.
kNext = 1,
/// An error occurred while processing.
kError = 2
};

我们以 nullable 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
IterationResult Run(ElementRange* range, ElementRange* child_range,
PathWriteContext* context) {
if (new_range_) {
// Reset the reader each time we are starting fresh on a range.
// We can't rely on continuity because nulls above can
// cause discontinuities.
valid_bits_reader_ = MakeReader(*range);
}
child_range->start = range->start;
::arrow::internal::BitRun run = valid_bits_reader_.NextRun();
if (!run.set) {
range->start += run.length;
RETURN_IF_ERROR(FillRepLevels(run.length, rep_level_if_null_, context));
RETURN_IF_ERROR(context->AppendDefLevels(run.length, def_level_if_null_));
run = valid_bits_reader_.NextRun();
}
if (range->Empty()) {
new_range_ = true;
return kDone;
}
child_range->end = child_range->start = range->start;
child_range->end += run.length;

DCHECK(!child_range->Empty());
range->start += child_range->Size();
new_range_ = false;
return kNext;
}

这里会按情况添加 level,然后返回上一层或者暗示后面还有,走向下一层,这些 node 都是 有状态的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/// Contains logic for writing a single leaf node to parquet.
/// This tracks the path from root to leaf.
///
/// |writer| will be called after all of the definition/repetition
/// values have been calculated for root_range with the calculated
/// values. It is intended to abstract the complexity of writing
/// the levels and values to parquet.
Status WritePath(ElementRange root_range, PathInfo* path_info,
ArrowWriteContext* arrow_context,
MultipathLevelBuilder::CallbackFunction writer) {
// 栈表示为 range 栈
std::vector<ElementRange> stack(path_info->path.size());
MultipathLevelBuilderResult builder_result;
builder_result.leaf_array = path_info->primitive_array;
builder_result.leaf_is_nullable = path_info->leaf_is_nullable;

// 不需要处理递归、特殊场景,嗯写就是了
if (path_info->max_def_level == 0) {
// This case only occurs when there are no nullable or repeated
// columns in the path from the root to leaf.
int64_t leaf_length = builder_result.leaf_array->length();
builder_result.def_rep_level_count = leaf_length;
builder_result.post_list_visited_elements.push_back({0, leaf_length});
return writer(builder_result);
}
// root_range 为行的 offset, 是最外层的 range
stack[0] = root_range;
RETURN_NOT_OK(
arrow_context->def_levels_buffer->Resize(/*new_size=*/0, /*shrink_to_fit*/ false));
PathWriteContext context(arrow_context->memory_pool, arrow_context->def_levels_buffer);
// We should need at least this many entries so reserve the space ahead of time.
RETURN_NOT_OK(context.def_levels.Reserve(root_range.Size()));
if (path_info->max_rep_level > 0) {
RETURN_NOT_OK(context.rep_levels.Reserve(root_range.Size()));
}

auto stack_base = &stack[0];
auto stack_position = stack_base;
// This is the main loop for calculated rep/def levels. The nodes
// in the path implement a chain-of-responsibility like pattern
// where each node can add some number of repetition/definition
// levels to PathWriteContext and also delegate to the next node
// in the path to add values. The values are added through each Run(...)
// call and the choice to delegate to the next node (or return to the
// previous node) is communicated by the return value of Run(...).
// The loop terminates after the first node indicates all values in
// |root_range| are processed.
while (stack_position >= stack_base) {
PathInfo::Node& node = path_info->path[stack_position - stack_base];
struct {
IterationResult operator()(NullableNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(ListNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(NullableTerminalNode& node) {
return node.Run(*stack_position, context);
}
IterationResult operator()(FixedSizeListNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(AllPresentTerminalNode& node) {
return node.Run(*stack_position, context);
}
IterationResult operator()(AllNullsTerminalNode& node) {
return node.Run(*stack_position, context);
}
IterationResult operator()(LargeListNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
ElementRange* stack_position;
PathWriteContext* context;
} visitor = {stack_position, &context};

IterationResult result = std::visit(visitor, node);

if (ARROW_PREDICT_FALSE(result == kError)) {
DCHECK(!context.last_status.ok());
return context.last_status;
}
stack_position += static_cast<int>(result);
}
RETURN_NOT_OK(context.last_status);
builder_result.def_rep_level_count = context.def_levels.length();

if (context.rep_levels.length() > 0) {
// This case only occurs when there was a repeated element that needs to be
// processed.
builder_result.rep_levels = context.rep_levels.data();
std::swap(builder_result.post_list_visited_elements, context.visited_elements);
// If it is possible when processing lists that all lists where empty. In this
// case no elements would have been added to post_list_visited_elements. By
// added an empty element we avoid special casing in downstream consumers.
if (builder_result.post_list_visited_elements.empty()) {
builder_result.post_list_visited_elements.push_back({0, 0});
}
} else {
builder_result.post_list_visited_elements.push_back(
{0, builder_result.leaf_array->length()});
builder_result.rep_levels = nullptr;
}

builder_result.def_levels = context.def_levels.data();
return writer(builder_result);
}

Samples for write

总觉得我写的还是比较抽象的,所以这里尝试举几个例子,来简单介绍一下,因为之前写入的过于抽象了。

我们回顾一下:

  1. PathBuilder 会根据用户的 Schema 来构建 node,这些 node 有不同的 max-rep-level 和 max-def-level,也有空缺的时候应该写入的 def-level。这里会构成一个栈
  2. 传递数据下来的时候,连续的数据会找到对应的栈,写对应的 def-level 和 rep-level

我们下面列举的例子都是写入 1024 行数据的,我们会从简单到难来介绍写入 case 的例子

Case 1: i32

我们假设有连续的 i32, 它不是 nullable 的,这个时候,arrow 对应的结构是 Fixed 的 int32 数组,没有任何 valid bitmap。

这个时候,PathBuilder在构建阶段 会构建一个下面的节点:

流程图 (1)

在构建阶段,程序会找到这个 node,然后设置迭代的 range = [0, 1024),然后遍历这个栈…

不,其实它不会遍历这个栈!还记得之前吗?如果字段是 optional,那么 max-dep + 1,如果是 repeatable, 那么 max-dep + 1, max-rep + 1。也就是说,程序如果检查到 max-dep == 0,那么说明对象都是一对一且无 null 的,这个地方就直接写入所有值了!

Case 2: optional i32

我们假设有连续的 optional i32,这个时候,arrow 对应的结构是 Fixed 的 int32 数组,没有任何 valid bitmap。

这个时候,PathBuilder在构建阶段 会构建一个下面的节点:

流程图 (2)

在构建阶段,程序会找到这个 node,然后设置迭代的 range = [0, 1024),然后遍历这个栈,把 [0, 1024) 传给 node,这里访问 validation bitset,如果有元素,就在 def-levels 里面加入 1,否则加入 0。栈只有一层,一次解决 0-1024 所有元素。这里不存在任何 rep-level。

特殊情况下,如果发现数据全是 Null,会构建一个特殊的节点:

parquet-c2-1

这里会写入全是 0 的 def-level,然后不写入任何 rep.

Case 3: array<i32, 4>

我们假设 list 不是 nullable 的,这个时候会怎么写入呢?我们假设可以有这样的成员:

1
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]

这个对应 arrow 中的 FixedSizeListArray,对应内容如下:

parquet-c3-0

这是一个最简化的模型了,实际上可能会复杂很多,我们先不管。这里输入是 range = [0, 3),然后数组对应是 Int32Array = [0, 1, 2, 3, 4, 5, ... 11]

  1. 第一层栈对应范围是 [0, 3)
  2. 第一层栈找到第一个 非空 的,是第 0 个,这个时候,如果是第一次访问(开一个新列表),很显然,rep-level 会来自自己的父级。第一个元素写入 prev-rep-level,即 0.
    1. 因为这个节点是 last_list_node,所以它会向后找到第一个非 empty 的 node,会在 FillForLast 函数找到所有对应的内容,把 rep-level 填充完毕,每遇到非空的内容,都给四个填充 [0, 1, 1, 1] 等。为什么是 [0, 1, 1, 1] 呢?第一个元素需要填充 prev-rep-level,其次要填充 rep-level
  3. 完毕后,这里要把 [0, 3) 转化为下层需要的范围,FixSizeListNode 对应每个子元素 size 都是 4,所以这里会拿到 [0, 12) ,作为对应的 range
  4. 栈开始处理 AllPresentTerminalNode,丢给下面的范围是 [0, 12),这里都不是 null 的,就会写入具体的内容,写完具体 12 个值和对应的 def-level,即 12个 1

我们回过头看下 (2),这里:

  1. 一旦新开了一个 list,一定写一个自己的 rep-level
  2. 如果 list 是最后一个 list ( 最后可能带来 rep-level 的地方),它会负责写所有儿子的 rep-level

还有,非空 和 非 null 是不同的概念,null 会有别的地方来处理,我们之后会介绍。

Case 3-1: array<optional i32, 4>

我们假设可以有这样的成员:

1
[[0, null, 2, 3], [4, null, 6, 7], [8, null, 10, 11]]

这下我们需要改变我们的树了!

parquet-c3-1

这里 1-3 步和之前一样,4会根据节点写出自己对应的 def-level 是 1 还是 2. 类似 case 2 的场景

Case: List<int32>

我们前面介绍了 array<T, 4>,我在画图的时候,把 node 当作 Fixed Sized 的 List,那么实际的 List 或者 repeated 会是什么样的呢?

这里我们可以看看对应的类型系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// List nodes handle populating rep_level for Arrow Lists and def-level for empty lists.
// Nullability (both list and children) is handled by other Nodes. By
// construction all list nodes will be intermediate nodes (they will always be followed by
// at least one other node).
//
// Type parameters:
// |RangeSelector| - A strategy for determine the the range of the child node to
// process.
// this varies depending on the type of list (int32_t* offsets, int64_t* offsets of
// fixed.
template <typename RangeSelector>
class ListPathNode;

using ListNode = ListPathNode<VarRangeSelector<int32_t>>;
using LargeListNode = ListPathNode<VarRangeSelector<int64_t>>;
using FixedSizeListNode = ListPathNode<FixedSizedRangeSelector>;

这部分实现实际上和 Parquet 关系不大,Parquet 只会知道你是 repeated,这部分是 arrow 的逻辑,回顾一下上面的图:

  1. Fixed Sized 会在元信息里面存放自己的 size
  2. 非 fix sized 会有个 offset 数组,用这个 offset 数组来推断自己的 size

那么,我们回顾 3 的 case,看看 List 会怎么样:

1
[[0, null, 2], [], [8, null, 10, 11]]

这里会捞到 [0, null, 2],写入 rep-level [0, 1, 1]

遇到 [] 的时候,它会跳过这个,然后写入一个 rep-level 和空缺的 def-level

[8, null, 10, 11] 会写入 rep-level [0, 1, 1, 1]

Case 4: optional array<optional i32, 4>

我们假设可以有这样的成员:

1
[[0, null, 2, 3], null, [8, null, 10, 11]]

这下我们又需要改变我们的树了!

parquet-c4-0

这里,NullableNode 会读取对应的 validation bitset, 然后返回给下层:

  1. 根节点 range 是 [0, 3)
  2. 第一次发现不是 null 的,连续的返回 [0, 1) 范围给下层,下层和之前的逻辑一样处理
  3. 第二次发现 null,这里需要填充 rep-level 和 def-level,分别填充两个 0 上去
  4. 发现为 null,再次令其填充