// The ColumnDescriptor encapsulates information necessary to interpret
// primitive column data in the context of a particular schema. We have to
// examine the node structure of a column's path to the root in the schema tree
// to be able to reassemble the nested structure from the repetition and
// definition levels.
class PARQUET_EXPORT ColumnDescriptor {
 public:
  ColumnDescriptor(schema::NodePtr node, int16_t max_definition_level,
                   int16_t max_repetition_level,
                   const SchemaDescriptor* schema_descr = NULLPTR);

 private:
  schema::NodePtr node_;
  const schema::PrimitiveNode* primitive_node_;

  // Mapping between leaf nodes and root group of leaf (first node
  // below the schema's root group)
  //
  // For example, the leaf `a.b.c.d` would have a link back to `a`
  //
  // -- a  <------
  // -- -- b     |
  // -- -- -- c  |
  // -- -- -- -- d
  std::unordered_map<int, schema::NodePtr> leaf_to_base_;

  // Mapping between ColumnPath DotString to the leaf index
  std::unordered_multimap<std::string, int> leaf_to_idx_;
};
// List encodings: using the terminology from Impala to define different styles
// of representing logical lists (a.k.a. ARRAY types) in Parquet schemas. Since
// the converted type named in the Parquet metadata is ConvertedType::LIST we
// use that terminology here. It also helps distinguish from the *_ARRAY
// primitive types.
//
// One-level encoding: Only allows required lists with required cells
//   repeated value_type name
//
// Two-level encoding: Enables optional lists with only required cells
//   <required/optional> group list
//     repeated value_type item
//
// Three-level encoding: Enables optional lists with optional cells
//   <required/optional> group bag
//     repeated group list
//       <required/optional> value_type item
//
// 2- and 1-level encoding are respectively equivalent to 3-level encoding with
// the non-repeated nodes set to required.
//
// The "official" encoding recommended in the Parquet spec is the 3-level, and
// we use that as the default when creating list types. For semantic
// completeness we allow the other two. Since all types of encodings will occur
// "in the wild" we need to be able to interpret the associated definition
// levels in the context of the actual encoding used in the file.
//
// NB: Some Parquet writers may not set ConvertedType::LIST on the repeated
// SchemaElement, which could make things challenging if we are trying to infer
// that a sequence of nodes semantically represents an array according to one
// of these encodings (versus a struct containing an array). We should refuse
// the temptation to guess, as they say.
struct ListEncoding {
  enum type { ONE_LEVEL, TWO_LEVEL, THREE_LEVEL };
};
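To make the three encodings tangible, here is a minimal sketch (my own, not from the Arrow codebase) that builds the recommended three-level form with the parquet::schema node builders; the names `bag`, `list`, and `item` simply mirror the comment above:

#include "parquet/schema.h"

using parquet::ConvertedType;
using parquet::Repetition;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

// Three-level encoding of an optional list with optional int32 cells:
//
//   optional group bag (LIST) {
//     repeated group list {
//       optional int32 item;
//     }
//   }
NodePtr MakeThreeLevelList() {
  NodePtr item = PrimitiveNode::Make("item", Repetition::OPTIONAL, Type::INT32);
  NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {item});
  return GroupNode::Make("bag", Repetition::OPTIONAL, {list}, ConvertedType::LIST);
}

The one- and two-level variants would drop the outer group or mark the inner nodes required, per the comment above.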
By rights we should introduce the SchemaManifest type next, as we did for the types above, but let's approach things from the user level first. To begin, look at this neat little recursive calc, which computes over an Arrow type. I suspect any first-year student could write it...
int CalculateLeafCount(const DataType* type) {
  if (type->id() == ::arrow::Type::EXTENSION) {
    type = checked_cast<const ExtensionType&>(*type).storage_type().get();
  }
  // Note num_fields() can be 0 for an empty struct type
  if (!::arrow::is_nested(type->id())) {
    // Primitive type.
    return 1;
  }

  int num_leaves = 0;
  for (const auto& field : type->fields()) {
    num_leaves += CalculateLeafCount(field->type().get());
  }
  return num_leaves;
}
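A quick sanity check of what it computes: the leaf count is exactly the number of Parquet leaf columns a type will occupy. A hand-written illustration:

#include <cassert>
#include "arrow/api.h"

// struct<a: int32, b: struct<c: float64, d: utf8>> flattens to the three
// Parquet leaf columns a, b.c, and b.d:
void LeafCountExample() {
  auto type = ::arrow::struct_(
      {::arrow::field("a", ::arrow::int32()),
       ::arrow::field("b", ::arrow::struct_({::arrow::field("c", ::arrow::float64()),
                                             ::arrow::field("d", ::arrow::utf8())}))});
  assert(CalculateLeafCount(type.get()) == 3);
}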
The Parquet format supports an optional integer field_id that can be assigned to a field. Arrow converts these field IDs into a metadata key named PARQUET:field_id on the corresponding field.
int FieldIdFromMetadata(
    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
  if (!metadata) {
    return -1;
  }
  int key = metadata->FindKey(FIELD_ID_KEY);
  if (key < 0) {
    return -1;
  }
  std::string field_id_str = metadata->value(key);
  int field_id;
  if (::arrow::internal::ParseValue<::arrow::Int32Type>(
          field_id_str.c_str(), field_id_str.length(), &field_id)) {
    if (field_id < 0) {
      // Thrift should convert any negative value to null but normalize to -1
      // here in case we later check this in logic.
      return -1;
    }
    return field_id;
  } else {
    return -1;
  }
}
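Seen from the writer's side, the key is literally the string "PARQUET:field_id" (that is what FIELD_ID_KEY expands to). A minimal sketch of attaching one via Arrow field metadata so the function above can recover it:

#include "arrow/api.h"

// Attach a field_id of 42 via key-value metadata; FieldIdFromMetadata would
// parse it back out, normalizing absent/negative/unparsable values to -1.
std::shared_ptr<::arrow::Field> MakeFieldWithId() {
  auto metadata = ::arrow::key_value_metadata({"PARQUET:field_id"}, {"42"});
  return ::arrow::field("a", ::arrow::int32(), /*nullable=*/true, metadata);
  // FieldIdFromMetadata(field->metadata()) == 42
}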
  // Try to deserialize original Arrow schema
  RETURN_NOT_OK(
      GetOriginSchema(metadata, &manifest->schema_metadata, &manifest->origin_schema));
  // Ignore original schema if it's not compatible with the Parquet schema
  if (manifest->origin_schema != nullptr &&
      manifest->origin_schema->num_fields() != schema_node.field_count()) {
    manifest->origin_schema = nullptr;
  }

  for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
    SchemaField* out_field = &manifest->schema_fields[i];
    RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), LevelInfo(), &ctx,
                                    /*parent=*/nullptr, out_field));

    // TODO(wesm): as follow up to ARROW-3246, we should really pass the origin
    // schema (if any) through all functions in the schema reconstruction, but
    // I'm being lazy and just setting dictionary fields at the top level for
    // now
    if (manifest->origin_schema == nullptr) {
      continue;
    }

    auto origin_field = manifest->origin_schema->field(i);
    RETURN_NOT_OK(ApplyOriginalMetadata(*origin_field, out_field));
  }
  return Status::OK();
}
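From the outside, all of this is driven through SchemaManifest::Make. A minimal sketch, assuming `reader` is an already-opened parquet::ParquetFileReader:

#include "parquet/arrow/schema.h"
#include "parquet/file_reader.h"

::arrow::Status BuildManifest(parquet::ParquetFileReader* reader,
                              parquet::arrow::SchemaManifest* manifest) {
  auto file_metadata = reader->metadata();
  return parquet::arrow::SchemaManifest::Make(
      file_metadata->schema(), file_metadata->key_value_metadata(),
      parquet::default_arrow_reader_properties(), manifest);
}
// manifest->schema_fields then mirrors the Parquet schema tree, and
// manifest->origin_schema (if compatible) drives ApplyOriginalMetadata below.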
  if (num_children > 0 && origin_type->num_fields() == num_children) {
    DCHECK_EQ(static_cast<int>(inferred->children.size()), num_children);
    const auto factory = GetNestedFactory(*origin_type, *inferred_type);
    if (factory) {
      // The type may be modified (e.g. LargeList) while the children stay the same
      modified |= origin_type->id() != inferred_type->id();

      // Apply original metadata recursively to children
      for (int i = 0; i < inferred_type->num_fields(); ++i) {
        ARROW_ASSIGN_OR_RAISE(
            const bool child_modified,
            ApplyOriginalMetadata(*origin_type->field(i), &inferred->children[i]));
        modified |= child_modified;
      }
      if (modified) {
        // Recreate this field using the modified child fields
        ::arrow::FieldVector modified_children(inferred_type->num_fields());
        for (int i = 0; i < inferred_type->num_fields(); ++i) {
          modified_children[i] = inferred->children[i].field;
        }
        inferred->field =
            inferred->field->WithType(factory(std::move(modified_children)));
      }
    }
  }
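Concretely, this is the path taken when, say, the writer's schema had large_list<int32>: Parquet reconstruction can only infer list<int32>, so the factory recreates the large variant around the recursively fixed-up children. A toy illustration of the type pair involved (my own, for intuition only):

// origin (from the stored Arrow schema) vs. inferred (from Parquet structure):
auto origin_list = ::arrow::large_list(::arrow::int32());
auto inferred_list = ::arrow::list(::arrow::int32());
// origin_list->id() != inferred_list->id(), so modified becomes true and the
// field is rebuilt via the large_list factory over the inferred children.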
  if (origin_type->id() == ::arrow::Type::TIMESTAMP &&
      inferred_type->id() == ::arrow::Type::TIMESTAMP) {
    // Restore time zone, if any
    const auto& ts_type = checked_cast<const ::arrow::TimestampType&>(*inferred_type);
    const auto& ts_origin_type =
        checked_cast<const ::arrow::TimestampType&>(*origin_type);

    // If the data is tz-aware, then set the original time zone, since Parquet
    // has no native storage for timezones
    if (ts_type.timezone() == "UTC" && !ts_origin_type.timezone().empty()) {
      if (ts_type.unit() == ts_origin_type.unit()) {
        inferred->field = inferred->field->WithType(origin_type);
      } else {
        auto ts_type_new = ::arrow::timestamp(ts_type.unit(), ts_origin_type.timezone());
        inferred->field = inferred->field->WithType(ts_type_new);
      }
    }
    modified = true;
  }
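The reason for the "UTC" check: Parquet's TIMESTAMP annotation only records an isAdjustedToUTC flag, so any tz-aware column is inferred back as timestamp(unit, "UTC"), and the real zone survives only in the stored Arrow schema. A hand-written illustration:

auto origin = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "Asia/Shanghai");
auto inferred = ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC");
// Same unit: the field is simply retyped to `origin`, restoring the zone.
// Different unit: the inferred unit wins, but the original zone is re-attached.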
  if (origin_type->id() == ::arrow::Type::DURATION &&
      inferred_type->id() == ::arrow::Type::INT64) {
    // Read back int64 arrays as duration.
    inferred->field = inferred->field->WithType(origin_type);
    modified = true;
  }

  if (origin_type->id() == ::arrow::Type::DICTIONARY &&
      inferred_type->id() != ::arrow::Type::DICTIONARY &&
      IsDictionaryReadSupported(*inferred_type)) {
    // Direct dictionary reads are only supported for a couple primitive types,
    // so no need to recurse on value types.
    const auto& dict_origin_type =
        checked_cast<const ::arrow::DictionaryType&>(*origin_type);
    inferred->field = inferred->field->WithType(
        ::arrow::dictionary(::arrow::int32(), inferred_type, dict_origin_type.ordered()));
    modified = true;
  }

  if ((origin_type->id() == ::arrow::Type::LARGE_BINARY &&
       inferred_type->id() == ::arrow::Type::BINARY) ||
      (origin_type->id() == ::arrow::Type::LARGE_STRING &&
       inferred_type->id() == ::arrow::Type::STRING)) {
    // Read back binary-like arrays with the intended offset width.
    inferred->field = inferred->field->WithType(origin_type);
    modified = true;
  }
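Note that whether inferred_type is already a dictionary depends on a reader knob: direct dictionary decoding is opt-in per column. A short sketch of that knob:

#include "parquet/properties.h"

// Ask the reader to decode column 0 straight to dictionary arrays (only
// supported for binary-like value types). Without this, a dictionary-typed
// origin schema is restored by the branch above instead.
parquet::ArrowReaderProperties props;
props.set_read_dictionary(/*column_index=*/0, true);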
Status NodeToSchemaField(const Node& node, LevelInfo current_levels,
                         SchemaTreeContext* ctx, const SchemaField* parent,
                         SchemaField* out) {
  // Workhorse function for converting a Parquet schema node to an Arrow
  // type. Handles different conventions for nested data.

  ctx->LinkParent(out, parent);

  // Now, walk the schema and create a ColumnDescriptor for each leaf node
  if (node.is_group()) {
    // A nested field, but we don't know what kind yet
    return GroupToSchemaField(static_cast<const GroupNode&>(node), current_levels,
                              ctx, parent, out);
  } else {
    // Either a normal flat primitive type, or a list type encoded with 1-level
    // list encoding. Note that the 3-level encoding is the form recommended by
    // the parquet specification, but technically we can have either
    //
    // required/optional $TYPE $FIELD_NAME
    //
    // or
    //
    // repeated $TYPE $FIELD_NAME
    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
    int column_index = ctx->schema->GetColumnIndex(primitive_node);
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
                          GetTypeForNode(column_index, primitive_node, ctx));
    if (node.is_repeated()) {
      // One-level list encoding, e.g.
      // a: repeated int32;
      int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
      out->children.resize(1);
      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, current_levels, ctx,
                                 out, &out->children[0]));

      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                  /*nullable=*/false, FieldIdMetadata(node.field_id()));
      out->level_info = current_levels;
      // At this point current_levels has considered this list to be the
      // ancestor, so restore the actual ancestor.
      out->level_info.repeated_ancestor_def_level = repeated_ancestor_def_level;
      return Status::OK();
    } else {
      current_levels.Increment(node);
      // A normal (required/optional) primitive node
      return PopulateLeaf(column_index,
                          ::arrow::field(node.name(), type, node.is_optional(),
                                         FieldIdMetadata(node.field_id())),
                          current_levels, ctx, parent, out);
    }
  }
}
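To close the loop on the repeated branch: a one-level encoded column like `repeated int32 a` comes out as a required list with a required child of the same name. Equivalently, in terms of plain Arrow calls:

// What the branch above produces for `repeated int32 a;`:
auto child = ::arrow::field("a", ::arrow::int32(), /*nullable=*/false);
auto result = ::arrow::field("a", ::arrow::list(child), /*nullable=*/false);
// => a: list<a: int32 not null> not null

Meanwhile out->level_info records the repetition level added by IncrementRepeated(), with repeated_ancestor_def_level pointing back at the real repeated ancestor.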