Parquet C++: Statistics

In the previous section we discussed the outer type mapping from Arrow types to Parquet types. The trick we learned there was to convert the Arrow Schema into a Parquet Schema, do the type mapping, and then carry a redundant copy of the Parquet Schema. This section covers Types and Statistics in Parquet. The material here is also fairly dirty, but still necessary. It also touches some parquet-mr code, because that code is the "good / correct" reference. The Arrow side has quite a few problems here, and I will try to patch some of them as well.

Also, this section only covers the standard, on-spec Statistics; if you want to punch holes through the format, this article can also offer a few Arrow-side escape hatches.

Parquet Standard

Statistics

The Parquet format itself carries some Statistics, spread across several places:

The Statistics struct (I stripped the backward-compatibility fields below, but real code still has to handle them)

  1. The max_value / min_value here use Parquet's internal encoding, and their ordering is determined by the ColumnOrder. What is a ColumnOrder? We'll get to that below.
  2. null_count is a rather strange field. Note that it carries Parquet-internal semantics, not user-level semantics; in scenarios with max-rep-level > 1, reasoning about it through rep-levels will give you odd results.
  3. distinct_count is a bit easier to understand, but watch out for whether nulls are included in it.
  4. All fields here are optional, and the Statistics struct itself is optional too.

Here it is:

/**
 * Statistics per row group and per page
 * All fields are optional.
 */
struct Statistics {
   /** count of null value in the column */
   3: optional i64 null_count;
   /** count of distinct values occurring */
   4: optional i64 distinct_count;
   /**
    * Min and max values for the column, determined by its ColumnOrder.
    *
    * Values are encoded using PLAIN encoding, except that variable-length byte
    * arrays do not include a length prefix.
    */
   5: optional binary max_value;
   6: optional binary min_value;
}
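
The comment on fields 5/6 is worth making concrete. Below is a minimal sketch (the helper names are mine, not a Parquet API) of how a writer would produce the min_value/max_value bytes for an INT32 and a BYTE_ARRAY column: PLAIN encoding, with the usual 4-byte length prefix dropped for variable-length byte arrays.

#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helpers, only to illustrate the encoding rule above.
// INT32 min_value/max_value: PLAIN encoding = 4 little-endian bytes.
std::string EncodeInt32Stat(int32_t v) {
  std::string out(sizeof(v), '\0');
  std::memcpy(&out[0], &v, sizeof(v));  // assumes a little-endian host
  return out;
}

// BYTE_ARRAY min_value/max_value: PLAIN would normally prepend a 4-byte length,
// but statistics store just the raw bytes ("abc" -> 'a','b','c').
std::string EncodeByteArrayStat(const std::string& s) { return s; }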

Two places hold Statistics: the Page Header and the ColumnMetaData (column chunk metadata):

/** Data page header (this is the data page v1 header; v2 has one too, not pasted here;
 *  the dictionary page header has no statistics, only num_values) */
struct DataPageHeader {

  /** Optional statistics for the data in this page**/
  5: optional Statistics statistics;
}

And:

/**
 * Description for column metadata
 */
struct ColumnMetaData {
  /** Type of this column **/
  1: required Type type

  /** Set of all encodings used for this column. The purpose is to validate
   * whether we can decode those pages. **/
  2: required list<Encoding> encodings

  /** Path in schema **/
  3: required list<string> path_in_schema

  /** optional statistics for this column chunk */
  12: optional Statistics statistics;

  /** Set of all encodings used for pages in this column chunk.
   * This information can be used to determine if all data pages are
   * dictionary encoded for example **/
  13: optional list<PageEncodingStats> encoding_stats;
}

You may notice I kept one extra field, encoding_stats. What is it? Don't worry, it comes later. Remember what was said earlier: these Statistics are governed by the Schema, which brings us to FileMetaData:

/**
 * Description for file metadata
 */
struct FileMetaData {
  /** Parquet schema for this file. This schema contains metadata for all the columns.
   * The schema is represented as a tree with a single root. The nodes of the tree
   * are flattened to a list by doing a depth-first traversal.
   * The column metadata contains the path in the schema for that column which can be
   * used to map columns to nodes in the schema.
   * The first element is the root **/
  2: required list<SchemaElement> schema;

  /** Optional key/value metadata **/
  5: optional list<KeyValue> key_value_metadata

  /** String for application that wrote this file. This should be in the format
   * <Application> version <App Version> (build <App Build Hash>).
   * e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55)
   **/
  6: optional string created_by

  /**
   * Sort order used for the min_value and max_value fields in the Statistics
   * objects and the min_values and max_values fields in the ColumnIndex
   * objects of each column in this file. Sort orders are listed in the order
   * matching the columns in the schema. The indexes are not necessary the same
   * though, because only leaf nodes of the schema are represented in the list
   * of sort orders.
   *
   * Without column_orders, the meaning of the min_value and max_value fields
   * in the Statistics object and the ColumnIndex object is undefined. To ensure
   * well-defined behaviour, if these fields are written to a Parquet file,
   * column_orders must be written as well.
   *
   * The obsolete min and max fields in the Statistics object are always sorted
   * by signed comparison regardless of column_orders.
   */
  7: optional list<ColumnOrder> column_orders;
}

Here is ColumnOrder, the file-level description of how the Statistics of all the logical (leaf) columns are ordered:

/** Empty struct to signal the order defined by the physical or logical type */
struct TypeDefinedOrder {}

/**
 * Union to specify the order used for the min_value and max_value fields for a
 * column. This union takes the role of an enhanced enum that allows rich
 * elements (which will be needed for a collation-based ordering in the future).
 *
 * Possible values are:
 * * TypeDefinedOrder - the column uses the order defined by its logical or
 *   physical type (if there is no logical type).
 *
 * If the reader does not support the value of this union, min and max stats
 * for this column should be ignored.
 */
union ColumnOrder {

  /**
   * The sort orders for logical types are:
   *   UTF8 - unsigned byte-wise comparison
   *   INT8 - signed comparison
   *   INT16 - signed comparison
   *   INT32 - signed comparison
   *   INT64 - signed comparison
   *   UINT8 - unsigned comparison
   *   UINT16 - unsigned comparison
   *   UINT32 - unsigned comparison
   *   UINT64 - unsigned comparison
   *   DECIMAL - signed comparison of the represented value
   *   DATE - signed comparison
   *   TIME_MILLIS - signed comparison
   *   TIME_MICROS - signed comparison
   *   TIMESTAMP_MILLIS - signed comparison
   *   TIMESTAMP_MICROS - signed comparison
   *   INTERVAL - unsigned comparison
   *   JSON - unsigned byte-wise comparison
   *   BSON - unsigned byte-wise comparison
   *   ENUM - unsigned byte-wise comparison
   *   LIST - undefined
   *   MAP - undefined
   *
   * In the absence of logical types, the sort order is determined by the physical type:
   *   BOOLEAN - false, true
   *   INT32 - signed comparison
   *   INT64 - signed comparison
   *   INT96 (only used for legacy timestamps) - undefined
   *   FLOAT - signed comparison of the represented value (*)
   *   DOUBLE - signed comparison of the represented value (*)
   *   BYTE_ARRAY - unsigned byte-wise comparison
   *   FIXED_LEN_BYTE_ARRAY - unsigned byte-wise comparison
   *
   * (*) Because the sorting order is not specified properly for floating
   *     point values (relations vs. total ordering) the following
   *     compatibility rules should be applied when reading statistics:
   *     - If the min is a NaN, it should be ignored.
   *     - If the max is a NaN, it should be ignored.
   *     - If the min is +0, the row group may contain -0 values as well.
   *     - If the max is -0, the row group may contain +0 values as well.
   *     - When looking for NaN values, min and max should be ignored.
   *
   *     When writing statistics the following rules should be followed:
   *     - NaNs should not be written to min or max statistics fields.
   *     - If the computed max value is zero (whether negative or positive),
   *       `+0.0` should be written into the max statistics field.
   *     - If the computed min value is zero (whether negative or positive),
   *       `-0.0` should be written into the min statistics field.
   */
  1: TypeDefinedOrder TYPE_ORDER;
}

This is a big chunk, but it boils down to this: the standard currently only defines TypeDefinedOrder, which picks the order as follows:

  • the order of the LogicalType (or ConvertedType)
  • the order of the physical type ("in the absence of logical types, the sort order is determined by the physical type")
  • for floats, NaN ordering gets special treatment; I happened to hit a related issue just recently

We can summarize as follows:

  1. FileMetaData carries the Schema, and also column_orders
    1. The order is defined by the LogicalType, or failing that the physical Type
  2. Both the ColumnMetaData (column chunk) and the Page carry their own Statistics, whose order follows FileMetaData. The Statistics themselves, like everything above, are optional (a reader-side sketch follows this list)
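
To make the summary concrete, here is a reader-side sketch (not the arrow API; my own helper over the thrift-generated format:: types that also appear in the ToThrift() snippet later) of when the min_value/max_value of a leaf column may be used:

// Sketch only: decide whether min_value/max_value can be trusted for leaf column i.
bool CanUseMinMax(const format::FileMetaData& file_meta, size_t leaf_index) {
  // Without column_orders, the meaning of min_value/max_value is undefined.
  if (!file_meta.__isset.column_orders ||
      leaf_index >= file_meta.column_orders.size()) {
    return false;
  }
  const format::ColumnOrder& order = file_meta.column_orders[leaf_index];
  // Only TypeDefinedOrder is standardized today; if the union holds something
  // this reader does not understand, the stats for this column must be ignored.
  return order.__isset.TYPE_ORDER;
}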

Statistics: Dictionary & Corner Cases

You probably feel Statistics have already been covered, but one piece is still missing. We know pages have min-max, so:

  1. How is min-max handled for dictionary pages?
  2. What does the dictionary page layout look like?
  3. Do nulls and the like go into the ndv (distinct count)?

Now we can introduce some of the darker corners of Parquet's dictionary format. Start by skimming the dictionary-encoding part of Parquet's Encodings doc: https://github.com/apache/parquet-format/blob/master/Encodings.md#dictionary-encoding-plain_dictionary--2-and-rle_dictionary--8

You will find that dictionaries come with two encodings, PLAIN_DICTIONARY and RLE_DICTIONARY:

  • PLAIN_DICTIONARY: all pages carry the PLAIN_DICTIONARY header; used by format 1.0
  • RLE_DICTIONARY: the dictionary page is PLAIN, and the data (index) pages are RLE_DICTIONARY

In fact, we can look at the writer properties in Parquet C++:

inline ParquetVersion::type version() const { return parquet_version_; }

inline std::string created_by() const { return parquet_created_by_; }

inline bool page_checksum_enabled() const { return page_checksum_enabled_; }

inline Encoding::type dictionary_index_encoding() const {
  if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
    return Encoding::PLAIN_DICTIONARY;
  } else {
    return Encoding::RLE_DICTIONARY;
  }
}

inline Encoding::type dictionary_page_encoding() const {
  if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
    return Encoding::PLAIN_DICTIONARY;
  } else {
    return Encoding::PLAIN;
  }
}

So what do dictionary Statistics look like? We can learn that part from the implementation, but at the level of the standard we can already state:

  • The Statistics of a dictionary-encoded data page are logically the same as those of a non-dictionary data page; and because a dictionary is involved, it could collect ndv more easily
  • Dictionary (index) pages carry no Statistics at all

What about NaN? NaN is not null; it is a value (or, per IEEE 754, possibly two values).

Implementation

We mentioned earlier that the Parquet spec has physical types (Type) and logical types (LogicalType, plus the ConvertedType kept for parquet-format v1 compatibility). Each type has a corresponding ColumnOrder. Now we need to map this pile onto C++. This part of the Parquet C++ implementation is still fairly incomplete, so we will compare against the Java implementation where we can.

The implementation splits into two parts: writing and reading.

C++ Implementation: Writing

There are several layers here:

  1. Above the encoding layer: how Statistics are supported
  2. Page-level Statistics
  3. ColumnChunk-level Statistics
  4. Metadata collection

First, let's start from the Comparator type and discuss what is good / bad about it.

The Comparator class hierarchy:

Comparator (the creation factory)
- TypedComparator (Compare, GetMinMax, GetMinMaxSpaced)
  - TypedComparatorImpl
    + CompareHelper<DType, bool is_signed>
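
Before diving into the helpers, here is a small usage sketch, assuming the public parquet/statistics.h API shown in this hierarchy: create an UNSIGNED comparator for INT32 and ask it for the min/max of a batch.

#include <cstdint>

#include "parquet/statistics.h"
#include "parquet/types.h"

void ComparatorDemo() {
  // type_length only matters for FIXED_LEN_BYTE_ARRAY; pass -1 here.
  std::shared_ptr<parquet::Comparator> base = parquet::Comparator::Make(
      parquet::Type::INT32, parquet::SortOrder::UNSIGNED, /*type_length=*/-1);
  auto* cmp = static_cast<parquet::TypedComparator<parquet::Int32Type>*>(base.get());

  int32_t values[] = {3, -1, 7};
  auto min_max = cmp->GetMinMax(values, 3);
  // Under UNSIGNED order, -1 is compared as 0xFFFFFFFF, so the max is -1 and the min is 3.
  (void)min_max;
}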

This part is relatively simple; what's worth a look is the signed vs. unsigned implementations, which are selected by SortOrder. SortOrder is a higher-level concept that we will introduce later. Let's start with integer comparison:

template <typename DType>
struct UnsignedCompareHelperBase {
  using T = typename DType::c_type;
  using UCType = typename std::make_unsigned<T>::type;

  static_assert(!std::is_same<T, UCType>::value, "T is unsigned");
  static_assert(sizeof(T) == sizeof(UCType), "T and UCType not the same size");

  // NOTE: according to the C++ spec, unsigned-to-signed conversion is
  // implementation-defined if the original value does not fit in the signed type
  // (i.e., two's complement cannot be assumed even on mainstream machines,
  // because the compiler may decide otherwise). Hence the use of `SafeCopy`
  // below for deterministic bit-casting.
  // (see "Integer conversions" in
  // https://en.cppreference.com/w/cpp/language/implicit_conversion)

  static const T DefaultMin() { return SafeCopy<T>(std::numeric_limits<UCType>::max()); }
  static const T DefaultMax() { return 0; }

  static bool Compare(int type_length, T a, T b) {
    return SafeCopy<UCType>(a) < SafeCopy<UCType>(b);
  }

  static T Min(int type_length, T a, T b) { return Compare(type_length, a, b) ? a : b; }
  static T Max(int type_length, T a, T b) { return Compare(type_length, a, b) ? b : a; }
};

Because DType::c_type is a signed type, a matching set of unsigned operations is used to do the comparison.
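
A tiny self-contained illustration of why the bit-copy matters (BitCopy here is a stand-in of mine for SafeCopy, not the arrow helper): the same two int32 values order differently under signed and unsigned comparison.

#include <cstdint>
#include <cstring>

// Stand-in for SafeCopy: reinterpret the bits of `v` as type To via memcpy,
// sidestepping the implementation-defined unsigned-to-signed conversion noted above.
template <typename To, typename From>
To BitCopy(From v) {
  static_assert(sizeof(To) == sizeof(From), "sizes must match");
  To out;
  std::memcpy(&out, &v, sizeof(To));
  return out;
}

int main() {
  int32_t a = -1, b = 1;
  bool signed_less = a < b;                                          // true
  bool unsigned_less = BitCopy<uint32_t>(a) < BitCopy<uint32_t>(b);  // false: 0xFFFFFFFF > 1
  return (signed_less && !unsigned_less) ? 0 : 1;
}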

Another somewhat tricky spot is INT96 comparison, but INT96 is already deprecated, so I'll be lazy about it. Let's focus on floating point instead:

template <typename DType, bool is_signed>
struct CompareHelper {
  using T = typename DType::c_type;

  static_assert(!std::is_unsigned<T>::value || std::is_same<T, bool>::value,
                "T is an unsigned numeric");

  constexpr static T DefaultMin() { return std::numeric_limits<T>::max(); }
  constexpr static T DefaultMax() { return std::numeric_limits<T>::lowest(); }

  // MSVC17 fix, isnan is not overloaded for IntegralType as per C++11
  // standard requirements.
  template <typename T1 = T>
  static ::arrow::enable_if_t<std::is_floating_point<T1>::value, T> Coalesce(T val,
                                                                             T fallback) {
    return std::isnan(val) ? fallback : val;
  }

  template <typename T1 = T>
  static ::arrow::enable_if_t<!std::is_floating_point<T1>::value, T> Coalesce(
      T val, T fallback) {
    return val;
  }

  static inline bool Compare(int type_length, const T& a, const T& b) { return a < b; }

  static T Min(int type_length, T a, T b) { return a < b ? a : b; }
  static T Max(int type_length, T a, T b) { return a < b ? b : a; }
};

Here a Coalesce is used to forcibly drop NaNs. For the floating-point rules, see the spec text quoted in the ColumnOrder comment above.

With this layer wrapped up, we finally come back to the top-level Make:

std::shared_ptr<Comparator> Comparator::Make(Type::type physical_type,
                                             SortOrder::type sort_order,
                                             int type_length) {
  if (SortOrder::SIGNED == sort_order) {
    switch (physical_type) {
      case Type::BOOLEAN:
        return std::make_shared<TypedComparatorImpl<true, BooleanType>>();
      case Type::INT32:
        return std::make_shared<TypedComparatorImpl<true, Int32Type>>();
      case Type::INT64:
        return std::make_shared<TypedComparatorImpl<true, Int64Type>>();
      case Type::INT96:
        return std::make_shared<TypedComparatorImpl<true, Int96Type>>();
      case Type::FLOAT:
        return std::make_shared<TypedComparatorImpl<true, FloatType>>();
      case Type::DOUBLE:
        return std::make_shared<TypedComparatorImpl<true, DoubleType>>();
      case Type::BYTE_ARRAY:
        return std::make_shared<TypedComparatorImpl<true, ByteArrayType>>();
      case Type::FIXED_LEN_BYTE_ARRAY:
        return std::make_shared<TypedComparatorImpl<true, FLBAType>>(type_length);
      default:
        ParquetException::NYI("Signed Compare not implemented");
    }
  } else if (SortOrder::UNSIGNED == sort_order) {
    switch (physical_type) {
      case Type::INT32:
        return std::make_shared<TypedComparatorImpl<false, Int32Type>>();
      case Type::INT64:
        return std::make_shared<TypedComparatorImpl<false, Int64Type>>();
      case Type::INT96:
        return std::make_shared<TypedComparatorImpl<false, Int96Type>>();
      case Type::BYTE_ARRAY:
        return std::make_shared<TypedComparatorImpl<false, ByteArrayType>>();
      case Type::FIXED_LEN_BYTE_ARRAY:
        return std::make_shared<TypedComparatorImpl<false, FLBAType>>(type_length);
      default:
        ParquetException::NYI("Unsigned Compare not implemented");
    }
  } else {
    throw ParquetException("UNKNOWN Sort Order");
  }
  return nullptr;
}

Nothing without a SortOrder can get a Comparator; keep that in mind. Back to Statistics: its layering looks a lot like Comparator's. Externally it exposes something called EncodedStatistics, which is more or less a wrapper around the thrift Statistics so that users never touch the thrift struct directly. (It does not include num_values... even though num_values still has to be collected through Statistics.)

Its internal structure looks like this:

Statistics (null_count, min/max, Encode)
- TypedStatistics
  - TypedStatisticsImpl
    + TypedComparator

If creation fails, e.g. the ColumnDescriptor has no SortOrder, this thing simply cannot be created at all. Once created, it updates its internal state as Arrow data is written.
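
A hedged usage sketch, assuming the public parquet/schema.h and parquet/statistics.h APIs (MakeStatistics is the same helper used by TypedColumnWriterImpl below): build typed statistics for a single INT64 leaf, feed it one batch, and encode the result.

#include "arrow/memory_pool.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"

void StatisticsDemo() {
  // An optional INT64 leaf column; INT64 has a SIGNED sort order, so a
  // comparator (and therefore statistics) can be created for it.
  auto node = parquet::schema::PrimitiveNode::Make(
      "x", parquet::Repetition::OPTIONAL, parquet::Type::INT64);
  parquet::ColumnDescriptor descr(node, /*max_definition_level=*/1,
                                  /*max_repetition_level=*/0);

  auto stats = parquet::MakeStatistics<parquet::Int64Type>(
      &descr, ::arrow::default_memory_pool());

  int64_t values[] = {5, 2, 9};  // the non-null values of the batch
  stats->Update(values, /*num_values=*/3, /*null_count=*/1);

  // EncodedStatistics is the thrift-facing wrapper described above:
  // PLAIN-encoded min/max plus the null count.
  parquet::EncodedStatistics encoded = stats->Encode();
  (void)encoded;
}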

As for updating the state, think of it as two linked steps:

On update:

std::pair<T, T> GetMinMax(const T* values, int64_t length) override {
  DCHECK_GT(length, 0);

  T min = Helper::DefaultMin();
  T max = Helper::DefaultMax();

  for (int64_t i = 0; i < length; i++) {
    const auto val = SafeLoad(values + i);
    min = Helper::Min(type_length_, min, Helper::Coalesce(val, Helper::DefaultMin()));
    max = Helper::Max(type_length_, max, Helper::Coalesce(val, Helper::DefaultMax()));
  }

  return {min, max};
}

On set:

// In case of floating point types, the following rules are applied (as per
// upstream parquet-mr):
// - If any of min/max is NaN, return nothing.
// - If min is 0.0f, replace with -0.0f
// - If max is -0.0f, replace with 0.0f
template <typename T>
::arrow::enable_if_t<std::is_floating_point<T>::value, optional<std::pair<T, T>>>
CleanStatistic(std::pair<T, T> min_max) {
  T min = min_max.first;
  T max = min_max.second;

  // Ignore if one of the value is nan.
  if (std::isnan(min) || std::isnan(max)) {
    return ::std::nullopt;
  }

  if (min == std::numeric_limits<T>::max() && max == std::numeric_limits<T>::lowest()) {
    return ::std::nullopt;
  }

  T zero{};

  if (min == zero && !std::signbit(min)) {
    min = -min;
  }

  if (max == zero && std::signbit(max)) {
    max = -max;
  }

  return {{min, max}};
}

void SetMinMaxPair(std::pair<T, T> min_max) {
  // CleanStatistic can return a nullopt in case of erroneous values, e.g. NaN
  auto maybe_min_max = CleanStatistic(min_max);
  if (!maybe_min_max) return;

  auto min = maybe_min_max.value().first;
  auto max = maybe_min_max.value().second;

  if (!has_min_max_) {
    has_min_max_ = true;
    Copy(min, &min_, min_buffer_.get());
    Copy(max, &max_, max_buffer_.get());
  } else {
    Copy(comparator_->Compare(min_, min) ? min_ : min, &min_, min_buffer_.get());
    Copy(comparator_->Compare(max_, max) ? max : max_, &max_, max_buffer_.get());
  }
}

At this level, the min-max handling becomes easy to understand.

SortOrder

When we covered the Schema in the previous section we didn't mention SortOrder, because the Parquet standard doesn't have it (it has other things named sort_order). SortOrder is what Parquet C++ uses internally to implement ordering:

SortOrder::type sort_order() const {
  auto la = logical_type();
  auto pt = physical_type();
  return la ? GetSortOrder(la, pt) : GetSortOrder(converted_type(), pt);
}

This corresponds to the spec's TypeDefinedOrder; the implementation collapses it into SIGNED, UNSIGNED, and UNKNOWN. Every LogicalType defines its own SortOrder, for example:

class LogicalType::Impl::Int final : public LogicalType::Impl::Compatible,
                                     public LogicalType::Impl::Applicable {
 public:
  friend class IntLogicalType;

  bool is_applicable(parquet::Type::type primitive_type,
                     int32_t primitive_length = -1) const override;
  bool is_compatible(ConvertedType::type converted_type,
                     schema::DecimalMetadata converted_decimal_metadata) const override;
  ConvertedType::type ToConvertedType(
      schema::DecimalMetadata* out_decimal_metadata) const override;
  std::string ToString() const override;
  std::string ToJSON() const override;
  format::LogicalType ToThrift() const override;
  bool Equals(const LogicalType& other) const override;

  int bit_width() const { return width_; }
  bool is_signed() const { return signed_; }

 private:
  Int(int w, bool s)
      : LogicalType::Impl(LogicalType::Type::INT,
                          (s ? SortOrder::SIGNED : SortOrder::UNSIGNED)),
        width_(w),
        signed_(s) {}
  int width_ = 0;
  bool signed_ = false;
};

There are also types, such as VOID, whose Statistics are undefined.

The current implementation has a problem: if the order is undefined (or even if the type flags are just a bit unusual), comparator creation throws, and then no statistics can be produced at all. This affects INTERVAL-like types.

At the final write stage there are two kinds of statistics:

  1. page_statistics: per-page statistics
  2. chunk_statistics: the aggregated statistics for the whole column chunk.

They are created like this:

TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                      std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                      Encoding::type encoding, const WriterProperties* properties) {
  // ...
  if (properties->statistics_enabled(descr_->path()) &&
      (SortOrder::UNKNOWN != descr_->sort_order())) {
    page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
    chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
  }
}

Each page collects its page statistics, which are then written out:

EncodedStatistics Encode() override {
  EncodedStatistics s;
  if (HasMinMax()) {
    s.set_min(this->EncodeMin());
    s.set_max(this->EncodeMax());
  }
  if (HasNullCount()) {
    s.set_null_count(this->null_count());
  }
  return s;
}

And:

EncodedStatistics GetPageStatistics() override {
  EncodedStatistics result;
  if (page_statistics_) result = page_statistics_->Encode();
  return result;
}

// The actual write
EncodedStatistics page_stats = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
// ...
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
    compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE,
    uncompressed_size, page_stats);

Finally, ToThrift does the conversion into the thrift struct:

static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
  format::Statistics statistics;
  if (stats.has_min) {
    statistics.__set_min_value(stats.min());
    // If the order is SIGNED, then the old min value must be set too.
    // This for backward compatibility
    if (stats.is_signed()) {
      statistics.__set_min(stats.min());
    }
  }
  if (stats.has_max) {
    statistics.__set_max_value(stats.max());
    // If the order is SIGNED, then the old max value must be set too.
    // This for backward compatibility
    if (stats.is_signed()) {
      statistics.__set_max(stats.max());
    }
  }
  if (stats.has_null_count) {
    statistics.__set_null_count(stats.null_count);
  }
  if (stats.has_distinct_count) {
    statistics.__set_distinct_count(stats.distinct_count);
  }

  return statistics;
}

That was the Page level; the ColumnChunk level keeps merging the page-level statistics until the chunk is finished:

void ResetPageStatistics() override {
  if (chunk_statistics_ != nullptr) {
    chunk_statistics_->Merge(*page_statistics_);
    page_statistics_->Reset();
  }
}

C++ Implementation: Reading

On the read side, the page-level statistics are pulled straight out of the page headers:

// Extracts encoded statistics from V1 and V2 data page headers
template <typename H>
EncodedStatistics ExtractStatsFromHeader(const H& header) {
  EncodedStatistics page_statistics;
  if (!header.__isset.statistics) {
    return page_statistics;
  }
  const format::Statistics& stats = header.statistics;
  // Use the new V2 min-max statistics over the former one if it is filled
  if (stats.__isset.max_value || stats.__isset.min_value) {
    // TODO: check if the column_order is TYPE_DEFINED_ORDER.
    if (stats.__isset.max_value) {
      page_statistics.set_max(stats.max_value);
    }
    if (stats.__isset.min_value) {
      page_statistics.set_min(stats.min_value);
    }
  } else if (stats.__isset.max || stats.__isset.min) {
    // TODO: check created_by to see if it is corrupted for some types.
    // TODO: check if the sort_order is SIGNED.
    if (stats.__isset.max) {
      page_statistics.set_max(stats.max);
    }
    if (stats.__isset.min) {
      page_statistics.set_min(stats.min);
    }
  }
  if (stats.__isset.null_count) {
    page_statistics.set_null_count(stats.null_count);
  }
  if (stats.__isset.distinct_count) {
    page_statistics.set_distinct_count(stats.distinct_count);
  }
  return page_statistics;
}

In newer versions of the C++ library, a DataPageStats struct makes use of this.

Shortcomings of the Implementation

  • Dictionary pages do not collect ndv
  • A comparator is required; if the LogicalType has no comparator, there are no Statistics at all, which is unfriendly to types like INTERVAL
  • A few parts feel unfriendly to consume on the read side, e.g. null_count / ndv

PageStats & ColumnIndex

The ColumnChunk also carries PageEncodingStats:

/** Set of all encodings used for pages in this column chunk.
* This information can be used to determine if all data pages are
* dictionary encoded for example **/
13: optional list<PageEncodingStats> encoding_stats;

This records the encoding of the pages inside the chunk and can be used to detect dictionary encoding, as sketched below.
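
For example, here is a sketch of that detection against the thrift-generated format:: types (my own helper, not the arrow ColumnChunkMetaData API): all data pages are dictionary encoded iff every DATA_PAGE / DATA_PAGE_V2 entry in encoding_stats uses a dictionary encoding.

// Sketch only: detect "all data pages are dictionary encoded" from encoding_stats.
bool AllDataPagesDictEncoded(const format::ColumnMetaData& meta) {
  if (!meta.__isset.encoding_stats) return false;  // can't tell without the stats
  for (const format::PageEncodingStats& s : meta.encoding_stats) {
    if (s.page_type != format::PageType::DATA_PAGE &&
        s.page_type != format::PageType::DATA_PAGE_V2) {
      continue;  // dictionary pages / index pages don't matter here
    }
    if (s.encoding != format::Encoding::RLE_DICTIONARY &&
        s.encoding != format::Encoding::PLAIN_DICTIONARY) {
      return false;
    }
  }
  return true;
}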

The ColumnIndex carries a few things; the comments largely speak for themselves:

/**
 * Description for ColumnIndex.
 * Each <array-field>[i] refers to the page at OffsetIndex.page_locations[i]
 */
struct ColumnIndex {
  /**
   * A list of Boolean values to determine the validity of the corresponding
   * min and max values. If true, a page contains only null values, and writers
   * have to set the corresponding entries in min_values and max_values to
   * byte[0], so that all lists have the same length. If false, the
   * corresponding entries in min_values and max_values must be valid.
   */
  1: required list<bool> null_pages

  /**
   * Two lists containing lower and upper bounds for the values of each page
   * determined by the ColumnOrder of the column. These may be the actual
   * minimum and maximum values found on a page, but can also be (more compact)
   * values that do not exist on a page. For example, instead of storing "Blart
   * Versenwald III", a writer may set min_values[i]="B", max_values[i]="C".
   * Such more compact values must still be valid values within the column's
   * logical type. Readers must make sure that list entries are populated before
   * using them by inspecting null_pages.
   */
  2: required list<binary> min_values
  3: required list<binary> max_values

  /**
   * Stores whether both min_values and max_values are ordered and if so, in
   * which direction. This allows readers to perform binary searches in both
   * lists. Readers cannot assume that max_values[i] <= min_values[i+1], even
   * if the lists are ordered.
   */
  4: required BoundaryOrder boundary_order

  /** A list containing the number of null values for each page **/
  5: optional list<i64> null_counts
}
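
As a quick illustration, here is a pruning sketch over the thrift struct (my own helper, not an arrow API) for an equality predicate on a BYTE_ARRAY column whose ColumnOrder is unsigned byte-wise. Remember that min_values/max_values use the same encoding as Statistics: raw bytes, no length prefix.

#include <string>
#include <vector>

// Sketch only: keep the indexes of pages that may contain the value `x`.
std::vector<size_t> CandidatePages(const format::ColumnIndex& idx, const std::string& x) {
  std::vector<size_t> pages;
  for (size_t i = 0; i < idx.min_values.size(); ++i) {
    if (idx.null_pages[i]) continue;  // page contains only nulls
    // std::string comparison is unsigned byte-wise, matching BYTE_ARRAY's order.
    if (x < idx.min_values[i] || x > idx.max_values[i]) continue;
    pages.push_back(i);
  }
  return pages;
}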

One problem here is the handling of NaN, which is being worked on in an RFC: https://issues.apache.org/jira/browse/PARQUET-2249 .

Many other parts also depend on the collected page statistics and on the implicit SortOrder. Sigh.