/// \brief Kernel data structure for implementations of /// ScalarAggregateFunction. The four necessary components of an aggregation /// kernel are the init, consume, merge, and finalize functions. /// /// * init: creates a new KernelState for a kernel. /// * consume: processes an ExecSpan and updates the KernelState found in the /// KernelContext. /// * merge: combines one KernelState with another. /// * finalize: produces the end result of the aggregation using the /// KernelState in the KernelContext. structARROW_EXPORT ScalarAggregateKernel : public Kernel { ScalarAggregateConsume consume; ScalarAggregateMerge merge; ScalarAggregateFinalize finalize; /// \brief Whether this kernel requires ordering /// Some aggregations, such as, "first", requires some kind of input order. The /// order can be implicit, e.g., the order of the input data, or explicit, e.g. /// the ordering specified with a window aggregation. /// The caller of the aggregate kernel is responsible for passing data in some /// defined order to the kernel. The flag here is a way for the kernel to tell /// the caller that data passed to the kernel must be defined in some order. bool ordered = false; };
(这里的 ordered 目前还处于“挖了坑”的状态:它只是防御性地让 Pipeline 去处理 Order,并不是一套很完善的 order 处理机制。)
classScalarAggregateNode : public ExecNode, public TracedNode { const std::vector<const ScalarAggregateKernel*> kernels_; // Input type holders for each kernel, used for state initialization std::vector<std::vector<TypeHolder>> kernel_intypes_; std::vector<std::vector<std::unique_ptr<KernelState>>> states_; };
/// \brief Kernel data structure for implementations of /// HashAggregateFunction. The four necessary components of an aggregation /// kernel are the init, consume, merge, and finalize functions. /// /// * init: creates a new KernelState for a kernel. /// * resize: ensure that the KernelState can accommodate the specified number of groups. /// * consume: processes an ExecSpan (which includes the argument as well /// as an array of group identifiers) and updates the KernelState found in the /// KernelContext. /// * merge: combines one KernelState with another. /// * finalize: produces the end result of the aggregation using the /// KernelState in the KernelContext. structARROW_EXPORT HashAggregateKernel : public Kernel { HashAggregateResize resize; HashAggregateConsume consume; HashAggregateMerge merge; HashAggregateFinalize finalize; /// @brief whether the summarizer requires ordering /// This is similar to ScalarAggregateKernel. See ScalarAggregateKernel /// for detailed doc of this variable. bool ordered = false; };
我们也可以看到 GroupedAggregator 的形态:
/// C++ abstract base class for the HashAggregateKernel interface. /// Implementations should be default constructible and perform initialization in /// Init(). structGroupedAggregator : KernelState { virtual Status Init(ExecContext*, const KernelInitArgs& args)= 0; virtual Status Resize(int64_t new_num_groups)= 0; virtual Status Consume(const ExecSpan& batch)= 0; virtual Status Merge(GroupedAggregator&& other, const ArrayData& group_id_mapping)= 0; virtual Result<Datum> Finalize()= 0; virtual std::shared_ptr<DataType> out_type()const= 0; };
/// \brief Kernel data structure for implementations of VectorFunction. In /// contains an optional finalizer function, the null handling and memory /// pre-allocation preferences (which have different defaults from /// ScalarKernel), and some other execution-related options. structARROW_EXPORT VectorKernel : public Kernel { /// \brief See VectorKernel::finalize member for usage using FinalizeFunc = std::function<Status(KernelContext*, std::vector<Datum>*)>;
/// \brief Function for executing a stateful VectorKernel against a /// ChunkedArray input. Does not need to be defined for all VectorKernels using ChunkedExec = Status (*)(KernelContext*, const ExecBatch&, Datum* out);
/// \brief Perform a single invocation of this kernel. Any required state is /// managed through the KernelContext. ArrayKernelExec exec;
/// \brief Execute the kernel on a ChunkedArray. Does not need to be defined ChunkedExec exec_chunked = NULLPTR;
/// \brief For VectorKernel, convert intermediate results into finalized /// results. Mutates input argument. Some kernels may accumulate state /// (example: hashing-related functions) through processing chunked inputs, and /// then need to attach some accumulated state to each of the outputs of /// processing each chunk of data. FinalizeFunc finalize;
/// Since vector kernels generally are implemented rather differently from /// scalar/elementwise kernels (and they may not even yield arrays of the same /// size), so we make the developer opt-in to any memory preallocation rather /// than having to turn it off. NullHandling::type null_handling = NullHandling::COMPUTED_NO_PREALLOCATE; MemAllocation::type mem_allocation = MemAllocation::NO_PREALLOCATE;
/// \brief Writing execution results into larger contiguous allocations /// requires that the kernel be able to write into sliced output ArrayData*, /// including sliced output validity bitmaps. Some kernel implementations may /// not be able to do this, so setting this to false disables this /// functionality. bool can_write_into_slices = true;
/// Some vector kernels can do chunkwise execution using ExecSpanIterator, /// in some cases accumulating some state. Other kernels (like Take) need to /// be passed whole arrays and don't work on ChunkedArray inputs bool can_execute_chunkwise = true;
/// Some kernels (like unique and value_counts) yield non-chunked output from /// chunked-array inputs. This option controls how the results are boxed when /// returned from ExecVectorFunction /// /// true -> ChunkedArray /// false -> Array bool output_chunked = true; };
// ---------------------------------------------------------------------- template <typename HashKernel> Result<std::unique_ptr<KernelState>> HashInit(KernelContext* ctx, const KernelInitArgs& args) { auto result = std::make_unique<HashKernel>(args.inputs[0].GetSharedPtr(), args.options, ctx->memory_pool()); RETURN_NOT_OK(result->Reset()); // R build with openSUSE155 requires an explicit unique_ptr construction return std::unique_ptr<KernelState>(std::move(result)); }
/// \brief Select the HashInit instantiation to use for a given type id.
///
/// Type ids that map to the same case group below share a single
/// RegularHashKernel instantiation. Any type id not listed falls through to
/// Unreachable("non hashable type").
///
/// \param type_id the type id of the kernel input
/// \return the matching HashInit function for `Action`
template <typename Action>
KernelInit GetHashInit(Type::type type_id) {
  // ARROW-8933: Generate only a single hash kernel per physical data
  // representation
  switch (type_id) {
    case Type::NA:
      return HashInit<NullHashKernel<Action>>;
    case Type::BOOL:
      return HashInit<RegularHashKernel<BooleanType, Action>>;
    case Type::INT8:
    case Type::UINT8:
      return HashInit<RegularHashKernel<UInt8Type, Action>>;
    case Type::INT16:
    case Type::UINT16:
      return HashInit<RegularHashKernel<UInt16Type, Action>>;
    case Type::INT32:
    case Type::UINT32:
    case Type::FLOAT:
    case Type::DATE32:
    case Type::TIME32:
    case Type::INTERVAL_MONTHS:
      return HashInit<RegularHashKernel<UInt32Type, Action>>;
    case Type::INT64:
    case Type::UINT64:
    case Type::DOUBLE:
    case Type::DATE64:
    case Type::TIME64:
    case Type::TIMESTAMP:
    case Type::DURATION:
    case Type::INTERVAL_DAY_TIME:
      return HashInit<RegularHashKernel<UInt64Type, Action>>;
    case Type::BINARY:
    case Type::STRING:
      return HashInit<RegularHashKernel<BinaryType, Action, std::string_view>>;
    case Type::LARGE_BINARY:
    case Type::LARGE_STRING:
      return HashInit<RegularHashKernel<LargeBinaryType, Action, std::string_view>>;
    case Type::BINARY_VIEW:
    case Type::STRING_VIEW:
      return HashInit<RegularHashKernel<BinaryViewType, Action, std::string_view>>;
    case Type::FIXED_SIZE_BINARY:
    case Type::DECIMAL128:
    case Type::DECIMAL256:
      return HashInit<RegularHashKernel<FixedSizeBinaryType, Action, std::string_view>>;
    case Type::INTERVAL_MONTH_DAY_NANO:
      return HashInit<RegularHashKernel<MonthDayNanoIntervalType, Action>>;
    default:
      Unreachable("non hashable type");
  }
}
// Alias for DictionaryEncodeOptions wrapped in OptionsWrapper.
// NOTE(review): presumably the KernelState that carries the options for the
// dictionary-encode kernel -- confirm against OptionsWrapper's definition.
using DictionaryEncodeState = OptionsWrapper<DictionaryEncodeOptions>;