void QueryContext::ScheduleTask(std::function<Status()> fn, std::string_view name) {
  ::arrow::internal::Executor* exec = executor();
  // Adds a task which submits fn to the executor and tracks its progress. If we're
  // already stopping then the task is ignored and fn is not executed.
  async_scheduler_->AddSimpleTask(
      [exec, fn = std::move(fn)]() mutable { return exec->Submit(std::move(fn)); },
      name);
}
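For context, a node typically hands work to the plan through this method. A minimal sketch of such a call site, assuming a node with a `plan_` pointer and a hypothetical `ProcessBatch` member function:

// Hypothetical call site inside an ExecNode: offload one processing step.
// `query_context()` and `ScheduleTask` are the APIs shown above; `ProcessBatch`
// is an assumed helper, not part of Arrow.
plan_->query_context()->ScheduleTask(
    [this, batch = std::move(batch)]() mutable -> Status {
      return ProcessBatch(std::move(batch));  // runs on an executor thread
    },
    "MyNode::ProcessBatch");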
using rev_it = std::reverse_iterator<NodeVector::iterator>;
for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != end; ++it) {
  auto node = *it;
  st = node->StartProducing();
  if (!st.ok()) {
    // Stop nodes that successfully started, in reverse order
    bool expected = false;
    if (stopped_.compare_exchange_strong(expected, true)) {
      StopProducingImpl(it.base(), sorted_nodes_.end());
    }
    return st;
  }
}
return st;
},
[this](const Status& st) {
  // If an error occurs we call StopProducing. The scheduler will already have
  // stopped scheduling new tasks at this point. However, any nodes that are
  // dealing with external tasks will need to trigger those external tasks to end
  // early.
  StopProducing();
});
scheduler_finished.AddCallback([this](const Status& st) {
  if (st.ok()) {
    if (stopped_.load()) {
      finished_.MarkFinished(Status::Cancelled("Plan was cancelled early."));
    } else {
      finished_.MarkFinished();
    }
  } else {
    finished_.MarkFinished(st);
  }
});
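The `compare_exchange_strong` above guarantees that exactly one caller performs the rollback even if several threads observe a failure concurrently. A standalone sketch of the same single-winner idiom (names are illustrative):

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<bool> stopped{false};

void StopOnce() {
  bool expected = false;
  // Only the thread that flips `stopped` from false to true runs the cleanup;
  // every other caller finds expected == true and simply returns.
  if (stopped.compare_exchange_strong(expected, true)) {
    std::cout << "cleanup runs exactly once\n";
  }
}

int main() {
  std::vector<std::thread> threads;
  for (int i = 0; i < 8; ++i) threads.emplace_back(StopOnce);
  for (auto& t : threads) t.join();
}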
Result<Future<>> operator()() {
  Future<> task = Future<>::Make();
  // Consume as many items as we can (those that are already finished)
  // synchronously to avoid recursion / stack overflow.
  while (true) {
    Future<T> next = state_holder->generator();
    if (next.TryAddCallback(
            [&] { return SubmitTaskCallback(std::move(state_holder), task); })) {
      return task;
    }
    ARROW_ASSIGN_OR_RAISE(T item, next.result());
    if (IsIterationEnd(item)) {
      task.MarkFinished();
      return task;
    }
    ARROW_RETURN_NOT_OK(state_holder->visitor(item));
  }
}
/// \brief Consumer API: Register a callback to run when this future completes
///
/// The callback should receive the result of the future (const Result<T>&)
/// For a void or statusy future this should be (const Status&)
///
/// There is no guarantee to the order in which callbacks will run. In
/// particular, callbacks added while the future is being marked complete
/// may be executed immediately, ahead of, or even at the same time as, other
/// callbacks that have been previously added.
///
/// WARNING: callbacks may hold arbitrary references, including cyclic references.
/// Since callbacks will only be destroyed after they are invoked, this can lead to
/// memory leaks if a Future is never marked finished (abandoned):
///
/// {
///   auto fut = Future<>::Make();
///   fut.AddCallback([fut]() {});
/// }
///
/// In this example `fut` falls out of scope but is not destroyed because it holds a
/// cyclic reference to itself through the callback.
template <typename OnComplete, typename Callback = WrapOnComplete<OnComplete>>
void AddCallback(OnComplete on_complete,
                 CallbackOptions opts = CallbackOptions::Defaults()) const {
  // We know impl_ will not be dangling when invoking callbacks because at least one
  // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
  // weak reference to impl_ here
  impl_->AddCallback(Callback{std::move(on_complete)}, opts);
}
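As a usage sketch: the callback for a `Future<int>` takes a `const Result<int>&`, while a `Future<>` callback takes a `const Status&`. This uses only the public Future API shown above:

#include <iostream>

#include "arrow/result.h"
#include "arrow/util/future.h"

int main() {
  auto fut = arrow::Future<int>::Make();
  fut.AddCallback([](const arrow::Result<int>& res) {
    if (res.ok()) {
      std::cout << "value: " << *res << "\n";
    } else {
      std::cout << "error: " << res.status().ToString() << "\n";
    }
  });
  // Marking the future finished invokes the callback (possibly synchronously,
  // on this thread).
  fut.MarkFinished(42);
  return 0;
}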
/// \brief Overload of AddCallback that will return false instead of running
/// synchronously
///
/// This overload will guarantee the callback is never run synchronously. If the future
/// is already finished then it will simply return false. This can be useful to avoid
/// stack overflow in a situation where you have recursive Futures. For an example
/// see the Loop function
///
/// Takes in a callback factory function to allow moving callbacks (the factory function
/// will only be called if the callback can successfully be added)
///
/// Returns true if a callback was actually added and false if the callback failed
/// to add because the future was marked complete.
template <typename CallbackFactory,
          typename OnComplete = detail::result_of_t<CallbackFactory()>,
          typename Callback = WrapOnComplete<OnComplete>>
bool TryAddCallback(CallbackFactory callback_factory,
                    CallbackOptions opts = CallbackOptions::Defaults()) const {
  return impl_->TryAddCallback([&]() { return Callback{callback_factory()}; }, opts);
}
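A sketch of the contract: on an already-finished future, `TryAddCallback` refuses and returns false, and the caller consumes the result inline, which is exactly how the generator loop above drains finished items without recursing. Assuming the same Arrow Future API:

#include <iostream>

#include "arrow/result.h"
#include "arrow/util/future.h"

int main() {
  // A future that is already finished: TryAddCallback must return false.
  auto done = arrow::Future<int>::MakeFinished(7);
  bool added = done.TryAddCallback([] {
    return [](const arrow::Result<int>& res) { std::cout << "async\n"; };
  });
  if (!added) {
    // Consume the result inline instead of recursing through a
    // synchronously-invoked callback.
    std::cout << "inline: " << *done.result() << "\n";
  }

  // A pending future: the factory runs and the callback is attached.
  auto pending = arrow::Future<int>::Make();
  pending.TryAddCallback([] {
    return [](const arrow::Result<int>& res) {
      std::cout << "callback: " << *res << "\n";
    };
  });
  pending.MarkFinished(9);
  return 0;
}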
bool AddTask(std::unique_ptr<Task> task) override {
  std::unique_lock lk(mutex_);
  // If the queue isn't empty then don't even try and acquire the throttle
  // We can safely assume it is either blocked or in the middle of trying to
  // alert a queued task.
  if (!queue_->Empty()) {
    queue_->Push(std::move(task));
    return true;
  }
  int latched_cost = std::min(task->cost(), throttle_->Capacity());
  std::optional<Future<>> maybe_backoff = throttle_->TryAcquire(latched_cost);
  if (maybe_backoff) {
    queue_->Push(std::move(task));
    lk.unlock();
    maybe_backoff->AddCallback(
        [weak_self = std::weak_ptr<ThrottledAsyncTaskSchedulerImpl>(
             shared_from_this())](const Status& st) {
          if (st.ok()) {
            if (auto self = weak_self.lock()) {
              self->ContinueTasks();
            }
          }
        });
    return true;
  } else {
    lk.unlock();
    return SubmitTask(std::move(task), latched_cost, /*in_continue=*/false);
  }
}
bool SubmitTask(std::unique_ptr<Task> task, int latched_cost, bool in_continue) {
  // Wrap the task with a wrapper that runs it and then checks to see if there are any
  // queued tasks
  std::string_view name = task->name();
  return target_->AddSimpleTask(
      [latched_cost, in_continue, inner_task = std::move(task),
       self = shared_from_this()]() mutable -> Result<Future<>> {
        ARROW_ASSIGN_OR_RAISE(Future<> inner_fut, (*inner_task)());
        if (!inner_fut.TryAddCallback([&] {
              return [latched_cost, self = std::move(self)](const Status& st) -> void {
                if (st.ok()) {
                  self->throttle_->Release(latched_cost);
                  self->ContinueTasks();
                }
              };
            })) {
          // If the task is already finished then don't run ContinueTasks
          // if we are already running it so we can avoid stack overflow
          self->throttle_->Release(latched_cost);
          if (!in_continue) {
            self->ContinueTasks();
          }
        }
        return inner_fut;
      },
      name);
}
void ContinueTasks() {
  std::unique_lock lk(mutex_);
  while (!queue_->Empty()) {
    int next_cost = std::min(queue_->Peek().cost(), throttle_->Capacity());
    std::optional<Future<>> maybe_backoff = throttle_->TryAcquire(next_cost);
    if (maybe_backoff) {
      lk.unlock();
      if (!maybe_backoff->TryAddCallback([&] {
            return [self = shared_from_this()](const Status& st) {
              if (st.ok()) {
                self->ContinueTasks();
              }
            };
          })) {
        if (!maybe_backoff->status().ok()) {
          return;
        }
        lk.lock();
        continue;
      }
      return;
    } else {
      std::unique_ptr<Task> next_task = queue_->Pop();
      lk.unlock();
      if (!SubmitTask(std::move(next_task), next_cost, /*in_continue=*/true)) {
        return;
      }
      lk.lock();
    }
  }
}
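The protocol shared by the three methods above: `TryAcquire(cost)` returns `std::nullopt` on success (run the task now) or a backoff future that completes once capacity is released. Note also the `std::min(cost, Capacity())` latching: a task whose cost exceeds the total capacity is only charged the full capacity, so it can still run instead of waiting forever. A minimal single-waiter throttle sketch under that contract; this is an illustration of the interface, not Arrow's actual throttle implementation:

#include <mutex>
#include <optional>
#include <utility>

#include "arrow/util/future.h"

// Minimal throttle sketch: one pending waiter, capacity counted in abstract cost.
class SimpleThrottle {
 public:
  explicit SimpleThrottle(int capacity) : available_(capacity), capacity_(capacity) {}

  int Capacity() const { return capacity_; }

  // nullopt: cost acquired, caller may run the task immediately.
  // future: throttle is full; the future completes when cost is released.
  std::optional<arrow::Future<>> TryAcquire(int cost) {
    std::lock_guard<std::mutex> lk(mutex_);
    if (cost <= available_) {
      available_ -= cost;
      return std::nullopt;
    }
    backoff_ = arrow::Future<>::Make();
    return backoff_;
  }

  void Release(int cost) {
    std::optional<arrow::Future<>> to_signal;
    {
      std::lock_guard<std::mutex> lk(mutex_);
      available_ += cost;
      to_signal = std::exchange(backoff_, std::nullopt);
    }
    // Complete the future outside the lock; this wakes the queued
    // ContinueTasks pass without risking lock-order problems.
    if (to_signal) to_signal->MarkFinished();
  }

 private:
  std::mutex mutex_;
  int available_;
  const int capacity_;
  std::optional<arrow::Future<>> backoff_;
};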
~AsyncTaskGroupImpl() {
  if (--state_->task_count == 0) {
    Status st = std::move(state_->finish_cb)();
    if (!st.ok()) {
      // We can't return an invalid status from the destructor so we schedule a dummy
      // failing task
      target_->AddSimpleTask([st = std::move(st)]() { return st; },
                             "failed_task_reporter"sv);
    }
  }
}
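This destructor is the entire completion mechanism: each in-flight task holds a reference that bumps `task_count`, and whoever drops the count to zero fires the finish callback. A standalone sketch of the same reference-counting idiom (all names here are illustrative):

#include <atomic>
#include <functional>
#include <iostream>
#include <memory>

struct GroupState {
  explicit GroupState(std::function<void()> cb)
      : task_count(1), finish_cb(std::move(cb)) {}  // creator holds one reference
  std::atomic<int> task_count;
  std::function<void()> finish_cb;
};

class TaskRef {
 public:
  explicit TaskRef(std::shared_ptr<GroupState> s) : state_(std::move(s)) {
    state_->task_count.fetch_add(1);
  }
  TaskRef(const TaskRef&) = delete;
  ~TaskRef() {
    // The reference that drops the count to zero runs the finish callback.
    if (state_->task_count.fetch_sub(1) == 1) state_->finish_cb();
  }

 private:
  std::shared_ptr<GroupState> state_;
};

int main() {
  auto state = std::make_shared<GroupState>([] { std::cout << "group done\n"; });
  {
    TaskRef t1(state), t2(state);  // two "tasks" in flight
  }                                // both task references released here
  // Creator releases its own reference; the count reaches zero and fires.
  if (state->task_count.fetch_sub(1) == 1) state->finish_cb();
}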
// Used for asynchronous execution of operations that can be broken into
// a fixed number of symmetric tasks that can be executed concurrently.
//
// Implements priorities between multiple such operations, called task groups.
//
// Allows specifying the maximum number of in-flight tasks at any moment.
//
// Also allows executing the next pending tasks immediately on a caller thread.
//
class ARROW_ACERO_EXPORT TaskScheduler {
 public:
  using TaskImpl = std::function<Status(size_t, int64_t)>;
  using TaskGroupContinuationImpl = std::function<Status(size_t)>;
  using ScheduleImpl = std::function<Status(TaskGroupContinuationImpl)>;
  using AbortContinuationImpl = std::function<void()>;
virtual ~TaskScheduler() = default;

// Order in which task groups are registered represents priorities of their tasks
// (the first group has the highest priority).
//
// Returns a task group identifier that is used to request operations on the task group.
virtual int RegisterTaskGroup(TaskImpl task_impl,
                              TaskGroupContinuationImpl cont_impl) = 0;

virtual void RegisterEnd() = 0;

// total_num_tasks may be zero, in which case the task group continuation will be
// executed immediately
virtual Status StartTaskGroup(size_t thread_id, int group_id,
                              int64_t total_num_tasks) = 0;

// Execute a given number of tasks immediately using the caller thread
virtual Status ExecuteMore(size_t thread_id, int num_tasks_to_execute,
                           bool execute_all) = 0;

// Begin scheduling tasks using the provided callback and
// the limit on the number of in-flight tasks at any moment.
//
// Scheduling will continue as long as there are waiting tasks.
//
// It will automatically resume whenever a new task group gets started.
virtual Status StartScheduling(size_t thread_id, ScheduleImpl schedule_impl,
                               int num_concurrent_tasks, bool use_sync_execution) = 0;

// Abort scheduling and execution.
// Used in case of being notified about an unrecoverable error for the entire query.
virtual void Abort(AbortContinuationImpl impl) = 0;
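Putting the interface together, a typical lifecycle is: register groups, call RegisterEnd, start scheduling, then start groups as their inputs become ready. A hedged sketch of that sequence, assuming the `TaskScheduler::Make()` factory from arrow/acero/task_util.h and running everything synchronously on the calling thread:

#include <cstdio>
#include <memory>

#include "arrow/acero/task_util.h"
#include "arrow/status.h"

using arrow::Status;
using arrow::acero::TaskScheduler;

Status RunDemo() {
  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();

  // Each task receives (thread_id, task_id); the continuation fires once every
  // task in the group has finished.
  int group_id = scheduler->RegisterTaskGroup(
      [](size_t thread_id, int64_t task_id) {
        std::printf("task %lld on thread %zu\n",
                    static_cast<long long>(task_id), thread_id);
        return Status::OK();
      },
      [](size_t thread_id) {
        std::printf("group finished\n");
        return Status::OK();
      });
  scheduler->RegisterEnd();

  // A schedule callback that runs the continuation inline keeps this demo on
  // the calling thread; use_sync_execution=true matches that choice.
  ARROW_RETURN_NOT_OK(scheduler->StartScheduling(
      /*thread_id=*/0,
      [](TaskScheduler::TaskGroupContinuationImpl cont) { return cont(0); },
      /*num_concurrent_tasks=*/1, /*use_sync_execution=*/true));

  return scheduler->StartTaskGroup(/*thread_id=*/0, group_id,
                                   /*total_num_tasks=*/4);
}

int main() { return RunDemo().ok() ? 0 : 1; }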
// Task group state transitions progress one way.
// Seeing an old version of the state by a thread is a valid situation.
//
enum class TaskGroupState : int {
  NOT_READY,
  READY,
  ALL_TASKS_STARTED,
  ALL_TASKS_FINISHED
};
// All registered task groups
std::vector<TaskGroup> task_groups_;
bool register_finished_;
std::mutex mutex_;  // Mutex protecting task_groups_ (state_ and num_tasks_present_
                    // fields) and register_finished_ flag
AtomicWithPadding<bool> aborted_;
// Capped at `num_concurrent_tasks_`; the number of concurrency slots still
// available for scheduling
AtomicWithPadding<int> num_tasks_to_schedule_;
// If a task group adds tasks it's possible for a thread inside
// ScheduleMore to miss this fact. This serves as a flag to
// notify the scheduling thread that it might need to make
// another pass through the scheduler
//
// Records whether a new StartTaskGroup call arrived between scheduling passes,
// so that a scheduling opportunity is not lost
AtomicWithPadding<bool> tasks_added_recently_;
};
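The flag implements a standard lost-wakeup guard: producers set it after publishing work, and the scheduling thread re-checks it after it believes it is done, looping if anything arrived in between. A standalone sketch of the recheck loop (names are illustrative):

#include <atomic>

std::atomic<bool> tasks_added_recently{false};

// Producer side: publish the new tasks first, then raise the flag.
void OnTasksAdded() { tasks_added_recently.store(true); }

// Scheduler side: clear the flag before draining, then re-check it afterwards.
// If a producer raced with the drain, make another pass instead of going idle
// and losing the wakeup.
void SchedulePass(void (*drain)()) {
  do {
    tasks_added_recently.store(false);
    drain();  // schedule everything currently visible
  } while (tasks_added_recently.load());
}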