// Returns true if there are still values in this column. virtualboolHasNext()= 0;
virtual Type::type type()const= 0;
virtualconst ColumnDescriptor* descr()const= 0;
// Get the encoding that can be exposed by this reader. If it returns // dictionary encoding, then ReadBatchWithDictionary can be used to read data. // // \note API EXPERIMENTAL virtual ExposedEncoding GetExposedEncoding()= 0;
protected: friendclassRowGroupReader; // Set the encoding that can be exposed by this reader. // // \note API EXPERIMENTAL virtualvoidSetExposedEncoding(ExposedEncoding encoding)= 0; };
// API to read values from a single column. This is a main client facing API. template <typename DType> classTypedColumnReader : public ColumnReader { public: typedeftypename DType::c_type T;
// Read a batch of repetition levels, definition levels, and values from the // column. // // Since null values are not stored in the values, the number of values read // may be less than the number of repetition and definition levels. With // nested data this is almost certainly true. // // Set def_levels or rep_levels to nullptr if you want to skip reading them. // This is only safe if you know through some other source that there are no // undefined values. // // To fully exhaust a row group, you must read batches until the number of // values read reaches the number of stored values according to the metadata. // // This API is the same for both V1 and V2 of the DataPage // // @returns: actual number of levels read (see values_read for number of values read) virtualint64_tReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read)= 0; // ... };
/// \brief Stateful column reader that delimits semantic records for both flat /// and nested columns /// /// \note API EXPERIMENTAL /// \since 1.3.0 classRecordReader { public: static std::shared_ptr<RecordReader> Make( const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), constbool read_dictionary = false);
virtual ~RecordReader() = default;
/// \brief Attempt to read indicated number of records from column chunk /// \return number of records read virtualint64_tReadRecords(int64_t num_records)= 0;
/// \brief Pre-allocate space for data. Results in better flat read performance virtualvoidReserve(int64_t num_values)= 0;
/// \brief Clear consumed values and repetition/definition levels as the /// result of calling ReadRecords virtualvoidReset()= 0;
/// \brief Transfer filled values buffer to caller. A new one will be /// allocated in subsequent ReadRecords calls virtual std::shared_ptr<ResizableBuffer> ReleaseValues()= 0;
/// \brief Transfer filled validity bitmap buffer to caller. A new one will /// be allocated in subsequent ReadRecords calls virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid()= 0;
/// \brief Return true if the record reader has more internal data yet to /// process virtualboolHasMoreData()const= 0;
/// \brief Advance record reader to the next row group /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader virtualvoidSetPageReader(std::unique_ptr<PageReader> reader)= 0;
/// \brief Decoded values, including nulls, if any uint8_t* values()const{ return values_->mutable_data(); }
/// \brief Number of values written including nulls (if any) int64_tvalues_written()const{ return values_written_; }
/// \brief Number of definition / repetition levels (from those that have /// been decoded) that have been consumed inside the reader. int64_tlevels_position()const{ return levels_position_; }
/// \brief Number of definition / repetition levels that have been written /// internally in the reader int64_tlevels_written()const{ return levels_written_; }
/// \brief Number of nulls in the leaf int64_tnull_count()const{ return null_count_; }
/// \brief True if the leaf values are nullable boolnullable_values()const{ return nullable_values_; }
/// \brief True if reading directly as Arrow dictionary-encoded boolread_dictionary()const{ return read_dictionary_; }
// No repetition levels, skip delimiting logic. Each level represents a // null or not null entry records_read = std::min(levels_written_ - levels_position_, num_records);
// This is advanced by DelimitRecords, which we skipped levels_position_ += records_read; } else { records_read = values_to_read = num_records; }
// Process written repetition/definition levels to reach the end of // records. Process no more levels than necessary to delimit the indicated // number of logical records. Updates internal state of RecordReader // // 这个函数只会发生在 rep-level 和 def-level 都在的情况下. 这里会根据内部状态 `at_record_start_` // 判断是否在一个行的开头. 目标是不越过 `levels_written_` 的情况下读 `num_records` 行. // 返回读的行数和 values // // \return Number of records delimited int64_tDelimitRecords(int64_t num_records, int64_t* values_seen){ int64_t values_to_read = 0; int64_t records_read = 0;
// Count logical records and number of values to read // 读 level_position 来判断内容 // 这里的逻辑会先读 rep_level, 来查一下行的变更, rep_level == 0 一定切了行, 读完了就会返回. // 然后会检查 `def_level`, 看这个是否是 null. 不是 null 就会添加 `values_to_read`. // **非 0 的 rep-level 不会被这一层处理** while (levels_position_ < levels_written_) { constint16_t rep_level = *rep_levels++; // rep_level == 0 的时候, 一定是整行的开头. if (rep_level == 0) { // If at_record_start_ is true, we are seeing the start of a record // for the second time, such as after repeated calls to // DelimitRecords. In this case we must continue until we find // another record start or exhausting the ColumnChunk // // 上一次走完了这行, 就会 `!at_record_start_`. if (!at_record_start_) { // We've reached the end of a record; increment the record count. ++records_read; if (records_read == num_records) { // We've found the number of records we were looking for. Set // at_record_start_ to true and break at_record_start_ = true; break; } } } // We have decided to consume the level at this position; therefore we // must advance until we find another record boundary at_record_start_ = false;
// At this point, the column reader is a stream iterator. It only knows how to // read the next batch of values for a particular column from the file until it // runs out. // // We also do not expose any internal Parquet details, such as row groups. This // might change in the future. classPARQUET_EXPORT ColumnReader { public: virtual ~ColumnReader() = default;
// Scan the next array of the indicated size. The actual size of the // returned array may be less than the passed size depending how much data is // available in the file. // // When all the data in the file has been exhausted, the result is set to // nullptr. // // Returns Status::OK on a successful read, including if you have exhausted // the data available in the file. virtual ::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::ChunkedArray>* out)= 0; };
classColumnReaderImpl : public ColumnReader { public: virtual Status GetDefLevels(constint16_t** data, int64_t* length)= 0; virtual Status GetRepLevels(constint16_t** data, int64_t* length)= 0; virtualconst std::shared_ptr<Field> field()= 0;
::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::ChunkedArray>* out)final{ RETURN_NOT_OK(LoadBatch(batch_size)); RETURN_NOT_OK(BuildArray(batch_size, out)); for (int x = 0; x < (*out)->num_chunks(); x++) { RETURN_NOT_OK((*out)->chunk(x)->Validate()); } return Status::OK(); }
// Leaf reader is for primitive arrays and primitive children of nested arrays classLeafReader : public ColumnReaderImpl { public: Status GetDefLevels(constint16_t** data, int64_t* length)final{ *data = record_reader_->def_levels(); *length = record_reader_->levels_position(); return Status::OK(); }
// 只考虑 def-level, 用于叶子结点. voidDefLevelsToBitmap(constint16_t* def_levels, int64_t num_def_levels, LevelInfo level_info, ValidityBitmapInputOutput* output){ // It is simpler to rely on rep_level here until PARQUET-1899 is done and the code // is deleted in a follow-up release. if (level_info.rep_level > 0) { #if defined(ARROW_HAVE_RUNTIME_BMI2) if (CpuInfo::GetInstance()->HasEfficientBmi2()) { returnDefLevelsToBitmapBmi2WithRepeatedParent(def_levels, num_def_levels, level_info, output); } #endif standard::DefLevelsToBitmapSimd</*has_repeated_parent=*/true>( def_levels, num_def_levels, level_info, output); } else { standard::DefLevelsToBitmapSimd</*has_repeated_parent=*/false>( def_levels, num_def_levels, level_info, output); } }
// 根据 def-rep 来算, 拿到叶子结点的 level 推断父亲的列表. voidDefRepLevelsToBitmap(constint16_t* def_levels, constint16_t* rep_levels, int64_t num_def_levels, LevelInfo level_info, ValidityBitmapInputOutput* output){ // DefReplevelsToListInfo assumes it for the actual list method and this // method is for parent structs, so we need to bump def and ref level. level_info.rep_level += 1; level_info.def_level += 1; DefRepLevelsToListInfo<int32_t>(def_levels, rep_levels, num_def_levels, level_info, output, /*offsets=*/nullptr); }
offsets 参数是给 List 这样的类型准备的。在不考虑 offsets 的情况下,struct 对应的逻辑如下:
template <typename OffsetType> voidDefRepLevelsToListInfo(constint16_t* def_levels, constint16_t* rep_levels, int64_t num_def_levels, LevelInfo level_info, ValidityBitmapInputOutput* output, OffsetType* offsets){ OffsetType* orig_pos = offsets; optional<::arrow::internal::FirstTimeBitmapWriter> valid_bits_writer; if (output->valid_bits) { valid_bits_writer.emplace(output->valid_bits, output->valid_bits_offset, output->values_read_upper_bound); } for (int x = 0; x < num_def_levels; x++) { // Skip items that belong to empty or null ancestor lists and further nested lists. if (def_levels[x] < level_info.repeated_ancestor_def_level || rep_levels[x] > level_info.rep_level) { continue; }
if (rep_levels[x] == level_info.rep_level) { // A continuation of an existing list. // offsets can be null for structs with repeated children (we don't need to know // offsets until we get to the children). if (offsets != nullptr) { if (ARROW_PREDICT_FALSE(*offsets == std::numeric_limits<OffsetType>::max())) { throwParquetException("List index overflow."); } *offsets += 1; } } else { if (ARROW_PREDICT_FALSE( (valid_bits_writer.has_value() && valid_bits_writer->position() >= output->values_read_upper_bound) || (offsets - orig_pos) >= output->values_read_upper_bound)) { std::stringstream ss; ss << "Definition levels exceeded upper bound: " << output->values_read_upper_bound; throwParquetException(ss.str()); }
// current_rep < list rep_level i.e. start of a list (ancestor empty lists are // filtered out above). // offsets can be null for structs with repeated children (we don't need to know // offsets until we get to the children). if (offsets != nullptr) { ++offsets; // Use cumulative offsets because variable size lists are more common then // fixed size lists so it should be cheaper to make these cumulative and // subtract when validating fixed size lists. *offsets = *(offsets - 1); if (def_levels[x] >= level_info.def_level) { if (ARROW_PREDICT_FALSE(*offsets == std::numeric_limits<OffsetType>::max())) { throwParquetException("List index overflow."); } *offsets += 1; } }
if (valid_bits_writer.has_value()) { // the level_info def level for lists reflects element present level. // the prior level distinguishes between empty lists. if (def_levels[x] >= level_info.def_level - 1) { valid_bits_writer->Set(); } else { output->null_count++; valid_bits_writer->Clear(); } valid_bits_writer->Next(); } } } if (valid_bits_writer.has_value()) { valid_bits_writer->Finish(); } if (offsets != nullptr) { output->values_read = offsets - orig_pos; } elseif (valid_bits_writer.has_value()) { output->values_read = valid_bits_writer->position(); } if (output->null_count > 0 && level_info.null_slot_usage > 1) { throwParquetException( "Null values with null_slot_usage > 1 not supported." "(i.e. FixedSizeLists with null values are not supported)"); } }
总结
这篇文章继续讲了 parquet 的 C++ 标准库是怎么读取内容的,简单介绍了一下「文件 → Column」的读取流程。Column 下面的编码这篇文章并没有涵盖。如果看不懂的话…感觉也很正常,因为这块代码本身写的质量就…一般般(主要是抽象程度太低了,而且抽得很令人困惑)?不过能把这套东西做出来还是蛮牛逼的,老实说我看着就头疼。这篇文章如果能看懂、明白大概流程,我觉得就很不错了,我也是半当笔记自己写的。