/// CloseWaitGroup is a bit like `WaitGuard`, but it's used with "close". /// /// When return a Future, we need to guard the lifetime for the underlying data source. /// CloseWaitGroup is used to guard the lifetime of the underlying data source and /// provide the graceful close ability. /// /// Usage: /// /// Worker Thread: /// /// ```c++ /// std::shared_ptr<CloseWaitGroup> wg = std::make_shared<CloseWaitGroup>(); /// /// threadPool->submit([wg] { /// std::optional<CloseWaitGuard> guard = wg->tryGuard(); /// if (!guard) {...} /// }); /// ``` /// /// Close: /// /// ```c++ /// wg->closeAndWait(); /// ``` /// /// Note: the code below is illegal and might cause deadlock: /// /// ``` /// std::optional<CloseWaitGuard> guard = wg->tryGuard(); /// if (!guard) {...} /// wg->closeAndWait(); /// ``` structCloseWaitGroup { // Lifetime of CloseWaitGuard is bounded by the lifetime of CloseWaitGroup. structCloseWaitGuard { explicitCloseWaitGuard(std::unique_lock<std::mutex>& lock, CloseWaitGroup* group) : group_(group) { // Debug assert lock is held. assert(lock.owns_lock()); group_->waiters_++; } ~CloseWaitGuard() { if (group_ != nullptr) { std::unique_lock<std::mutex> lock(group_->lock_); group_->waiters_--; group_->cond_.notify_all(); } } CloseWaitGuard(CloseWaitGuard const&) = delete; CloseWaitGuard& operator=(CloseWaitGuard const&) = delete; CloseWaitGuard(CloseWaitGuard&& other) noexcept : group_(other.group_) { other.group_ = nullptr; } CloseWaitGuard& operator=(CloseWaitGuard&&) = delete;
/// Close the CloseWaitGroup and wait for all the waiters to finish. /// /// N.B. `closeAndWait` should not be called with the `CloseWaitGuard` held. voidcloseAndWait(){ std::unique_lock<std::mutex> guard(this->lock_); // After `closed_` is set to true, no new waiters will be added. closed_ = true; cond_.wait(guard, [this] { return waiters_ == 0; }); }