How to exit task

开启一个线程不简单,关闭一个线程麻烦事更多. C++ 11 提供了 std::thread 的抽象,C++20 甚至提供了 coroutine. 我们的背后可能是 POSIX thread 在内的各种平台的线程。同时我们也有任务式的线程 api, 例如 future promise 对应的接口。在 Go 语言中,我们甚至能以一种更便捷的方式,以语法支持的形式创建 goroutine, 并以 CSP 的模式来通信。而背后的原生线程是隐匿的。

对于 C++11 <thread> 而言,线程抽象与 std::thread 是绑定的. 最好的阅读地点在:

https://en.cppreference.com/w/cpp/thread/thread

delays), starting at the top-level function provided as a constructor argument. The return value of the top-level function is ignored and if it terminates by throwing an exception, std::terminate is called. The top-level function may communicate its return value or an exception to the caller via std::promise or by modifying shared variables (which may require synchronization, see std::mutex and std::atomic)

std::thread objects may also be in the state that does not represent any thread (after default construction, move from, detach, or join), and a thread of execution may be not associated with any thread objects (after detach).

No two std::thread objects may represent the same thread of execution; std::thread is not CopyConstructible or CopyAssignable, although it is MoveConstructible and MoveAssignable.

其中,我们也要关注线程的析构:https://en.cppreference.com/w/cpp/thread/thread/~thread

在析构的时候,如果线程是 joinable 的,这里会调用 std::terminate: https://zh.cppreference.com/w/cpp/error/terminate

当然,你可以用任务式的 api, 例如 https://en.cppreference.com/w/cpp/thread/async , 让程序自己决定什么时候启动线程,指定对应的执行策略。

阅读上面一段话,你会发现一点:joinablethreadterminate. 同时你可能会注意到,C++20 中,提供了 jthread https://en.cppreference.com/w/cpp/thread/jthread . 关于使用,其实可以参考 CppCoreGuildlines: https://www.modernescpp.com/index.php/c-core-guidelines-taking-care-of-your-child-thread

对,你会开始意识到,开启一个线程或许有一些小坑,但是从线程退出其实更麻烦,你在一个 noexcept 的地方抛出 std::terminate, 那你就等死吧。

退出的目的

你可能需要:

  1. anyOf,计算出最快的一个任务
  2. 我不再需要这个任务了

那么我们可能想到两个方案:

  1. 假装自己是鸵鸟,不管之前别的 thread
  2. 全部 kill/terminate 掉

第一个在 C++ 可能会有 lifetime 问题哟:

1
2
3
4
5
6
7
8
9
{
Resource ...
for i = 0; i < 10; i++ {
Compute
}
if (ok) {
return
}
}

那么你会开始头疼了,同样,假设甚至存在引用计数的时候,对,假设在 Go 语言,你还会面对 Goroutine 泄漏 的问题,你也要开一个 Goroutine, 接受任务完成 flag, 回收可能的资源。

第二个是 terminate, 或者甚至拿到 native 的 posix api, 来 terminate 它,但是下面问题来了:

  1. 还是资源,资源泄漏
  2. __force_unwind 呢?noexcept 呢?全炸了?

这里隐含这这么一点:你无法无痛的终止线程

cancel: 你该结束了

现在我们绕开线程,回到线程池,它是怎么终止线程的呢?

假设我们在一个很大的线程池子里,那么 Pool 要 destruct 了,这是一个很常见的场景。我们看看某些实现,可能会发现

1
2
3
4
5
6
7
8
9
10
11
12
atomic<bool> stop_flag;

cancel() {
stop_flag = true;
}

loop() {
while (stop_flag.load()) {
..
}
// handle exit
}

stop_flag 是一种很常见的模式,甚至也可以加上一个共享的池子上的 notifywait 的 cv. 然后cv 自己有 barrier 语义,不过大意都是一样的:你在线程调用的地方提供 cancel flag。当然我们也能够提供 Poison 这样的消息,来保持这个层面的 stop 语义。

Blocking?

上面语义是很清晰的,但是如果碰到 IO 呢?又来了:

1
2
3
4
5
6
7
8
try
{
do_something();
}
catch(thread_interrupted&)
{
handle_interruption();
}

这里我们又回来了,手动 interrupt,RAII…因为本质上,blocking 这还是个问题。

没有无痛的方式:可 cancel 的任务

我们看到了线程控制的困难性,同时,我们也能意识到,语法上 Go 不暴露 thread 本身的接口,所以它会用 context 模式来传递信息:

https://zhuanlan.zhihu.com/p/107930946

而C#也提供了类似的接口,要把 cancel 的信息传下去

这意味着,我们某种意义上,是要对任务进行侵入的,我们可以参考 C++ Concurrency In Action 第二版 ,这里对相关的编程内容进行了入侵。

隔离:或许我需要进程

回到之前的问题,我们需要停止,可能某种意义上不止需要“并发执行任务”,还需要“入侵执行任务的逻辑”。这里,我们可以突然想到,一直以来这篇文章都在纠结线程,但是进程由 OS 提供了更好的隔离和通信机制。加入我们引入了某个库,有一个 ComputeHeavy 函数,这个函数会调用很重的计算,那么我们可能不太好侵入。我们可以使用 condition_variable_any 加上 future , 但同时,我们也可以考虑用进程,它提供的是更好的隔离、资源回收等机制。

参考

  1. C++ Concurrency In Action 第二版
  2. https://stackoverflow.com/questions/10834469/terminating-blocking-io-in-linux-c
  3. https://zhuanlan.zhihu.com/p/143564479