Parquet C++: from schema to schema

In this section we discuss a topic that is important yet seemingly trivial: Schema. It matters a great deal when handling types, since users have to pick types to build reasonable abstractions, and a few problems show up here:

  • (We only discuss Primitive Types here.) https://github.com/apache/parquet-format/blob/master/LogicalTypes.md Parquet offers only a handful of physical types, and LogicalType support is not complete either, so we face a type-mapping problem
  • Each type has its own Order, and the Order affects how the corresponding Statistics are generated
  • The user's types and Statistics are not necessarily the same as Parquet's internal Statistics, which is a bit counter-intuitive; the most typical case is count and null. On top of that there is a lot of Sort Order business to consider
  • Writing external data into Parquet also needs a reasonable Order Mapping and the corresponding type Mapping to make the write efficient

This whole pipeline is not complicated, but it is tedious. Still, if you don't understand this part, you are bound to run into problems.

Arrow Schema

The arrow schema lives in src/arrow/type.h. It is a very clean schema, represented in memory as a tree structure.

As for fieldId, the user has to encode it manually:

The parquet format supports an optional integer field_id which can be assigned to a field. Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field.
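
To make that concrete, here is a minimal sketch of the manual encoding from the user side, using only the public arrow APIs (the field name and id are made up):

#include "arrow/api.h"

// Tag an arrow field with a Parquet field id by attaching the
// "PARQUET:field_id" key to its KeyValueMetadata.
std::shared_ptr<::arrow::Field> MakeFieldWithId() {
  auto metadata = ::arrow::key_value_metadata({"PARQUET:field_id"}, {"42"});
  return ::arrow::field("user_id", ::arrow::int64(),
                        /*nullable=*/false, metadata);
}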

Parquet Schema

The parquet schema is a tree-shaped hierarchy in memory; when written to disk it is flattened in DFS order. The standard (see Part 1 of this series) describes this in some detail. A quick recap of the on-disk schema: https://blog.mwish.me/2022/09/18/Parquet-Part1-Basic/#%E7%B1%BB%E5%9E%8B%E7%B3%BB%E7%BB%9F%E5%92%8C%E9%80%BB%E8%BE%91%E7%B1%BB%E5%9E%8B

  1. SchemaElement: one node of the schema tree, carrying the corresponding types (Logical, Physical) and a few odd fields (FLBA length, decimal precision/scale)
  2. FileMetaData contains list<SchemaElement> schema

That is the storage side. The corresponding in-memory structures live in src/parquet/schema.h. A quick tour of the logic:

  • ColumnPath: a path inside the Schema. Implemented as a vector<string>; the path can be rendered in dot-path form (root.struct.field)
  • Node: the base class of a single node. Note that this Node is a Parquet Node, which is not exactly the same as the user's schema
    • A Node is either Primitive or Group; think of a Group as a node with children
    • It carries the RepetitionType, field_id, the (possibly present) parent node, this node's name, and the LogicalType (ConvertedType is also implemented here, ugh)
  • PrimitiveNode: carries the PhysicalType, and therefore also those odd fields from SchemaElement, plus the ColumnOrder
  • GroupNode: maps to the nested groups inside Parquet; it effectively carries only a LogicalType (only PrimitiveNode has a PhysicalType), and as we will see later, the field_id on synthesized group nodes is often meaningless (-1). A construction sketch follows this list.
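
As promised, a minimal construction sketch (a sketch only: it assumes the Make overloads in src/parquet/schema.h, and the names and field ids are invented):

#include "parquet/schema.h"

using parquet::Repetition;
using parquet::Type;
using parquet::LogicalType;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

// required group root { required int64 id = 1; optional binary name (String) = 2; }
NodePtr MakeExampleSchema() {
  NodePtr id = PrimitiveNode::Make("id", Repetition::REQUIRED, LogicalType::None(),
                                   Type::INT64, /*primitive_length=*/-1,
                                   /*field_id=*/1);
  NodePtr name = PrimitiveNode::Make("name", Repetition::OPTIONAL,
                                     LogicalType::String(), Type::BYTE_ARRAY,
                                     /*primitive_length=*/-1, /*field_id=*/2);
  return GroupNode::Make("root", Repetition::REQUIRED, {id, name});
}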

That was the Node level; next comes the Column (physical) level:

// The ColumnDescriptor encapsulates information necessary to interpret
// primitive column data in the context of a particular schema. We have to
// examine the node structure of a column's path to the root in the schema tree
// to be able to reassemble the nested structure from the repetition and
// definition levels.
class PARQUET_EXPORT ColumnDescriptor {
 public:
  ColumnDescriptor(schema::NodePtr node, int16_t max_definition_level,
                   int16_t max_repetition_level,
                   const SchemaDescriptor* schema_descr = NULLPTR);

 private:
  schema::NodePtr node_;
  const schema::PrimitiveNode* primitive_node_;

  int16_t max_definition_level_;
  int16_t max_repetition_level_;
};

And then there is the corresponding SchemaDescriptor, the overall Container for the Parquet Schema, i.e. the core of Parquet's internal schema. We need to know:

  1. The RootNode is always a struct (group), though in some sense its field-id doesn't really matter
  2. The fields of the RootNode determine what the hell this Schema looks like
  3. Column(idx) fetches the physical Column
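
A small usage sketch of point 3 (assuming the public SchemaDescriptor / ColumnDescriptor accessors; the function itself is made up):

#include <cstdio>

#include "parquet/schema.h"

// Walk all physical columns of a SchemaDescriptor.
void DumpColumns(const parquet::SchemaDescriptor& descr) {
  for (int i = 0; i < descr.num_columns(); ++i) {
    const parquet::ColumnDescriptor* col = descr.Column(i);
    std::printf("%s: max_def=%d max_rep=%d\n",
                col->path()->ToDotString().c_str(),
                col->max_definition_level(), col->max_repetition_level());
  }
}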

Alright, with that you can pull whatever you need out of the SchemaDescriptor. By the way, the SchemaDescriptor structure is simple and fun to read; I simply have to paste it:

class PARQUET_EXPORT SchemaDescriptor {
 public:
  // ... (constructors and accessors elided)

 private:
  friend class ColumnDescriptor;

  // Root Node
  schema::NodePtr schema_;
  // Root Node
  const schema::GroupNode* group_node_;

  void BuildTree(const schema::NodePtr& node, int16_t max_def_level,
                 int16_t max_rep_level, const schema::NodePtr& base);

  // Result of leaf node / tree analysis
  std::vector<ColumnDescriptor> leaves_;

  std::unordered_map<const schema::PrimitiveNode*, int> node_to_leaf_index_;

  // Mapping between leaf nodes and root group of leaf (first node
  // below the schema's root group)
  //
  // For example, the leaf `a.b.c.d` would have a link back to `a`
  //
  // -- a  <------
  // -- -- b     |
  // -- -- -- c  |
  // -- -- -- -- d
  std::unordered_map<int, schema::NodePtr> leaf_to_base_;

  // Mapping between ColumnPath DotString to the leaf index
  std::unordered_multimap<std::string, int> leaf_to_idx_;
};

Serialize: Converting to Thrift

The in-memory structure and the storage structure described above can of course be converted into each other, see:

void ToParquet(const GroupNode* schema, std::vector<format::SchemaElement>* out) {
  SchemaVisitor visitor(out);
  schema->VisitConst(&visitor);
}

This visits the tree recursively in strict DFS order, pushing back one SchemaElement at a time, which matches exactly the order of FileMetaData's list<SchemaElement>.
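
To make the DFS flattening concrete, here is a hypothetical schema and the list<SchemaElement> order it serializes to:

// required group root {            // schema[0]: "root", num_children = 2
//   required int32 a;              // schema[1]: "a"
//   optional group b {             // schema[2]: "b", num_children = 1
//     optional binary c (String);  // schema[3]: "c"
//   }
// }
//
// FileMetaData.schema = [root, a, b, c]; a reader rebuilds the tree by
// recursively consuming num_children elements.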

Arrow-Parquet Bridge

You might think that once the Parquet and Arrow schemas are covered, we are done. Not quite. The Parquet Schema is a tree-shaped format, and it may not be equivalent to the Arrow format. That sentence is a bit hard to parse, so let's look at an example directly:

// List encodings: using the terminology from Impala to define different styles
// of representing logical lists (a.k.a. ARRAY types) in Parquet schemas. Since
// the converted type named in the Parquet metadata is ConvertedType::LIST we
// use that terminology here. It also helps distinguish from the *_ARRAY
// primitive types.
//
// One-level encoding: Only allows required lists with required cells
//   repeated value_type name
//
// Two-level encoding: Enables optional lists with only required cells
//   <required/optional> group list
//     repeated value_type item
//
// Three-level encoding: Enables optional lists with optional cells
//   <required/optional> group bag
//     repeated group list
//       <required/optional> value_type item
//
// 2- and 1-level encoding are respectively equivalent to 3-level encoding with
// the non-repeated nodes set to required.
//
// The "official" encoding recommended in the Parquet spec is the 3-level, and
// we use that as the default when creating list types. For semantic completeness
// we allow the other two. Since all types of encodings will occur "in the
// wild" we need to be able to interpret the associated definition levels in
// the context of the actual encoding used in the file.
//
// NB: Some Parquet writers may not set ConvertedType::LIST on the repeated
// SchemaElement, which could make things challenging if we are trying to infer
// that a sequence of nodes semantically represents an array according to one
// of these encodings (versus a struct containing an array). We should refuse
// the temptation to guess, as they say.
struct ListEncoding {
  enum type { ONE_LEVEL, TWO_LEVEL, THREE_LEVEL };
};

The example above is not complicated (as long as you never have to touch this code, it is easy to understand). But it reveals a fact:

  1. The external Schema and the Parquet Schema do not necessarily map one-to-one
  2. Assigning a fieldId to a field may not be as simple as it looks

src/parquet/arrow/schema.h is exactly this Bridge between the arrow and parquet formats. It "conforms to the Parquet standard but is not part of the Parquet standard", and yet "you must have this bridge to build Parquet".

We can find a nice sample in the testing code. Before we jump into the details, let's take a quick look at it:

::arrow::Status RoundTripSchema(
    const std::vector<std::shared_ptr<Field>>& fields,
    std::shared_ptr<::parquet::ArrowWriterProperties> arrow_properties =
        ::parquet::default_arrow_writer_properties()) {
  arrow_schema_ = ::arrow::schema(fields);
  std::shared_ptr<::parquet::WriterProperties> properties =
      ::parquet::default_writer_properties();
  RETURN_NOT_OK(ToParquetSchema(arrow_schema_.get(), *properties.get(),
                                *arrow_properties, &parquet_schema_));
  ::parquet::schema::ToParquet(parquet_schema_->group_node(), &parquet_format_schema_);
  auto parquet_schema = ::parquet::schema::FromParquet(parquet_format_schema_);
  return FromParquetSchema(parquet_schema.get(), &result_schema_);
}

Field level: SchemaField

/// \brief Bridge between an arrow::Field and parquet column indices.
struct PARQUET_EXPORT SchemaField {
  std::shared_ptr<::arrow::Field> field;
  std::vector<SchemaField> children;

  // Only set for leaf nodes
  int column_index = -1;

  parquet::internal::LevelInfo level_info;

  bool is_leaf() const { return column_index != -1; }
};

Easy enough to understand. Two things to note:

  1. column_index marks the actual column of a leaf
  2. level_info marks the corresponding Parquet Levels

At the leaves you can find the parquet column_index. Note that this structure corresponds to the arrow Schema, but it is not one-to-one with the Parquet Schema!

How to make sense of that? Let's keep it as a teaser for now; you can read it as "the part of the Schema that exists in Parquet and also exists in arrow". The sketch below makes it concrete.
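
For instance, a sketch for a single arrow column l: list<int32> (3-level encoding assumed): the parquet side has three nodes but the SchemaField tree has only two entries, and only the leaf carries a column_index:

// parquet schema (3-level list, one leaf):   SchemaField tree (mirrors arrow):
//
//   optional group l (List) {                 SchemaField{field = "l: list<int32>"}
//     repeated group list {                     children[0] =
//       optional int32 element;                   SchemaField{field = "element: int32",
//     }                                                       column_index = 0}
//   }
//
// The intermediate repeated "list" group exists only in the parquet schema;
// it has no SchemaField of its own.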

SchemaManifest

SchemaManifest is the tool that bridges the Parquet schema and the arrow schema, serving both the write path and the read path. Honestly, though, this tool is not exactly "self-evident"; you have to read the code to figure out what it is doing. That code mainly lives in ArrowColumnWriterV2::Make. Part 2 of this blog series actually covered this flow, but focused more on how the internal writer emits def-levels and rep-levels; today's content is lighter, simply covering the mapping from a few kinds of Array to Parquet.

Following the pattern so far, we ought to introduce the SchemaManifest type itself, but let's start from the User level instead. First, look at this marvelous little recursive calc, computed over the Arrow type. I suspect any college freshman could write it…

int CalculateLeafCount(const DataType* type) {
  if (type->id() == ::arrow::Type::EXTENSION) {
    type = checked_cast<const ExtensionType&>(*type).storage_type().get();
  }
  // Note num_fields() can be 0 for an empty struct type
  if (!::arrow::is_nested(type->id())) {
    // Primitive type.
    return 1;
  }

  int num_leaves = 0;
  for (const auto& field : type->fields()) {
    num_leaves += CalculateLeafCount(field->type().get());
  }
  return num_leaves;
}
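
For example (hypothetical types): struct<a: int32, b: list<int64>> yields 2, one leaf for a and one for the list element, which matches the number of physical parquet columns the struct will occupy.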

Here, when the SchemaManifest is created, the code below builds the physical-column-to-index and node-to-parent mappings. The Writer then relies on these mappings to find all the Leaves and carry out the corresponding writes.

struct SchemaTreeContext {
  SchemaManifest* manifest;
  ArrowReaderProperties properties;
  const SchemaDescriptor* schema;

  void LinkParent(const SchemaField* child, const SchemaField* parent) {
    manifest->child_to_parent[child] = parent;
  }

  void RecordLeaf(const SchemaField* leaf) {
    manifest->column_index_to_field[leaf->column_index] = leaf;
  }
};

Note that there is still no FieldId concept here. So… where does it live? The answer is quite intuitive: ::arrow::Schema.

You must be swearing at me now: if ::arrow::Schema has it, why did I ramble on about all of the above? Well, not so fast: go flip through the arrow::Schema code and you definitely won't find a fieldId anywhere. How can that be? Remember what I pasted at the very beginning:

The parquet format supports an optional integer field_id which can be assigned to a field. Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field.

Under the arrow schema, there is a set of interfaces like this:

/// \defgroup arrow-to-parquet-schema-conversion Functions to convert an Arrow
/// schema into a Parquet schema.
///
/// @{

PARQUET_EXPORT
::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
                            const WriterProperties& properties,
                            const ArrowWriterProperties& arrow_properties,
                            schema::NodePtr* out);

PARQUET_EXPORT
::arrow::Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                                const WriterProperties& properties,
                                const ArrowWriterProperties& arrow_properties,
                                std::shared_ptr<SchemaDescriptor>* out);

PARQUET_EXPORT
::arrow::Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                                const WriterProperties& properties,
                                std::shared_ptr<SchemaDescriptor>* out);

/// @}

FieldToNode performs the conversion from an arrow Field (possibly with nested structure) to a Parquet Node. Here we care about two things:

  1. How the FieldId gets filled in
  2. Type conversion (especially for nested types).

The FieldId part sits right at the beginning:

Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
                   const WriterProperties& properties,
                   const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  std::shared_ptr<const LogicalType> logical_type = LogicalType::None();
  ParquetType::type type;
  Repetition::type repetition = RepetitionFromNullable(field->nullable());
  int field_id = FieldIdFromMetadata(field->metadata());
  // ...

This FieldIdFromMetadata is exactly what we mentioned at the start:

static constexpr char FIELD_ID_KEY[] = "PARQUET:field_id";

std::shared_ptr<::arrow::KeyValueMetadata> FieldIdMetadata(int field_id) {
  if (field_id >= 0) {
    return ::arrow::key_value_metadata({FIELD_ID_KEY}, {ToChars(field_id)});
  } else {
    return nullptr;
  }
}

int FieldIdFromMetadata(
    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
  if (!metadata) {
    return -1;
  }
  int key = metadata->FindKey(FIELD_ID_KEY);
  if (key < 0) {
    return -1;
  }
  std::string field_id_str = metadata->value(key);
  int field_id;
  if (::arrow::internal::ParseValue<::arrow::Int32Type>(
          field_id_str.c_str(), field_id_str.length(), &field_id)) {
    if (field_id < 0) {
      // Thrift should convert any negative value to null but normalize to -1
      // here in case we later check this in logic.
      return -1;
    }
    return field_id;
  } else {
    return -1;
  }
}

In other words, the field id is recorded on the arrow::Field's KeyValueMetadata, as extra KV attached on top of the Schema that annotates the field!
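
Putting the two ends together, a minimal round-trip sketch (it uses the exported FieldToNode declared above; the field name and id are invented, and error handling is reduced to asserts):

#include <cassert>

#include "arrow/api.h"
#include "parquet/arrow/schema.h"
#include "parquet/properties.h"

void RoundTripFieldId() {
  auto field = ::arrow::field(
      "user_id", ::arrow::int64(), /*nullable=*/false,
      ::arrow::key_value_metadata({"PARQUET:field_id"}, {"42"}));
  parquet::schema::NodePtr node;
  ::arrow::Status st = parquet::arrow::FieldToNode(
      field, *parquet::default_writer_properties(),
      *parquet::default_arrow_writer_properties(), &node);
  assert(st.ok());
  assert(node->field_id() == 42);  // the metadata key surfaced as field_id
}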

So how does SchemaManifest recover it… The answer has a trap in it too. You must think I'm going to say "recover it from the SchemaDescriptor"… not quite. Look at the code:

Status SchemaManifest::Make(const SchemaDescriptor* schema,
                            const std::shared_ptr<const KeyValueMetadata>& metadata,
                            const ArrowReaderProperties& properties,
                            SchemaManifest* manifest) {
  SchemaTreeContext ctx;
  ctx.manifest = manifest;
  ctx.properties = properties;
  ctx.schema = schema;
  const GroupNode& schema_node = *schema->group_node();
  manifest->descr = schema;
  manifest->schema_fields.resize(schema_node.field_count());

  // Try to deserialize original Arrow schema
  RETURN_NOT_OK(
      GetOriginSchema(metadata, &manifest->schema_metadata, &manifest->origin_schema));
  // Ignore original schema if it's not compatible with the Parquet schema
  if (manifest->origin_schema != nullptr &&
      manifest->origin_schema->num_fields() != schema_node.field_count()) {
    manifest->origin_schema = nullptr;
  }

  for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
    SchemaField* out_field = &manifest->schema_fields[i];
    RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), LevelInfo(), &ctx,
                                    /*parent=*/nullptr, out_field));

    // TODO(wesm): as follow up to ARROW-3246, we should really pass the origin
    // schema (if any) through all functions in the schema reconstruction, but
    // I'm being lazy and just setting dictionary fields at the top level for
    // now
    if (manifest->origin_schema == nullptr) {
      continue;
    }

    auto origin_field = manifest->origin_schema->field(i);
    RETURN_NOT_OK(ApplyOriginalMetadata(*origin_field, out_field));
  }
  return Status::OK();
}

Here it tries origin_schema: it pulls origin_schema, an arrow::Schema, out of the FileMetaData. It turns out this crappy arrow stashes an extra copy of its own Schema in the file. Surprise, surprise.

The data is encoded in the arrow::ipc format, forming an "encoded arrow::Schema" (it has an exposed-to-users-yet-not-really-exposed feel). The arrow schema key is then stripped from the key-value-metadata and the rest is copied, which yields origin_schema. Of course, if the corresponding key is absent, nothing happens here.
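
Concretely, this extra copy lives in the file-level key-value metadata under the key ARROW:schema, whose value is the ipc-serialized, base64-encoded arrow::Schema; GetOriginSchema looks that key up, decodes it into origin_schema, and hands back the remaining metadata with that key removed.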

You may wonder: what is this arrow::schema good for? With it around, does parquet's own Schema no longer count?

We need to introduce something ahead of time: the mapping between the arrow Schema and the Parquet Schema. Parquet itself came out of the Java community, while arrow's early authors were mostly the Python data-stack folks. These two type systems don't, and shouldn't be expected to, match each other exactly.
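
A few concrete examples of that mismatch, read off the restoration code below (a sketch; the list is not exhaustive):

// arrow type (origin)            stored in parquet as          inferred back (w/o origin)
// ------------------------------ ----------------------------- --------------------------
// large_utf8 / large_binary      BYTE_ARRAY (String / none)    utf8 / binary
// timestamp(us, "Asia/Tokyo")    INT64 (Timestamp, adj. UTC)   timestamp(us, "UTC")
// duration(ns)                   INT64                         int64
// dictionary<int32, utf8>        BYTE_ARRAY (String)           utf8
// decimal256(p <= 38, s)         Decimal(p, s)                 decimal128(p, s)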

For this part, let's read the code directly. It splits into two parts:

  • Handling nested types: a GetNestedFactory function is used here, rebuilding a nested type from the vector of child fields
  • Leaves: for example, Parquet only has String, while the original arrow type might have been LargeString; simple type fixes like that are applied here

Result<bool> ApplyOriginalStorageMetadata(const Field& origin_field,
                                          SchemaField* inferred) {
  bool modified = false;

  auto& origin_type = origin_field.type();
  auto& inferred_type = inferred->field->type();

  const int num_children = inferred_type->num_fields();

  if (num_children > 0 && origin_type->num_fields() == num_children) {
    DCHECK_EQ(static_cast<int>(inferred->children.size()), num_children);
    const auto factory = GetNestedFactory(*origin_type, *inferred_type);
    if (factory) {
      // The type may be modified (e.g. LargeList) while the children stay the same
      modified |= origin_type->id() != inferred_type->id();

      // Apply original metadata recursively to children
      for (int i = 0; i < inferred_type->num_fields(); ++i) {
        ARROW_ASSIGN_OR_RAISE(
            const bool child_modified,
            ApplyOriginalMetadata(*origin_type->field(i), &inferred->children[i]));
        modified |= child_modified;
      }
      if (modified) {
        // Recreate this field using the modified child fields
        ::arrow::FieldVector modified_children(inferred_type->num_fields());
        for (int i = 0; i < inferred_type->num_fields(); ++i) {
          modified_children[i] = inferred->children[i].field;
        }
        inferred->field =
            inferred->field->WithType(factory(std::move(modified_children)));
      }
    }
  }

  if (origin_type->id() == ::arrow::Type::TIMESTAMP &&
      inferred_type->id() == ::arrow::Type::TIMESTAMP) {
    // Restore time zone, if any
    const auto& ts_type = checked_cast<const ::arrow::TimestampType&>(*inferred_type);
    const auto& ts_origin_type =
        checked_cast<const ::arrow::TimestampType&>(*origin_type);

    // If the data is tz-aware, then set the original time zone, since Parquet
    // has no native storage for timezones
    if (ts_type.timezone() == "UTC" && !ts_origin_type.timezone().empty()) {
      if (ts_type.unit() == ts_origin_type.unit()) {
        inferred->field = inferred->field->WithType(origin_type);
      } else {
        auto ts_type_new = ::arrow::timestamp(ts_type.unit(), ts_origin_type.timezone());
        inferred->field = inferred->field->WithType(ts_type_new);
      }
    }
    modified = true;
  }

  if (origin_type->id() == ::arrow::Type::DURATION &&
      inferred_type->id() == ::arrow::Type::INT64) {
    // Read back int64 arrays as duration.
    inferred->field = inferred->field->WithType(origin_type);
    modified = true;
  }

  if (origin_type->id() == ::arrow::Type::DICTIONARY &&
      inferred_type->id() != ::arrow::Type::DICTIONARY &&
      IsDictionaryReadSupported(*inferred_type)) {
    // Direct dictionary reads are only supported for a couple primitive types,
    // so no need to recurse on value types.
    const auto& dict_origin_type =
        checked_cast<const ::arrow::DictionaryType&>(*origin_type);
    inferred->field = inferred->field->WithType(
        ::arrow::dictionary(::arrow::int32(), inferred_type, dict_origin_type.ordered()));
    modified = true;
  }

  if ((origin_type->id() == ::arrow::Type::LARGE_BINARY &&
       inferred_type->id() == ::arrow::Type::BINARY) ||
      (origin_type->id() == ::arrow::Type::LARGE_STRING &&
       inferred_type->id() == ::arrow::Type::STRING)) {
    // Read back binary-like arrays with the intended offset width.
    inferred->field = inferred->field->WithType(origin_type);
    modified = true;
  }

  if (origin_type->id() == ::arrow::Type::DECIMAL256 &&
      inferred_type->id() == ::arrow::Type::DECIMAL128) {
    inferred->field = inferred->field->WithType(origin_type);
    modified = true;
  }

  // Restore field metadata
  std::shared_ptr<const KeyValueMetadata> field_metadata = origin_field.metadata();
  if (field_metadata != nullptr) {
    if (inferred->field->metadata()) {
      // Prefer the metadata keys (like field_id) from the current metadata
      field_metadata = field_metadata->Merge(*inferred->field->metadata());
    }
    inferred->field = inferred->field->WithMetadata(field_metadata);
    modified = true;
  }

  return modified;
}

It is here that the fields carrying a field-id get produced.

Back to FieldToNode

Let's return to FieldToNode to wrap up this section. We said earlier that in FieldToNode, the number of arrow::Fields can be smaller than that of the Parquet Schema. So how is that mapping implemented? What fieldId do the missing fields get? The answer: a whole lot of nothing:

Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
                   const WriterProperties& properties,
                   const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  ...
    case ArrowTypeId::FIXED_SIZE_LIST:
    case ArrowTypeId::LARGE_LIST:
    case ArrowTypeId::LIST: {
      auto list_type = std::static_pointer_cast<::arrow::BaseListType>(field->type());
      return ListToNode(list_type, name, field->nullable(), field_id, properties,
                        arrow_properties, out);
    }
  ...
}

Now switch the view to ListToNode:

Status ListToNode(const std::shared_ptr<::arrow::BaseListType>& type,
                  const std::string& name, bool nullable, int field_id,
                  const WriterProperties& properties,
                  const ArrowWriterProperties& arrow_properties, NodePtr* out) {
  NodePtr element;
  std::string value_name =
      arrow_properties.compliant_nested_types() ? "element" : type->value_field()->name();
  RETURN_NOT_OK(FieldToNode(value_name, type->value_field(), properties,
                            arrow_properties, &element));

  NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
  *out = GroupNode::Make(name, RepetitionFromNullable(nullable), {list},
                         LogicalType::List(), field_id);
  return Status::OK();
}

A "list" group node without a fieldId is inserted here, so its fieldId ends up as -1. Heh. A sketch of the resulting nodes follows.
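
So for an arrow field l: list<int32> whose metadata carries PARQUET:field_id = 5, the resulting parquet nodes look roughly like this:

// optional group l (List) = 5 {   // field_id from the arrow field's metadata
//   repeated group list {         // synthesized by ListToNode, field_id = -1
//     optional int32 element;     // field_id from the value field's metadata, if any
//   }
// }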

Parsing nested structures for compatibility

This part again lives in the arrow conversion:

Status NodeToSchemaField(const Node& node, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  // Workhorse function for converting a Parquet schema node to an Arrow
  // type. Handles different conventions for nested data.

  ctx->LinkParent(out, parent);

  // Now, walk the schema and create a ColumnDescriptor for each leaf node
  if (node.is_group()) {
    // A nested field, but we don't know what kind yet
    return GroupToSchemaField(static_cast<const GroupNode&>(node), current_levels, ctx,
                              parent, out);
  } else {
    // Either a normal flat primitive type, or a list type encoded with 1-level
    // list encoding. Note that the 3-level encoding is the form recommended by
    // the parquet specification, but technically we can have either
    //
    // required/optional $TYPE $FIELD_NAME
    //
    // or
    //
    // repeated $TYPE $FIELD_NAME
    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
                    GetTypeForNode(column_index, primitive_node, ctx));
    if (node.is_repeated()) {
      // One-level list encoding, e.g.
      // a: repeated int32;
      int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
      out->children.resize(1);
      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, current_levels, ctx, out,
                                 &out->children[0]));

      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                  /*nullable=*/false, FieldIdMetadata(node.field_id()));
      out->level_info = current_levels;
      // At this point current_levels has considered this list to be the
      // ancestor, so restore the actual ancestor.
      out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
      return Status::OK();
    } else {
      current_levels.Increment(node);
      // A normal (required/optional) primitive node
      return PopulateLeaf(column_index,
                          ::arrow::field(node.name(), type, node.is_optional(),
                                         FieldIdMetadata(node.field_id())),
                          current_levels, ctx, parent, out);
    }
  }
}

From here, group nodes go on to GroupToSchemaField (and to ListToSchemaField for lists), which parse the corresponding logic according to the node; a sketch of the one-level case from the code above follows.
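
As a quick illustration of the one-level branch in the code above:

// One-level list encoding in, arrow list out:
//
//   repeated int32 a;          // single parquet leaf, no wrapping groups
//
// is inferred as the arrow field
//
//   a: list<a: int32 not null> not null
//
// i.e. both the list and its element are non-nullable, and the element field
// reuses the leaf's name.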

Closing words

Today's write-up is a bit messy. This material is not as logically deep as btr, yet it is still full of complexity: the Schema translation layer between two systems is complex in its own right.

When we examine this machinery, we realize that it's not just arrow: every system that uses Parquet runs into these problems, so this code is really "general-purpose code". And there are plenty of compatibility issues on top. I also hope this article helps readers see through the boasting of some cobbled-together systems.

Once again, one can only marvel at the unbounded complexity of real-world engineering.