Parquet Part3: read rep and def levels

之前的 Part2 介绍了 Parquet 的 rep-level, def-level 的写入. Parquet 会根据用户输入的 Array,来根据 schema 的嵌套和中间的元数据,给一个 column 的数据构建对应的栈。在运行的时候,这里会找到连续的字段,然后给定对应的 rep-level 和 def-level 来填充数据。

在写入的时候,用户会给定一个 Array,然后让 Parquet 把 array 中的数据序列化成 rep-level, def-level 和值,这些东西会被编码到 Page 中,以 DATA_PAGE_V2 为例:

  1. 会有个 PageHeader,告诉你有没有 rep-level 和 def-level,和对应长度
  2. rep-level 和 def-level 被 rle/bit-packing 编码
  3. Page 可能被编码与压缩,会记录相关的内容

读取的时候,这些东西也会按照 schema 解析,但是需要回过头来注意一下,读取的时候也是一个层次读取,但是需要从一组 rep-level / def-level 来恢复多层的信息了。

我们可以再回顾一下,parquet 在 arrow 项目的路径是:cpp/src/parquet/:

  1. cpp/src/parquet/:目录下有基本的 parquet 的 schema 和实现,包括 Page 的处理、压缩、column 的处理,这里都是符合标准的,它会借用部分 arrow 的库和实现
  2. cpp/src/parquet/arrowarrow 的转义层,能够把 arrow 的东西解析读、写到 parquet 上,这里的 FileReader 等都会包装 cpp/src/parquet 上的数据,然后处理 rep-leveldef-level上一篇博客里面 levels 的构建就是在 arrow 下面的

Arrow 读取的对应链路大概如下:

  1. ParquetFileReader 是最外层的数据,用 ParquetFileReader::Open 等相关的 api 来打开对应的文件。这里可以根据 RowGroup(rgId) 这个函数,创建对应的 RowGroupReader
  2. SerializedFile 实现了 ParquetFileReader::Contents,具体实现了 ParquetFileReader 的对应逻辑,它可以加载 parquet 的 footer,然后解析 Footer。根据解析的 Footer 来加载对应的 RowGroupReader
    1. 在文件 IO 上,这里有一个 ::arrow::io::internal::ReadRangeCache 表示对某个范围的 caching。然后有一组和这个有关的 api 表示是否缓存
    2. ReaderProperties 表示这个 Reader 的配置,比如是否使用 caching 等
    3. FileMetaData 的 API,注意,这个相当于对 parquet 依赖的 thrift 生成的代码的 wrapper。Parquet 不对外暴露 thrift,而是暴露对应的自己的 API Wrapper,怎么说呢,可以说这个抽象比较好,但是性能也确实比较挫(考虑极端场景,只需要一个 count,也得把 Footer 整个解析)
  3. RowGroupReader可以返回 parquet::ColumnReader 和某个 Column 的 PageReader
  4. SerializedRowGroup 实现了 RowGroupReader::Contents,持有某个 RowGroup 的 MetaData
  5. PageReaderSerializedPageReader 这个实现,对外返回对应的解压之后的 Page 对象,这里可以是数据页、字典页面、Index Page。
  6. parquet::ColumnReaderPageReader 上包装了一层记录读取的 API,根据 ReadBatchReadBatchSpaced 之类的 api 来读取对应的数据,然后返回给上层。这里也会读取 Page 上的 Level,来恢复对应的记录
  7. parquet::arrow 模块会调用上文对应的东西,它有一个 ColumnReader,注意这个 ColumnReader 不要和上面 ColumnReader 搞混了,这是用来恢复 Array

读取: Parquet 中的链路

这块读取比写入简单很多,大概是:

  1. 每一列读 rep-level, def-level 自己就能恢复自己这层的数据
  2. 上层找到 children 的 level,然后可以恢复

所有的恢复都经由 ColumnReader,叶子结点恢复实现在 TypedRecordReader 中,LeafReader 创建 TypeRecordReader 来读取。而上层在 ListReaderStructReader 中恢复,恢复的时候需要从子节点中挑选一个 TypedRecordReader 来恢复一些信息. ColumnReader 这个 API 只有叶子结点有。

ColumnReader 的 API 非常简单,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class PARQUET_EXPORT ColumnReader {
public:
virtual ~ColumnReader() = default;

static std::shared_ptr<ColumnReader> Make(
const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

// Returns true if there are still values in this column.
virtual bool HasNext() = 0;

virtual Type::type type() const = 0;

virtual const ColumnDescriptor* descr() const = 0;

// Get the encoding that can be exposed by this reader. If it returns
// dictionary encoding, then ReadBatchWithDictionary can be used to read data.
//
// \note API EXPERIMENTAL
virtual ExposedEncoding GetExposedEncoding() = 0;

protected:
friend class RowGroupReader;
// Set the encoding that can be exposed by this reader.
//
// \note API EXPERIMENTAL
virtual void SetExposedEncoding(ExposedEncoding encoding) = 0;
};

这里有一些别的结构,我们先来整理一下继承关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/// 提供了 ReadBatch, ReadBatchSpaced 和 Skip, 便于用户读取某一列的具体数据.
/// 这个也是一个接口层, 相对 `ColumnReader` 提供了通用的接口, 这里提供了 Batch 读某个类型的接口.
template <typename DType>
class TypedColumnReader : public ColumnReader {};

/// 实现了从某个 decoder 来读 values, 读 rep-level 和 def-levels.
///
/// 这里也有一些列维护的信息, 比如对应的 decoder, 还有一些静态信息, 比如 Column 对应的最大的 def-level 和
/// rep-level, 记做 descr 和 level info.
///
/// 这里是 Column Reader, 所以处理返回对应的是 ColumnChunk, 这里会维护需要处理的 value 数量
/// (ColumnMetaData 会记录 `num_values`, 代表值而非行)
///
/// 这里也处理了 levels (rep-level 和 def-level) 的 decoder, 因为这两个和某个具体的类型是无关的.
template <typename DType>
class ColumnReaderImplBase {
public:
using T = typename DType::c_type;
using DecoderType = TypedDecoder<DType>;
};

/// 具体的 reader 实现, 实现了 ReadBatch, ReadBatchSpaced 和 Skip.
template <typename DType>
class TypedColumnReaderImpl : public TypedColumnReader<DType>,
public ColumnReaderImplBase<DType> {
public:
using T = typename DType::c_type;
};

也就是说,具体实现会在 TypedColumnReaderTypedColumnReaderImpl 中。对外接口有 Skip, ReadBatch,有一个比较复杂的 ReadBatchSpaced 的接口已经 deprecated 了。

我们专注在 ReadBatch 上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// API to read values from a single column. This is a main client facing API.
template <typename DType>
class TypedColumnReader : public ColumnReader {
public:
typedef typename DType::c_type T;

// Read a batch of repetition levels, definition levels, and values from the
// column.
//
// Since null values are not stored in the values, the number of values read
// may be less than the number of repetition and definition levels. With
// nested data this is almost certainly true.
//
// Set def_levels or rep_levels to nullptr if you want to skip reading them.
// This is only safe if you know through some other source that there are no
// undefined values.
//
// To fully exhaust a row group, you must read batches until the number of
// values read reaches the number of stored values according to the metadata.
//
// This API is the same for both V1 and V2 of the DataPage
//
// @returns: actual number of levels read (see values_read for number of values read)
virtual int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
T* values, int64_t* values_read) = 0;
// ...
};

这里,会根据 batch_size 来填充数据,这里分为:

  1. def_levelsrep_levels,两个 i16 数组,代表对应的 levels,可能没有读,这两个是等长的(不等长会抛出 exception)
  2. values,具体的值
  3. values_read, 代表具体从存储读了多少非 null 值。尽量从 rep_levels 和 def_levels 来判断这个值

看看实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
int16_t* rep_levels, T* values,
int64_t* values_read) {
// HasNext invokes ReadNewPage
if (!HasNext()) {
*values_read = 0;
return 0;
}

// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
int64_t num_def_levels = 0;
int64_t values_to_read = 0;
// 读 levels, 然后也判断出哪些是 null, 如果有 null 或者里面消费完了, 具体要读的值可能会小于 `batch_size`.
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read);

// 读取非 null 的 value, 在没有读完这个 Page 的时候, values_read == values.
*values_read = this->ReadValues(values_to_read, values);
int64_t total_values = std::max(num_def_levels, *values_read);
int64_t expected_values =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
if (total_values == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_values);

return total_values;
}

这里会:

  1. 根据 batch_size 和内部的状态,看看读多少值,然后解析对应的 levels。( ReadLevels )
  2. 读取对应的非 null 的 values (ReadValues, 里面会调用 decoder->Decode)
  3. 返回的是读的所有包括 null 的值的数目,values_read 会存放 values 中内容的长度 (ConsumeBufferedValues )

这里我们重申一个比较 hack 的地方,就是变量名称问题,这个也涉及到一些 Decoder 内部的逻辑:

  1. num_buffered_values_: Page 中的 values 的水位
  2. num_decoded_values_: decode 成功吐出去的 values 的水位

你会发现我这里的用词是 Levels 而不是 Values…我们看看之前博客里的 num_values 的逻辑( https://blog.mwish.me/2022/09/18/Parquet-Part1-Basic/ ):

DataPageV2 会有 num_rows,表示对应行的数量。在有嵌套的情况下,这个不等于 num_values.

这里代表的是「包括 nulls,这个 ColumnChunk 对应值的数量」而非「这个 ColumnChunk 对应行的数量」。这个地方我说的可能有一点难理解,比如对于 optional i32 的平坦数据,这里等价于行数,而对于 repeated <optional i32>,这里代表内层的数量。PageHeadernum_values 也同理。

这里会根据 page 来设置对应的值,同时跟 HasNextInternal() 这个接口联合使用,来不停 fetch page。(values, rows/record, levels 这些概念比较混乱,注意区分)。

这个地方解析了对应的东西,然后返回了对应的数值。当然,这里逻辑比较简单。

这个地方可以总结性质的放一张流程:

1
2
3
类型组合:
- ColumnReader(ColumnReaderImpl, TypedColumnReader): 包含数据 decoder, rep-level 和 def-level 的 Decoder, page reader
- RecordReader(注册数据的 buffer, rep-level buffer, def-level buffer, page buffer)

TypedRecordReader: 组织成输出行而不是输出 Value

TypedRecordReader 提供了一组复杂一点的函数:ReadRecordsReadRecordData,这个就相当于「读上层的几行」了。这个是给 parquet::arrow 准备的结构。

这套相当于把 Values 的读取交给 ColumnReaderImpl 层,然后在上面封装了 Row 的 Read。它相当于一个特种类型的记录组织器,能表示具体处理的「行」,而 ColumnReader 处理的内容是 “values”.

这部分相当于

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
namespace internal {

/// \brief Stateful column reader that delimits semantic records for both flat
/// and nested columns
///
/// \note API EXPERIMENTAL
/// \since 1.3.0
class RecordReader {
public:
static std::shared_ptr<RecordReader> Make(
const ColumnDescriptor* descr, LevelInfo leaf_info,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
const bool read_dictionary = false);

virtual ~RecordReader() = default;

/// \brief Attempt to read indicated number of records from column chunk
/// \return number of records read
virtual int64_t ReadRecords(int64_t num_records) = 0;

/// \brief Pre-allocate space for data. Results in better flat read performance
virtual void Reserve(int64_t num_values) = 0;

/// \brief Clear consumed values and repetition/definition levels as the
/// result of calling ReadRecords
virtual void Reset() = 0;

/// \brief Transfer filled values buffer to caller. A new one will be
/// allocated in subsequent ReadRecords calls
virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0;

/// \brief Transfer filled validity bitmap buffer to caller. A new one will
/// be allocated in subsequent ReadRecords calls
virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0;

/// \brief Return true if the record reader has more internal data yet to
/// process
virtual bool HasMoreData() const = 0;

/// \brief Advance record reader to the next row group
/// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
virtual void SetPageReader(std::unique_ptr<PageReader> reader) = 0;

virtual void DebugPrintState() = 0;

/// \brief Decoded definition levels
int16_t* def_levels() const {
return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
}

/// \brief Decoded repetition levels
int16_t* rep_levels() const {
return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
}

/// \brief Decoded values, including nulls, if any
uint8_t* values() const { return values_->mutable_data(); }

/// \brief Number of values written including nulls (if any)
int64_t values_written() const { return values_written_; }

/// \brief Number of definition / repetition levels (from those that have
/// been decoded) that have been consumed inside the reader.
int64_t levels_position() const { return levels_position_; }

/// \brief Number of definition / repetition levels that have been written
/// internally in the reader
int64_t levels_written() const { return levels_written_; }

/// \brief Number of nulls in the leaf
int64_t null_count() const { return null_count_; }

/// \brief True if the leaf values are nullable
bool nullable_values() const { return nullable_values_; }

/// \brief True if reading directly as Arrow dictionary-encoded
bool read_dictionary() const { return read_dictionary_; }

protected:
bool nullable_values_;

// 内部状态, 表示现在的记录是否是在某行的行首, 用来推进 records_read
bool at_record_start_;
// 读的行数的计数
int64_t records_read_;

// values 状态的记录器.
// 这部分也需要处理
int64_t values_written_;
int64_t values_capacity_;
int64_t null_count_;

// 下面这里提供了一个 buffer

// 解析出的 levels
int64_t levels_written_;
// 行 Parsing 完的 records
int64_t levels_position_;
int64_t levels_capacity_;

// 变长记录的 value buffer
std::shared_ptr<::arrow::ResizableBuffer> values_;

// In the case of false, don't allocate the values buffer (when we directly read into
// builder classes).
bool uses_values_;

std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
std::shared_ptr<::arrow::ResizableBuffer> def_levels_;
std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;

bool read_dictionary_ = false;
};

可以看到,这里存放了各种内容,能切换 Page 然后解析道本地的 values_ 上,是一个有状态的读者。

TypedRecordReader::ReadRecord 读出具体的数据,这个地方会根据两个 level 选择从底层读多少行。最重要的函数在 ReadRecordData,这个会把对应的逻辑编组成 array,然后设置 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Return number of logical records read
//
// 读到 level 之后, 来读具体数据惹
int64_t ReadRecordData(int64_t num_records) {
// Conservative upper bound
const int64_t possible_num_values =
std::max(num_records, levels_written_ - levels_position_);
ReserveValues(possible_num_values);

const int64_t start_levels_position = levels_position_;

int64_t values_to_read = 0;
int64_t records_read = 0;
if (this->max_rep_level_ > 0) {
// 有重复或者空的,走 delimit
records_read = DelimitRecords(num_records, &values_to_read);
} else if (this->max_def_level_ > 0) {
// 没有重复,只有可空。那么 written - position 就是数目.

// No repetition levels, skip delimiting logic. Each level represents a
// null or not null entry
records_read = std::min(levels_written_ - levels_position_, num_records);

// This is advanced by DelimitRecords, which we skipped
levels_position_ += records_read;
} else {
records_read = values_to_read = num_records;
}

// 具体读值的逻辑
int64_t null_count = 0;
if (leaf_info_.HasNullableValues()) {
ValidityBitmapInputOutput validity_io;
validity_io.values_read_upper_bound = levels_position_ - start_levels_position;
validity_io.valid_bits = valid_bits_->mutable_data();
validity_io.valid_bits_offset = values_written_;

// def level 拆分
DefLevelsToBitmap(def_levels() + start_levels_position,
/* level 的数量 */ levels_position_ - start_levels_position, leaf_info_,
&validity_io);
values_to_read = validity_io.values_read - validity_io.null_count;
null_count = validity_io.null_count;
DCHECK_GE(values_to_read, 0);
ReadValuesSpaced(validity_io.values_read, null_count);
} else {
DCHECK_GE(values_to_read, 0);
ReadValuesDense(values_to_read);
}
if (this->leaf_info_.def_level > 0) {
// Optional, repeated, or some mix thereof
this->ConsumeBufferedValues(levels_position_ - start_levels_position);
} else {
// Flat, non-repeated
this->ConsumeBufferedValues(values_to_read);
}
// Total values, including null spaces, if any
values_written_ += values_to_read + null_count;
null_count_ += null_count;

return records_read;
}

这里还是比较复杂的,本身是个多继承,TypedRecordReader 继承了 ColumnReaderImplRecordReader,这两个都是有状态的,状态又贼多,我觉得设计的让我看了头痛。但上面这段代码其实挺好懂的:

计算 records_read (读的行) 和 values_to_read 需要读的 rep-leveldef-level 与值

  1. max_def_level_max_rep_level_ 都等于 0 的时候,schema 是平坦的,且没有值为 null,所以读的 行和记录数量都是固定的,也没有 rep-level 和 def-level
  2. max_def_level_ != 0max_rep_level_ == 0 的时候,schema 是平坦的,但是有的值可能为 null,这里读取 num-record 条 def level 就行了, 可以解析这些行解析出 null,但不用处理重复的 rep/def.
  3. 否则,走 DelimitRecords,去解析行的数量. 这段应该是整个 Parquet 读流程最 hack 的一部分代码了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

// Process written repetition/definition levels to reach the end of
// records. Process no more levels than necessary to delimit the indicated
// number of logical records. Updates internal state of RecordReader
//
// 这个函数只会发生在 rep-level 和 def-level 都在的情况下. 这里会根据内部状态 `at_record_start_`
// 判断是否在一个行的开头. 目标是不越过 `levels_written_` 的情况下读 `num_records` 行.
// 返回读的行数和 values
//
// \return Number of records delimited
int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
int64_t values_to_read = 0;
int64_t records_read = 0;

const int16_t* def_levels = this->def_levels() + levels_position_;
const int16_t* rep_levels = this->rep_levels() + levels_position_;

DCHECK_GT(this->max_rep_level_, 0);

// Count logical records and number of values to read
// 读 level_position 来判断内容
// 这里的逻辑会先读 rep_level, 来查一下行的变更, rep_level == 0 一定切了行, 读完了就会返回.
// 然后会检查 `def_level`, 看这个是否是 null. 不是 null 就会添加 `values_to_read`.
// **非 0 的 rep-level 不会被这一层处理**
while (levels_position_ < levels_written_) {
const int16_t rep_level = *rep_levels++;
// rep_level == 0 的时候, 一定是整行的开头.
if (rep_level == 0) {
// If at_record_start_ is true, we are seeing the start of a record
// for the second time, such as after repeated calls to
// DelimitRecords. In this case we must continue until we find
// another record start or exhausting the ColumnChunk
//
// 上一次走完了这行, 就会 `!at_record_start_`.
if (!at_record_start_) {
// We've reached the end of a record; increment the record count.
++records_read;
if (records_read == num_records) {
// We've found the number of records we were looking for. Set
// at_record_start_ to true and break
at_record_start_ = true;
break;
}
}
}
// We have decided to consume the level at this position; therefore we
// must advance until we find another record boundary
at_record_start_ = false;

const int16_t def_level = *def_levels++;
if (def_level == this->max_def_level_) {
++values_to_read;
}
++levels_position_;
}
*values_seen = values_to_read;
return records_read;
}

读完之后,这里会具体把值读到 buffer 中。如果叶子结点非 null,那就直接 ReadValuesDense 了,符合逻辑,否则的话,这里会:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 具体读值的逻辑
int64_t null_count = 0;
if (leaf_info_.HasNullableValues()) {
ValidityBitmapInputOutput validity_io;
validity_io.values_read_upper_bound = levels_position_ - start_levels_position;
validity_io.valid_bits = valid_bits_->mutable_data(); // 指向 `valid_bits_` 内部数组.
validity_io.valid_bits_offset = values_written_;

// def level 拆分
DefLevelsToBitmap(def_levels() + start_levels_position,
/* level 的数量 */ levels_position_ - start_levels_position, leaf_info_,
&validity_io);
values_to_read = validity_io.values_read - validity_io.null_count;
null_count = validity_io.null_count;
DCHECK_GE(values_to_read, 0);
ReadValuesSpaced(validity_io.values_read, null_count);
}

这里的逻辑感觉写的…有点怪,它会根据 values_read_upper_boundvalid_bits_offset 来填充 valid_bitsnull_countvalues_read,这里应该相当于一个内部检查,毕竟 values_read 前面只有 rep_level == 0, def_level != 0 的时候没算过😅。这里会给出 Level,把对应的 count 用 simd 解析完,然后算到 validity_io 里面,然后用 ReadValuesSpaced 读取对应的数据。这一块状态传递非常乱,建议不改这块代码意会一下就好了。这块作者主要是 Micah Cornfield 和 Wesm,代码性能确实不错,但是写的好暴力啊。

总之,这里会拿到行数、null_countnull-bitmap,然后完成读取。

这里还有个比较 hack 的地方,就是 levels_position_ 的处理,这段代码比较 hacking,它会:

  1. 在底层的时候,就知道有多少行数据,record reader 解析出一个 record 数目
  2. 上层根据 Def-Rep 解析出 List / Map 的 size 和 nullable,恢复出上层的数据

所以,解析的时候,有2个 levels:

  1. levels position: 目前读的行的 levels
  2. levels written: 从 page 中捞出来的 levels

parquet/arrow: 从 arrow 恢复 Array 数据

上面我们拿到了 api 和从 Column 中读数据的逻辑,下面这里需要从 Parquet 拿到的连续数据中恢复 arrow 的数据。对应的实现在 parquet/arrow/reader 中。再次提醒一下,这里 namespaceparquet::arrow,负责 arrow 一些复杂格式和 parquet 互转。

ColumnReader

头文件中,有一个 ColumnReader,作为读取接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// At this point, the column reader is a stream iterator. It only knows how to
// read the next batch of values for a particular column from the file until it
// runs out.
//
// We also do not expose any internal Parquet details, such as row groups. This
// might change in the future.
class PARQUET_EXPORT ColumnReader {
public:
virtual ~ColumnReader() = default;

// Scan the next array of the indicated size. The actual size of the
// returned array may be less than the passed size depending how much data is
// available in the file.
//
// When all the data in the file has been exhausted, the result is set to
// nullptr.
//
// Returns Status::OK on a successful read, including if you have exhausted
// the data available in the file.
virtual ::arrow::Status NextBatch(int64_t batch_size,
std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
};

class ColumnReaderImpl : public ColumnReader {
public:
virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
virtual const std::shared_ptr<Field> field() = 0;

::arrow::Status NextBatch(int64_t batch_size,
std::shared_ptr<::arrow::ChunkedArray>* out) final {
RETURN_NOT_OK(LoadBatch(batch_size));
RETURN_NOT_OK(BuildArray(batch_size, out));
for (int x = 0; x < (*out)->num_chunks(); x++) {
RETURN_NOT_OK((*out)->chunk(x)->Validate());
}
return Status::OK();
}

virtual ::arrow::Status LoadBatch(int64_t num_records) = 0;

virtual ::arrow::Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
virtual bool IsOrHasRepeatedChild() const = 0;
};

这里内容分为两部分:

  1. LoadBatch: 利用之前讲的 RecordReader 来把一个 Batch 的行加载到 Values 的 Buffer 中

  2. BuildArray: 构建内存 array,可能会从子节点构建父节点

这里其实还可能要转型,为什么要转型呢?parquet 基础类型其实没几个,比方说 int8,它也会用更大的 int 去压缩。所以这里要转型到内存的格式中

LeafReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Leaf reader is for primitive arrays and primitive children of nested arrays
class LeafReader : public ColumnReaderImpl {
public:
Status GetDefLevels(const int16_t** data, int64_t* length) final {
*data = record_reader_->def_levels();
*length = record_reader_->levels_position();
return Status::OK();
}

Status GetRepLevels(const int16_t** data, int64_t* length) final {
*data = record_reader_->rep_levels();
*length = record_reader_->levels_position();
return Status::OK();
}

bool IsOrHasRepeatedChild() const final { return false; }

Status LoadBatch(int64_t records_to_read) final;

::arrow::Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<::arrow::ChunkedArray>* out) final {
*out = out_;
return Status::OK();
}
private:
std::shared_ptr<ChunkedArray> out_;
void NextRowGroup() {
std::unique_ptr<PageReader> page_reader = input_->NextChunk();
record_reader_->SetPageReader(std::move(page_reader));
}

std::shared_ptr<ReaderContext> ctx_;
std::shared_ptr<Field> field_;
std::unique_ptr<FileColumnIterator> input_;
const ColumnDescriptor* descr_;
std::shared_ptr<RecordReader> record_reader_;
};
};

这里我把 LoadBatch 实现摘出来了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Status LoadBatch(int64_t records_to_read) final {
BEGIN_PARQUET_CATCH_EXCEPTIONS
out_ = nullptr;
record_reader_->Reset();
// Pre-allocation gives much better performance for flat columns
record_reader_->Reserve(records_to_read);
while (records_to_read > 0) {
if (!record_reader_->HasMoreData()) {
break;
}
int64_t records_read = record_reader_->ReadRecords(records_to_read);
records_to_read -= records_read;
if (records_read == 0) {
NextRowGroup();
}
}
RETURN_NOT_OK(
TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
  1. 这里会从 record_reader_ 里面所要数据,然后切 Page 或者 row_group,直到读完或者满足 batch
  2. 利用 TransferColumnData 做转型,然后 BuildArray 没有任何开销

它难一点的内部实现都在 RecordReader 了,哎,是真的又恶心又难呢…

List / Struct

这个会找到一个靠谱的 child,然后从这里面拉 rep-level 和 def-level,来恢复对应的记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
public:
explicit StructReader(std::shared_ptr<ReaderContext> ctx,
std::shared_ptr<Field> filtered_field,
::parquet::internal::LevelInfo level_info,
std::vector<std::unique_ptr<ColumnReaderImpl>> children)
: ctx_(std::move(ctx)),
filtered_field_(std::move(filtered_field)),
level_info_(level_info),
children_(std::move(children)) {
// 找到靠谱的记录,来设置一个 `def_rep_level_child_`, 即有 level 的 child.
...
}

Status LoadBatch(int64_t records_to_read) override {
// 递归让子节点 build batch
for (const std::unique_ptr<ColumnReaderImpl>& reader : children_) {
RETURN_NOT_OK(reader->LoadBatch(records_to_read));
}
return Status::OK();
}
Status BuildArray(int64_t length_upper_bound,
std::shared_ptr<ChunkedArray>* out) override;
Status GetDefLevels(const int16_t** data, int64_t* length) override;
Status GetRepLevels(const int16_t** data, int64_t* length) override;
const std::shared_ptr<Field> field() override { return filtered_field_; }

private:
const std::shared_ptr<ReaderContext> ctx_;
const std::shared_ptr<Field> filtered_field_;
const ::parquet::internal::LevelInfo level_info_;
const std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
ColumnReaderImpl* def_rep_level_child_ = nullptr;
bool has_repeated_child_;
};

那么,我们看 LoadBatch 就 Load 了所有子节点,所以这里重点在 BuildArray 上,这里用到了我们之前看到的 ValidityBitmapInputOutput (我之前吐槽过的,在 RecordReader 那块代码那)。这里搞的到一个叶子结点的 rep-level 和 def-level,然后根据这一个叶子结点的 levels 就能恢复所有的信息,这里有一些解析函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 只考虑 def-level, 用于叶子结点.
void DefLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
LevelInfo level_info, ValidityBitmapInputOutput* output) {
// It is simpler to rely on rep_level here until PARQUET-1899 is done and the code
// is deleted in a follow-up release.
if (level_info.rep_level > 0) {
#if defined(ARROW_HAVE_RUNTIME_BMI2)
if (CpuInfo::GetInstance()->HasEfficientBmi2()) {
return DefLevelsToBitmapBmi2WithRepeatedParent(def_levels, num_def_levels,
level_info, output);
}
#endif
standard::DefLevelsToBitmapSimd</*has_repeated_parent=*/true>(
def_levels, num_def_levels, level_info, output);
} else {
standard::DefLevelsToBitmapSimd</*has_repeated_parent=*/false>(
def_levels, num_def_levels, level_info, output);
}
}

void DefRepLevelsToList(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_def_levels, LevelInfo level_info,
ValidityBitmapInputOutput* output, int32_t* offsets) {
DefRepLevelsToListInfo<int32_t>(def_levels, rep_levels, num_def_levels, level_info,
output, offsets);
}

void DefRepLevelsToList(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_def_levels, LevelInfo level_info,
ValidityBitmapInputOutput* output, int64_t* offsets) {
DefRepLevelsToListInfo<int64_t>(def_levels, rep_levels, num_def_levels, level_info,
output, offsets);
}

// 根据 def-rep 来算, 拿到叶子结点的 level 推断父亲的列表.
void DefRepLevelsToBitmap(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_def_levels, LevelInfo level_info,
ValidityBitmapInputOutput* output) {
// DefReplevelsToListInfo assumes it for the actual list method and this
// method is for parent structs, so we need to bump def and ref level.
level_info.rep_level += 1;
level_info.def_level += 1;
DefRepLevelsToListInfo<int32_t>(def_levels, rep_levels, num_def_levels, level_info,
output, /*offsets=*/nullptr);
}

offsets 参数是给 List 这样内容准备的。不考虑 offset 情况下,struct 对应逻辑:

  1. rep_levels 比自身大的时候,说明是子节点在重复,skip
  2. 如果 rep-level 等于自身,说明在自己这层重复(对结构体是不可能的,所以不考虑)
  3. 如果 rep-level 小自身,说明父级重复。这里可以根据 def-level 来判断自己是否是 null

我们简单看看,下面的代码删掉了 offsets 和 Struct 不会 touch 的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
template <typename OffsetType>
void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_def_levels, LevelInfo level_info,
ValidityBitmapInputOutput* output, OffsetType* offsets) {
OffsetType* orig_pos = offsets;
optional<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
if (output->valid_bits) {
valid_bits_writer.emplace(output->valid_bits, output->valid_bits_offset,
output->values_read_upper_bound);
}
for (int x = 0; x < num_def_levels; x++) {
// Skip items that belong to empty or null ancestor lists and further nested lists.
if (def_levels[x] < level_info.repeated_ancestor_def_level ||
rep_levels[x] > level_info.rep_level) {
continue;
}

if (ARROW_PREDICT_FALSE(
(valid_bits_writer.has_value() &&
valid_bits_writer->position() >= output->values_read_upper_bound) ||
(offsets - orig_pos) >= output->values_read_upper_bound)) {
std::stringstream ss;
ss << "Definition levels exceeded upper bound: "
<< output->values_read_upper_bound;
throw ParquetException(ss.str());
}

if (valid_bits_writer.has_value()) {
// the level_info def level for lists reflects element present level.
// the prior level distinguishes between empty lists.
if (def_levels[x] >= level_info.def_level - 1) {
valid_bits_writer->Set();
} else {
output->null_count++;
valid_bits_writer->Clear();
}
valid_bits_writer->Next();
}

}
if (valid_bits_writer.has_value()) {
valid_bits_writer->Finish();
}
if (offsets != nullptr) {
output->values_read = offsets - orig_pos;
} else if (valid_bits_writer.has_value()) {
output->values_read = valid_bits_writer->position();
}
if (output->null_count > 0 && level_info.null_slot_usage > 1) {
throw ParquetException(
"Null values with null_slot_usage > 1 not supported."
"(i.e. FixedSizeLists with null values are not supported)");
}
}

那对于 List 来说,我们会需要 offsets,来确定它没每段偏移量是多少。而且 rep-level 可能等于自身,我们再来看看这个函数的完整版(一个函数贴两遍,你水字数是真的牛逼):

这里注意:

  1. rep_levels 比自身大的时候,说明是子节点在重复,skip
  2. 如果 rep-level 等于自身,说明在自己这层重复,给 list 添加一个元素,*offset += 1
  3. 如果 rep-level 小自身,说明父级重复。这里向前移动 offsets,可以根据 def-level 来判断自己是否是 null。如果自己不是 null,说明有个新成员,得添加下 offsets
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
template <typename OffsetType>
void DefRepLevelsToListInfo(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_def_levels, LevelInfo level_info,
ValidityBitmapInputOutput* output, OffsetType* offsets) {
OffsetType* orig_pos = offsets;
optional<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer;
if (output->valid_bits) {
valid_bits_writer.emplace(output->valid_bits, output->valid_bits_offset,
output->values_read_upper_bound);
}
for (int x = 0; x < num_def_levels; x++) {
// Skip items that belong to empty or null ancestor lists and further nested lists.
if (def_levels[x] < level_info.repeated_ancestor_def_level ||
rep_levels[x] > level_info.rep_level) {
continue;
}

if (rep_levels[x] == level_info.rep_level) {
// A continuation of an existing list.
// offsets can be null for structs with repeated children (we don't need to know
// offsets until we get to the children).
if (offsets != nullptr) {
if (ARROW_PREDICT_FALSE(*offsets == std::numeric_limits<OffsetType>::max())) {
throw ParquetException("List index overflow.");
}
*offsets += 1;
}
} else {
if (ARROW_PREDICT_FALSE(
(valid_bits_writer.has_value() &&
valid_bits_writer->position() >= output->values_read_upper_bound) ||
(offsets - orig_pos) >= output->values_read_upper_bound)) {
std::stringstream ss;
ss << "Definition levels exceeded upper bound: "
<< output->values_read_upper_bound;
throw ParquetException(ss.str());
}

// current_rep < list rep_level i.e. start of a list (ancestor empty lists are
// filtered out above).
// offsets can be null for structs with repeated children (we don't need to know
// offsets until we get to the children).
if (offsets != nullptr) {
++offsets;
// Use cumulative offsets because variable size lists are more common then
// fixed size lists so it should be cheaper to make these cumulative and
// subtract when validating fixed size lists.
*offsets = *(offsets - 1);
if (def_levels[x] >= level_info.def_level) {
if (ARROW_PREDICT_FALSE(*offsets == std::numeric_limits<OffsetType>::max())) {
throw ParquetException("List index overflow.");
}
*offsets += 1;
}
}

if (valid_bits_writer.has_value()) {
// the level_info def level for lists reflects element present level.
// the prior level distinguishes between empty lists.
if (def_levels[x] >= level_info.def_level - 1) {
valid_bits_writer->Set();
} else {
output->null_count++;
valid_bits_writer->Clear();
}
valid_bits_writer->Next();
}
}
}
if (valid_bits_writer.has_value()) {
valid_bits_writer->Finish();
}
if (offsets != nullptr) {
output->values_read = offsets - orig_pos;
} else if (valid_bits_writer.has_value()) {
output->values_read = valid_bits_writer->position();
}
if (output->null_count > 0 && level_info.null_slot_usage > 1) {
throw ParquetException(
"Null values with null_slot_usage > 1 not supported."
"(i.e. FixedSizeLists with null values are not supported)");
}
}

总结

这篇文章继续讲了 parquet 的 C++ 标准库怎么读取内容的。简单介绍了一下 文件 — Column 的读取流。Column 下面的编码这篇文章并没有涵盖。如果看不懂的话…感觉也很正常,因为感觉这块代码本身写的质量就…一般般(主要是感觉抽象程度太低了,而且抽的很令人困惑)?不过能把这套东西做出来还是蛮牛逼的,老实说我看着就头疼。这文章看懂能明白大概流程我觉得就很不错了,我也是半当笔记自己写的。