/// \brief An extensible registry for factories of ExecNodes
class ARROW_ACERO_EXPORT ExecFactoryRegistry {
 public:
  using Factory = std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>,
                                                  const ExecNodeOptions&)>;

  virtual ~ExecFactoryRegistry() = default;

  /// \brief Get the named factory from this registry
  ///
  /// will raise if factory_name is not found
  virtual Result<Factory> GetFactory(const std::string& factory_name) = 0;

  /// \brief Add a factory to this registry with the provided name
  ///
  /// will raise if factory_name is already in the registry
  virtual Status AddFactory(std::string factory_name, Factory factory) = 0;
};
/// The default registry, which includes built-in factories.
ARROW_ACERO_EXPORT ExecFactoryRegistry* default_exec_factory_registry();

/// \brief Construct an ExecNode using the named factory
inline Result<ExecNode*> MakeExecNode(
    const std::string& factory_name, ExecPlan* plan, std::vector<ExecNode*> inputs,
    const ExecNodeOptions& options,
    ExecFactoryRegistry* registry = default_exec_factory_registry()) {
  ARROW_ASSIGN_OR_RAISE(auto factory, registry->GetFactory(factory_name));
  return factory(plan, std::move(inputs), options);
}
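// For illustration, a minimal sketch (not part of the library) of extending the
// registry and then constructing a node through it. The factory name "my_node"
// and the MakeMyNode factory are hypothetical; a sketch of MakeMyNode itself
// appears after the ExecPlan class below.
Result<ExecNode*> AddMyNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
                            const ExecNodeOptions& options) {
  // Register the hypothetical factory once, then look it up by name.
  ARROW_RETURN_NOT_OK(
      default_exec_factory_registry()->AddFactory("my_node", MakeMyNode));
  return MakeExecNode("my_node", plan, std::move(inputs), options);
}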
using FragmentGenerator = AsyncGenerator<std::shared_ptr<Fragment>>;
Result<FragmentGenerator> AsyncScanner::GetFragments() const {
  // TODO(ARROW-8163): Async fragment scanning will return AsyncGenerator<Fragment>
  // here. Current iterator based versions are all fast & sync so we will just ToVector
  // it
  ARROW_ASSIGN_OR_RAISE(auto fragments_it,
                        dataset_->GetFragments(scan_options_->filter));
  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector());
  return MakeVectorGenerator(std::move(fragments_vec));
}
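// A hedged consumption sketch: visit each fragment as the generator yields it.
// VisitAsyncGenerator comes from Arrow's async generator utilities; the
// VisitFragments wrapper and the visitor body are illustrative assumptions.
Future<> VisitFragments(const std::shared_ptr<AsyncScanner>& scanner) {
  auto maybe_gen = scanner->GetFragments();
  if (!maybe_gen.ok()) return Future<>::MakeFinished(maybe_gen.status());
  return VisitAsyncGenerator(std::move(maybe_gen).ValueOrDie(),
                             [](const std::shared_ptr<Fragment>& fragment) {
                               // ... inspect or schedule a scan of `fragment` ...
                               return Status::OK();
                             });
}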
ARROW_ASSIGN_OR_RAISE(auto plan, acero::ExecPlan::Make(exec_context));
// Drop projection since we only need to count rows
const auto options = std::make_shared<ScanOptions>(*scan_options_);
ARROW_ASSIGN_OR_RAISE(
    auto empty_projection,
    ProjectionDescr::FromNames(std::vector<std::string>(),
                               *scan_options_->dataset_schema));
SetProjection(options.get(), empty_projection);
auto total = std::make_shared<std::atomic<int64_t>>(0);
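// The remainder of the count-rows path is not shown here. As a hedged sketch,
// a shared counter like `total` is typically bumped by whatever consumes the
// projection-free batches: with no columns selected, counting reduces to
// summing batch lengths. The callback below is illustrative, not library code.
auto count_batch = [total](ExecBatch batch) {
  total->fetch_add(batch.length);
  return Status::OK();
};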
class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
 public:
  // This allows operators to rely on signed 16-bit indices
  static const uint32_t kMaxBatchSize = 1 << 15;
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecPlan() = default;

  QueryContext* query_context();

  /// \brief retrieve the nodes in the plan
  const NodeVector& nodes() const;

  /// \brief Construct a node of type Node inside this plan
  ///
  /// Ownership of the node is transferred to the plan; the returned raw
  /// pointer remains valid for the plan's lifetime.
  template <typename Node, typename... Args>
  Node* EmplaceNode(Args&&... args) {
    std::unique_ptr<Node> node{new Node{std::forward<Args>(args)...}};
    auto out = node.get();
    AddNode(std::move(node));
    return out;
  }

  Status Validate();

  /// \brief Start producing on all nodes
  ///
  /// Nodes are started in reverse topological order, such that any node
  /// is started before all of its inputs.
  void StartProducing();

  /// \brief Stop producing on all nodes
  ///
  /// Triggers all sources to stop producing new data. In order to stop cleanly, the
  /// plan will continue to run any tasks that are already in progress. The caller
  /// should still wait for `finished` to complete before destroying the plan.
  void StopProducing();

  /// \brief A future which will be marked finished when all tasks have finished.
  Future<> finished();

  /// \brief Return whether the plan has non-empty metadata
  bool HasMetadata() const;

  /// \brief Return the plan's attached metadata
  std::shared_ptr<const KeyValueMetadata> metadata() const;
};
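// A hedged sketch of the hypothetical MakeMyNode factory referenced after the
// registry above. A factory typically builds its node with EmplaceNode so the
// plan owns the result; MyNode and its constructor arguments are assumptions.
Result<ExecNode*> MakeMyNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
                             const ExecNodeOptions& options) {
  return plan->EmplaceNode<MyNode>(plan, std::move(inputs), options);
}

// Putting the lifecycle together: validate, start all nodes, then block on
// `finished`. Node construction is elided; the ExecPlan::Make(exec_context)
// overload mirrors the call shown in the count-rows snippet above.
Status RunPlanSketch(ExecContext* exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(exec_context));
  // ... add nodes via MakeExecNode(...) or plan->EmplaceNode<...>(...) ...
  ARROW_RETURN_NOT_OK(plan->Validate());
  plan->StartProducing();
  // StopProducing() may be called here to end the plan early; either way, wait
  // for `finished` before destroying the plan.
  return plan->finished().status();
}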
class ARROW_ACERO_EXPORT ExecNode {
 public:
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecNode() = default;

  virtual const char* kind_name() const = 0;

  // The number of inputs expected by this node
  int num_inputs() const { return static_cast<int>(inputs_.size()); }

  /// This node's predecessors in the exec plan
  const NodeVector& inputs() const { return inputs_; }

  /// True if the node has no output schema (is a sink)
  bool is_sink() const { return !output_schema_; }

  /// \brief Labels identifying the function of each input.
  const std::vector<std::string>& input_labels() const { return input_labels_; }

  /// This node's successor in the exec plan
  const ExecNode* output() const { return output_; }

  /// The datatypes for batches produced by this node
  const std::shared_ptr<Schema>& output_schema() const { return output_schema_; }

  /// This node's exec plan
  ExecPlan* plan() { return plan_; }

  /// \brief An optional label, for display and debugging
  ///
  /// There is no guarantee that this value is non-empty or unique.
  const std::string& label() const { return label_; }
  void SetLabel(std::string label) { label_ = std::move(label); }
  virtual Status Validate() const;

  /// \brief the ordering of the output batches
  virtual const Ordering& ordering() const;

  /// Transfer input batch to ExecNode
  ///
  /// A node will typically perform some kind of operation on the batch
  /// and then call InputReceived on its outputs with the result.
  ///
  /// Other nodes may need to accumulate some number of inputs before any
  /// output can be produced. These nodes will add the batch to some kind
  /// of in-memory accumulation queue and return.
  virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0;

  /// Mark the inputs finished after the given number of batches.
  ///
  /// This may be called before all inputs are received. This simply fixes
  /// the total number of incoming batches for an input, so that the ExecNode
  /// knows when it has received all input, regardless of order.
  virtual Status InputFinished(ExecNode* input, int total_batches) = 0;
  /// \brief Perform any needed initialization
  ///
  /// This hook performs any actions in between creation of ExecPlan and the call to
  /// StartProducing. An example could be Bloom filter pushdown. The order in which
  /// ExecNodes execute this method is undefined, but the calls are made synchronously.
  ///
  /// At this point a node can rely on all inputs & outputs (and the input schemas)
  /// being well defined.
  virtual Status Init();
  /// \brief Start producing
  ///
  /// This must only be called once.
  ///
  /// This is typically called automatically by ExecPlan::StartProducing().
  virtual Status StartProducing() = 0;

  /// \brief Pause producing temporarily
  ///
  /// \param output Pointer to the output that is full
  /// \param counter Counter used to sequence calls to pause/resume
  ///
  /// This call is a hint that an output node is currently not willing
  /// to receive data.
  ///
  /// This may be called any number of times.
  /// However, the node is still free to produce data (which may be difficult
  /// to prevent anyway if data is produced using multiple threads).
  virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;

  /// \brief Resume producing after a temporary pause
  ///
  /// \param output Pointer to the output that is now free
  /// \param counter Counter used to sequence calls to pause/resume
  ///
  /// This call is a hint that an output node is willing to receive data again.
  ///
  /// This may be called any number of times.
  virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;

  /// \brief Stop producing new data
  ///
  /// If this node is a source then the source should stop generating data
  /// as quickly as possible. If this node is not a source then there is typically
  /// nothing that needs to be done, although a node may choose to start ignoring
  /// incoming data.
  ///
  /// This method will be called when an error occurs in the plan.
  /// This method may also be called by the user if they wish to end a plan early.
  /// Finally, this method may be called if a node determines it no longer needs any
  /// more input (for example, a limit node).
  ///
  /// This method may be called multiple times.
  ///
  /// This is not a pause. There will be no way to start the source again after this
  /// has been called.
  virtual Status StopProducing();
};
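// A hedged sketch of the push-based contract above: a hypothetical pass-through
// node that forwards batches, batch counts, and backpressure. It assumes the
// protected inputs_/output_ members shown earlier and an inherited base-class
// constructor; a real node would also override Validate()/ordering() as needed.
class PassthroughNode : public ExecNode {
 public:
  using ExecNode::ExecNode;  // assumption: reuse the base-class constructor

  const char* kind_name() const override { return "passthrough"; }

  Status InputReceived(ExecNode* input, ExecBatch batch) override {
    // No accumulation: each batch is handed straight to the output.
    return output_->InputReceived(this, std::move(batch));
  }

  Status InputFinished(ExecNode* input, int total_batches) override {
    // One output batch per input batch, so the total passes through unchanged.
    return output_->InputFinished(this, total_batches);
  }

  Status StartProducing() override { return Status::OK(); }

  void PauseProducing(ExecNode* output, int32_t counter) override {
    // Relay backpressure to the upstream node.
    inputs_[0]->PauseProducing(this, counter);
  }

  void ResumeProducing(ExecNode* output, int32_t counter) override {
    inputs_[0]->ResumeProducing(this, counter);
  }
};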
/// Construct a scheduler
///
/// \param initial_task The initial task which is responsible for adding
///        the first subtasks to the scheduler.
/// \param abort_callback A callback that will be triggered immediately after a task
///        fails while other tasks may still be running. Nothing needs to be done here;
///        when a task fails the scheduler will stop accepting new tasks and eventually
///        return the error. However, this callback can be used to more quickly end
///        long running tasks that have already been submitted. Defaults to doing
///        nothing.
/// \param stop_token An optional stop token that will allow cancellation of the
///        scheduler. This will be checked before each task is submitted and, in the
///        event of a cancellation, the scheduler will enter an aborted state. This is
///        a graceful cancellation and submitted tasks will still complete.
/// \return A future that will be completed when the initial task and all subtasks have
///        finished.
static Future<> Make(
    FnOnce<Status(AsyncTaskScheduler*)> initial_task,
    FnOnce<void(const Status&)> abort_callback = [](const Status&) {},
    StopToken stop_token = StopToken::Unstoppable());
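// A hedged usage sketch: the initial task enqueues one async subtask and
// returns immediately; the future from Make completes once that subtask (and
// any others) finish. AddSimpleTask is assumed as the submission method here;
// the exact task-submission API may differ between Arrow versions.
Future<> RunSchedulerSketch() {
  return util::AsyncTaskScheduler::Make(
      [](util::AsyncTaskScheduler* scheduler) -> Status {
        scheduler->AddSimpleTask(
            [] { return Future<>::MakeFinished(Status::OK()); },
            "example-subtask");
        return Status::OK();
      },
      [](const Status&) {
        // Abort callback: a task failed; long-running submitted tasks could be
        // signaled here to wind down early.
      });
}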
Future<> AddScanTasks(const std::shared_ptr<FragmentScanner>& fragment_scanner) {
  scan_state->fragment_scanner = fragment_scanner;
  ScanState* state_view = scan_state.get();
  Future<> list_and_scan_done = Future<>::Make();
  // Finish callback keeps the scan state alive until all scan tasks are done
  struct StateHolder {
    Status operator()() {
      list_and_scan_done.MarkFinished();
      return Status::OK();
    }

    Future<> list_and_scan_done;
    std::unique_ptr<ScanState> scan_state;
  };

  std::unique_ptr<util::AsyncTaskGroup> scan_tasks = util::AsyncTaskGroup::Make(
      node->batches_throttle_.get(),
      StateHolder{list_and_scan_done, std::move(scan_state)});
  for (int i = 0; i < fragment_scanner->NumBatches(); i++) {
    node->num_batches_.fetch_add(1);
    scan_tasks->AddTask(std::make_unique<ScanBatchTask>(node, state_view, i));
  }
  // The "list fragments" task doesn't actually end until the fragments are
  // all scanned. This allows us to enforce fragment readahead.
  return list_and_scan_done;
}