/// \brief Common initializer function for all kernel types. using KernelInit = std::function<Result<std::unique_ptr<KernelState>>( KernelContext*, const KernelInitArgs&)>;
/// \brief Base type for kernels. Contains the function signature and /// optionally the state initialization function, along with some common /// attributes structARROW_EXPORT Kernel { Kernel() = default;
/// \brief The "signature" of the kernel containing the InputType input /// argument validators and OutputType output type resolver. std::shared_ptr<KernelSignature> signature;
/// \brief Create a new KernelState for invocations of this kernel, e.g. to /// set up any options or state relevant for execution. KernelInit init;
/// \brief Create a vector of new KernelState for invocations of this kernel. static Status InitAll(KernelContext*, const KernelInitArgs&, std::vector<std::unique_ptr<KernelState>>*);
/// \brief Indicates whether execution can benefit from parallelization /// (splitting large chunks into smaller chunks and using multiple /// threads). Some kernels may not support parallel execution at /// all. Synchronization and concurrency-related issues are currently the /// responsibility of the Kernel's implementation. bool parallelizable = true;
/// \brief Indicates the level of SIMD instruction support in the host CPU is /// required to use the function. The intention is for functions to be able to /// contain multiple kernels with the same signature but different levels of SIMD, /// so that the most optimized kernel supported on a host's processor can be chosen. SimdLevel::type simd_level = SimdLevel::NONE;
// Additional kernel-specific data std::shared_ptr<KernelState> data; };
以 Add 这个二元(Binary)的 Scalar Function 为例:
1 2
// Build the "add" function with MakeArithmeticFunction<Add>, using add_doc as
// its documentation, then hand the raw function pointer to
// AddDecimalBinaryKernels<Add> under the same name.
auto add = MakeArithmeticFunction<Add>("add", add_doc); AddDecimalBinaryKernels<Add>("add", add.get());
// 这里还塞了一个 FunctionDoc 进去 const FunctionDoc add_doc{"Add the arguments element-wise", ("Results will wrap around on integer overflow.\n" "Use function \"add_checked\" if you want overflow\n" "to return an error."), {"x", "y"}};
然后来看 MakeArithmeticFunction 和 ArithmeticExecFromOp 的实现:
1 2 3 4 5 6 7 8 9 10 11
template <typename Op, typename FunctionImpl = ArithmeticFunction> std::shared_ptr<ScalarFunction> MakeArithmeticFunction(std::string name, FunctionDoc doc) { auto func = std::make_shared<FunctionImpl>(name, Arity::Binary(), std::move(doc)); for (constauto& ty : NumericTypes()) { auto exec = ArithmeticExecFromOp<ScalarBinaryEqualTypes, Op>(ty); DCHECK_OK(func->AddKernel({ty, ty}, ty, exec)); } AddNullExec(func.get()); return func; }
// Function-pointer type for a kernel exec routine: it receives the
// KernelContext, the input ExecSpan and a pointer to the ExecResult, and
// returns a Status.
using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*);
// Selects the exec function of `KernelGenerator` instantiated for the physical
// type matching `get_id.id`, with identical output and argument types.
//
// DURATION and TIMESTAMP share the Int64Type instantiation with INT64. Any
// other type id trips DCHECK(false) and yields FailFunctor<KernelType>::Exec.
template <template <typename...> class KernelGenerator, typename Op,
          typename KernelType = ArrayKernelExec, typename... Args>
KernelType ArithmeticExecFromOp(detail::GetTypeId get_id) {
  switch (get_id.id) {
    case Type::INT8:
      return KernelGenerator<Int8Type, Int8Type, Op, Args...>::Exec;
    case Type::UINT8:
      return KernelGenerator<UInt8Type, UInt8Type, Op, Args...>::Exec;
    case Type::INT16:
      return KernelGenerator<Int16Type, Int16Type, Op, Args...>::Exec;
    case Type::UINT16:
      return KernelGenerator<UInt16Type, UInt16Type, Op, Args...>::Exec;
    case Type::INT32:
      return KernelGenerator<Int32Type, Int32Type, Op, Args...>::Exec;
    case Type::UINT32:
      return KernelGenerator<UInt32Type, UInt32Type, Op, Args...>::Exec;
    case Type::DURATION:
    case Type::INT64:
    case Type::TIMESTAMP:
      return KernelGenerator<Int64Type, Int64Type, Op, Args...>::Exec;
    case Type::UINT64:
      return KernelGenerator<UInt64Type, UInt64Type, Op, Args...>::Exec;
    case Type::FLOAT:
      return KernelGenerator<FloatType, FloatType, Op, Args...>::Exec;
    case Type::DOUBLE:
      return KernelGenerator<DoubleType, DoubleType, Op, Args...>::Exec;
    default:
      DCHECK(false);
      return FailFunctor<KernelType>::Exec;
  }
}
下面我们来看 Generator:
1 2 3 4
// A kernel exec generator for binary kernels where both input types are the // same template <typename OutType, typename ArgType, typename Op> using ScalarBinaryEqualTypes = ScalarBinary<OutType, ArgType, ArgType, Op>;
// A kernel exec generator for binary functions that addresses both array and // scalar inputs and dispatches input iteration and output writing to other // templates // // This template executes the operator even on the data behind null values, // therefore it is generally only suitable for operators that are safe to apply // even on the null slot values. // // The "Op" functor should have the form // // struct Op { // template <typename OutValue, typename Arg0Value, typename Arg1Value> // static OutValue Call(KernelContext* ctx, Arg0Value arg0, Arg1Value arg1, Status* st) // { // // implementation // // NOTE: "status" should only populated with errors, // // leave it unmodified to indicate Status::OK() // } // }; template <typename OutType, typename Arg0Type, typename Arg1Type, typename Op> structScalarBinary { using OutValue = typename GetOutputType<OutType>::T; using Arg0Value = typename GetViewType<Arg0Type>::T; using Arg1Value = typename GetViewType<Arg1Type>::T;
/// \brief Arguments to pass to an KernelInit function. A struct is used to help /// avoid API breakage should the arguments passed need to be expanded. structKernelInitArgs { /// \brief A pointer to the kernel being initialized. The init function may /// depend on the kernel's KernelSignature or other data contained there. const Kernel* kernel;
/// \brief The types of the input arguments that the kernel is /// about to be executed against. const std::vector<TypeHolder>& inputs;
/// \brief Opaque options specific to this kernel. May be nullptr for functions /// that do not require options. const FunctionOptions* options; };
/// \brief Common initializer function for all kernel types. using KernelInit = std::function<Result<std::unique_ptr<KernelState>>( KernelContext*, const KernelInitArgs&)>;
/// \brief Base type for kernels. Contains the function signature and /// optionally the state initialization function, along with some common /// attributes structARROW_EXPORT Kernel { Kernel() = default;
/// \brief The "signature" of the kernel containing the InputType input /// argument validators and OutputType output type resolver. std::shared_ptr<KernelSignature> signature;
/// \brief Create a new KernelState for invocations of this kernel, e.g. to /// set up any options or state relevant for execution. KernelInit init;
/// \brief Create a vector of new KernelState for invocations of this kernel. static Status InitAll(KernelContext*, const KernelInitArgs&, std::vector<std::unique_ptr<KernelState>>*);
// Function-pointer type for a kernel exec routine: it receives the
// KernelContext, the input ExecSpan and a pointer to the ExecResult, and
// returns a Status.
using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*);
/// \brief Kernel data structure for implementations of ScalarFunction. In /// addition to the members found in Kernel, contains the null handling /// and memory pre-allocation preferences. structARROW_EXPORT ScalarKernel : public Kernel { ScalarKernel() = default;
/// \brief Perform a single invocation of this kernel. Depending on the /// implementation, it may only write into preallocated memory, while in some /// cases it will allocate its own memory. Any required state is managed /// through the KernelContext. ArrayKernelExec exec;
/// \brief Writing execution results into larger contiguous allocations /// requires that the kernel be able to write into sliced output ArrayData*, /// including sliced output validity bitmaps. Some kernel implementations may /// not be able to do this, so setting this to false disables this /// functionality. bool can_write_into_slices = true;
// For scalar functions preallocated data and intersecting arg validity // bitmaps is a reasonable default NullHandling::type null_handling = NullHandling::INTERSECTION; MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE; };
/// The Kernel's `init` method must be called and any KernelState set in the /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate /// the case where init may be expensive and does not need to be called again for /// each execution of the kernel, for example the same lookup table can be re-used /// for all scanned batches in a dataset filter. virtual Status Init(KernelContext*, KernelInitArgs)= 0;
// TODO(wesm): per ARROW-16819, adding ExecBatch variant so that a batch // length can be passed in for scalar functions; will have to return and // clean a bunch of things up virtual Status Execute(const ExecBatch& batch, ExecListener* listener)= 0;
// Pure virtual: produces a single Datum from the original `args` and the
// collected `outputs`.
// NOTE(review): presumably combines the per-batch outputs into one result
// shaped after the arguments — confirm against the implementations.
virtual Datum WrapResults(const std::vector<Datum>& args, const std::vector<Datum>& outputs)= 0;
/// \brief Check the actual result type against the resolved output type virtual Status CheckResultType(const Datum& out, constchar* function_name)= 0;
Status ExecuteNonSpans(ExecListener* listener) {
  // ARROW-16756: Kernel is going to allocate some memory and so
  // for the time being we pass in an empty or partially-filled
  // shared_ptr<ArrayData> or shared_ptr<Scalar> to be populated
  // by the kernel.
  //
  // We will eventually delete the Scalar output path per
  // ARROW-16757.
  ExecSpan input;
  ExecResult output;
  // Pull one input span at a time from the span iterator and prepare an
  // output of matching length for it.
  while (span_iterator_.Next(&input)) {
    ARROW_ASSIGN_OR_RAISE(output.value, PrepareOutput(input.length));
    DCHECK(output.is_array_data());
    // (the excerpt ends here; the rest of the loop body is not shown)
/// \brief Return a best-match kernel that can execute the function given the argument /// types, after implicit casts are applied. /// /// \param[in,out] values Argument types. An element may be modified to /// indicate that the returned kernel only approximately matches the input /// value descriptors; callers are responsible for casting inputs to the type /// required by the kernel. virtual Result<const Kernel*> DispatchBest(std::vector<TypeHolder>* values)const;
/// \brief An type-checking interface to permit customizable validation rules /// for use with InputType and KernelSignature. This is for scenarios where the /// acceptance is not an exact type instance, such as a TIMESTAMP type for a /// specific TimeUnit, but permitting any time zone. structARROW_EXPORT TypeMatcher { virtual ~TypeMatcher() = default;
/// \brief Return true if this matcher accepts the data type. virtualboolMatches(const DataType& type)const= 0;
/// \brief A human-interpretable string representation of what the type /// matcher checks for, usable when printing KernelSignature or formatting /// error messages. virtual std::string ToString()const= 0;
/// \brief Return true if this TypeMatcher contains the same matching rule as /// the other. Currently depends on RTTI. virtualboolEquals(const TypeMatcher& other)const= 0; };
这里也允许 Output 计算出对应的输出类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/// \brief Container to capture both exact and input-dependent output types. classARROW_EXPORT OutputType { public: /// \brief An enum indicating whether the value type is an invariant fixed /// value or one that's computed by a kernel-defined resolver function. enumResolveKind { FIXED, COMPUTED };
/// Type resolution function. Given input types, return output type. This /// function MAY may use the kernel state to decide the output type based on /// the FunctionOptions. /// /// This function SHOULD _not_ be used to check for arity, that is to be /// performed one or more layers above. using Resolver = Result<TypeHolder> (*)(KernelContext*, const std::vector<TypeHolder>&);