A stupid async lifetime bug

Async code and shared_ptr tend to go hand in hand, as in my change at https://github.com/apache/arrow/pull/37713. The underlying problem is the safety of constructors and destructors:

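  1. Leaking an object to other code while it is still being constructed is dangerous
  2. An asynchronous function can hardly know when the object will be destructed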

Take CloseAsync() as modified in the PR above:

Future<> CloseAsync() override {
  if (closed_) return Status::OK();

  RETURN_NOT_OK(EnsureReadyToFlushFromClose());

  // Capture a shared_ptr to ourselves so the stream outlives the continuation.
  auto self = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
  // Wait for in-progress uploads to finish (if async writes are enabled)
  return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); });
}

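This should be fairly intuitive: the continuation captures self, so the stream stays alive until FinishPartUploadAfterFlush() has run.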
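The same keep-alive idea can be sketched outside Arrow with only the standard library. This is a minimal sketch, not Arrow's implementation: `Stream` and `CloseAfterDroppingOwner` are hypothetical names, and `std::async` stands in for Arrow's `Future<>::Then`.

```cpp
#include <chrono>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a stream that closes asynchronously.
class Stream : public std::enable_shared_from_this<Stream> {
 public:
  explicit Stream(std::string name) : name_(std::move(name)) {}

  // Must be called on an object that is owned by a std::shared_ptr.
  std::future<std::string> CloseAsync() {
    // The lambda owns `self`, so the Stream stays alive until the task ends.
    auto self = shared_from_this();
    return std::async(std::launch::async, [self] {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      return self->name_ + " closed";
    });
  }

 private:
  std::string name_;
};

// Drops the caller's reference before the task finishes, then waits for it.
std::string CloseAfterDroppingOwner() {
  std::future<std::string> fut;
  {
    auto stream = std::make_shared<Stream>("s1");
    fut = stream->CloseAsync();
  }  // only the lambda's copy of the shared_ptr is left at this point
  return fut.get();
}
```

Calling `CloseAfterDroppingOwner()` still reads `name_` safely even though the caller's `shared_ptr` went out of scope first, because the task holds its own reference.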

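Now consider a dumb question: if a type does not use enable_shared_from_this, can it still perform asynchronous operations in the way shown above? Suppose we have the following type: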

#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>

/// CloseWaitGroup is a bit like `WaitGuard`, but it's used with "close".
///
/// When returning a Future, we need to guard the lifetime of the underlying data
/// source. CloseWaitGroup guards that lifetime and provides the ability to close
/// gracefully.
///
/// Usage:
///
/// Worker Thread:
///
/// ```c++
/// std::shared_ptr<CloseWaitGroup> wg = std::make_shared<CloseWaitGroup>();
///
/// threadPool->submit([wg] {
///   std::optional<CloseWaitGroup::CloseWaitGuard> guard = wg->tryGuard();
///   if (!guard) {...}
/// });
/// ```
///
/// Close:
///
/// ```c++
/// wg->closeAndWait();
/// ```
///
/// Note: the code below is illegal and deadlocks, because the guard is still
/// alive while `closeAndWait` waits for it to be released:
///
/// ```
/// std::optional<CloseWaitGroup::CloseWaitGuard> guard = wg->tryGuard();
/// if (!guard) {...}
/// wg->closeAndWait();
/// ```
struct CloseWaitGroup {
  // Lifetime of CloseWaitGuard is bounded by the lifetime of CloseWaitGroup.
  struct CloseWaitGuard {
    explicit CloseWaitGuard(std::unique_lock<std::mutex>& lock, CloseWaitGroup* group)
        : group_(group) {
      // Debug assert lock is held.
      assert(lock.owns_lock());
      group_->waiters_++;
    }
    ~CloseWaitGuard() {
      // A moved-from guard has a null group_ and must not decrement the counter.
      if (group_ != nullptr) {
        std::unique_lock<std::mutex> lock(group_->lock_);
        group_->waiters_--;
        group_->cond_.notify_all();
      }
    }
    CloseWaitGuard(CloseWaitGuard const&) = delete;
    CloseWaitGuard& operator=(CloseWaitGuard const&) = delete;
    CloseWaitGuard(CloseWaitGuard&& other) noexcept : group_(other.group_) {
      other.group_ = nullptr;
    }
    CloseWaitGuard& operator=(CloseWaitGuard&&) = delete;

   private:
    CloseWaitGroup* group_ = nullptr;
  };

  bool closed() const noexcept {
    std::unique_lock<std::mutex> guard(lock_);
    return closed_;
  }

  /// Close the CloseWaitGroup and wait for all the waiters to finish.
  ///
  /// N.B. `closeAndWait` should not be called with the `CloseWaitGuard` held.
  void closeAndWait() {
    std::unique_lock<std::mutex> guard(this->lock_);
    // After `closed_` is set to true, no new waiters will be added.
    closed_ = true;
    cond_.wait(guard, [this] { return waiters_ == 0; });
  }

  std::optional<CloseWaitGuard> tryGuard() {
    std::unique_lock<std::mutex> guard(lock_);
    if (closed_) {
      return std::nullopt;
    }
    return CloseWaitGuard(guard, this);
  }

 private:
  // The nested CloseWaitGuard can already access these private members,
  // so no friend declaration is needed.
  mutable std::mutex lock_;
  bool closed_{false};
  std::condition_variable cond_;
  int32_t waiters_{0};
};

At first glance there is nothing wrong with it. Let's construct a dumb case:

#include <chrono>
#include <iostream>
#include <memory>
#include <optional>
#include <thread>

class A {
 public:
  explicit A(std::shared_ptr<CloseWaitGroup> wg) : value_(0), wg_(std::move(wg)) {}
  explicit A(int v) : value_(v) {}
  virtual void func() { std::cout << "A::func() value_: " << value_ << std::endl; }
  // wg_ is null when the A(int) constructor was used, so check before closing.
  virtual ~A() {
    if (wg_ != nullptr) wg_->closeAndWait();
  }

 protected:
  int value_;
  std::shared_ptr<CloseWaitGroup> wg_;
};

void fnCall() {
  auto wg = std::make_shared<CloseWaitGroup>();
  A a(wg);
  A* aptr = &a;
  std::thread t1([wg, aptr] {
    std::optional<CloseWaitGroup::CloseWaitGuard> guard = wg->tryGuard();
    if (!guard) {
      std::cout << "closed" << std::endl;
      return;
    }
    std::cout << "not closed" << std::endl;
    aptr->func();
    // sleep 40ms, so that fnCall() returns and ~A() starts in the meantime
    std::this_thread::sleep_for(std::chrono::milliseconds(40));
    std::cout << "Callback start" << std::endl;
    aptr->func();
    std::cout << "Callback finished" << std::endl;
  });
  t1.detach();
  // sleep 20ms
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
}

int main() {
  fnCall();
  // sleep 2s
  std::this_thread::sleep_for(std::chrono::seconds(2));

  return 0;
}

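A quick look at this:

  1. A's destructor only calls CloseWaitGroup::closeAndWait(), so no virtual function is called from the destructor
  2. Both aptr->func() calls print successfully, and the ASAN and UBSAN runs report nothing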

Hmm, so is it really fine? Let's modify the case slightly:

class B : public A {
 public:
  using A::A;
  void func() override { std::cout << "B::func() value_: " << value_ << std::endl; }
};

void fnCall() {
  auto wg = std::make_shared<CloseWaitGroup>();
  B a(wg);
  A* aptr = &a;
  std::thread t1([wg, aptr] {
    std::optional<CloseWaitGroup::CloseWaitGuard> guard = wg->tryGuard();
    if (!guard) {
      std::cout << "closed" << std::endl;
      return;
    }
    std::cout << "not closed" << std::endl;
    aptr->func();
    // sleep 40ms, so that fnCall() returns and destruction starts in the meantime
    std::this_thread::sleep_for(std::chrono::milliseconds(40));
    std::cout << "Callback start" << std::endl;
    aptr->func();
    std::cout << "Callback finished" << std::endl;
  });
  t1.detach();
  // sleep 20ms
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
}

Now the output becomes:

B::func() value_: 0
Callback start
A::func() value_: 0
Callback finished

See that? The second call printed A::func()! We can even watch some fireworks by making A's func() a pure virtual function:

virtual void func() = 0;

// Output
not closed
B::func() value_: 0
Callback start
libc++abi: Pure virtual function called!

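What actually happens here:

  1. B's destructor has already finished
  2. A's destructor blocks in closeAndWait()
  3. The other thread calls func() through aptr; by then the object's vtable pointer already refers to A, so the call happily lands in A::func() (or in the pure-virtual handler)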
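The dispatch change in the steps above can be reproduced without any threads. This is a minimal single-threaded sketch with hypothetical names (`Base`, `Derived`, `g_calls`, `RunDestructionDemo`); it relies only on the language rule that, while a base-class destructor runs, virtual calls resolve to the base class.

```cpp
#include <string>
#include <vector>

// Hypothetical log recording which override actually ran.
std::vector<std::string> g_calls;

struct Base {
  virtual ~Base() {
    // ~Derived() has already finished, so the dynamic type is Base here
    // and this virtual call resolves to Base::func().
    func();
  }
  virtual void func() { g_calls.push_back("Base::func"); }
};

struct Derived : Base {
  void func() override { g_calls.push_back("Derived::func"); }
};

std::vector<std::string> RunDestructionDemo() {
  g_calls.clear();
  {
    Derived d;
    Base* p = &d;
    p->func();  // object fully alive: dispatches to Derived::func()
  }             // ~Derived() runs first, then ~Base() calls func() again
  return g_calls;
}
```

In the threaded case from this post, the second call comes from another thread instead of from the destructor itself, but it observes the object in the same half-destructed state.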

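Looking back, the problem itself is very simple, and it would not exist at all had shared_ptr been used. I fixed it quickly by changing the code, but I still find it rather peculiar.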
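For illustration, here is a rough sketch of the shared_ptr approach mentioned above. It is not the actual fix from my code: `Task`, `DerivedTask` and `RunWithSharedOwnership` are hypothetical names, and the worker is joined rather than detached so that the result is deterministic.

```cpp
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Hypothetical class hierarchy standing in for A and B.
class Task {
 public:
  virtual ~Task() = default;
  virtual std::string name() const { return "Task"; }
};

class DerivedTask : public Task {
 public:
  std::string name() const override { return "DerivedTask"; }
};

std::vector<std::string> RunWithSharedOwnership() {
  std::vector<std::string> calls;
  std::thread worker;
  {
    std::shared_ptr<Task> task = std::make_shared<DerivedTask>();
    // The worker holds its own reference, so destruction cannot start
    // while the callback is still using the object.
    worker = std::thread([task, &calls] {
      calls.push_back(task->name());
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
      calls.push_back(task->name());
    });
  }  // the caller's reference is dropped while the worker is still running
  worker.join();  // join() also makes the worker's writes to `calls` visible
  return calls;
}
```

Both calls dispatch to `DerivedTask::name()`, because the object is destroyed only after the last owner (the worker's lambda) releases it.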