Go Concurrency Patterns
Pipeline
Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines
- receive values from upstream via inbound channels
- perform some function on that data, usually producing new values
- send values downstream via outbound channels
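You are probably quite familiar with pipelines from the OS shell, since you use them every day. The 6.828 2019 labs even include an assignment that uses a similar pattern to compute primes: https://pdos.csail.mit.edu/6.828/2019/labs/util.html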
In this sense, a pipeline means repeatedly chaining channels with the pattern fn(in chan) out chan, and finally handing the result to a consumer:
```go
chan1 := make(chan int)
```
You can even nest the stages in a (sort of) functional style:
```go
func main() {
```
Fan-in/fan-out
Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.
A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.
A chan is really something like a rather strange (and rather awesome) MPMC (multi-producer, multi-consumer) queue. The sq and gen stages above all:
- take one (or zero) inbound channels
- apply a series of operations
- return one outbound channel
The blog gives an example built around a merge function:
```go
func main() {
```
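This program does both fan-out (two sq stages read from the same input channel) and fan-in (merge combines their outputs onto a single channel). merge itself: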
```go
func merge(cs ...<-chan int) <-chan int {
```
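Putting the pieces together, here is a runnable sketch of the fan-out/fan-in program that follows the structure of the Go blog's example. Two sq stages read from the same input (fan-out). merge copies both of their outputs onto one channel and closes it once a sync.WaitGroup reports that every input has been drained (fan-in).

```go
package main

import (
	"fmt"
	"sync"
)

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// merge fans in: it copies values from every input channel onto one
// output channel and closes that output once all inputs are closed.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// One output goroutine per input: copy until c is closed, then Done.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out only after every output goroutine has finished.
	// This goroutine must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := gen(2, 3)

	// Fan out: two sq stages both read from in.
	c1 := sq(in)
	c2 := sq(in)

	// Fan in: consume the merged output of c1 and c2.
	for n := range merge(c1, c2) {
		fmt.Println(n) // 4 then 9, or 9 then 4
	}
}
```

The order of 4 and 9 depends on scheduling. wg.Add runs before the closing goroutine starts, so that goroutine cannot see a zero counter and close out too early.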
Recall the problems raised in the previous post, such as error handling and goroutine leaks:
There is a pattern to our pipeline functions:
- stages close their outbound channels when all the send operations are done.
- stages keep receiving values from inbound channels until those channels are closed.
This pattern allows each receiving stage to be written as a range loop and ensures that all goroutines exit once all values have been successfully sent downstream.
But in real pipelines, stages don’t always receive all the inbound values. Sometimes this is by design: the receiver may only need a subset of values to make progress. More often, a stage exits early because an inbound value represents an error in an earlier stage. In either case the receiver should not have to wait for the remaining values to arrive, and we want earlier stages to stop producing values that later stages don’t need.
```go
// Consume the first value from the output.
```
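With a consumer like this, the producing goroutine blocks forever on its next send, so it leaks. To some extent, this can be solved with a buffered channel plus close: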
```go
func gen(nums ...int) <-chan int {
```
A short digression on goroutines and channels
What rules govern channels and close?
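- Channels are FIFO.
- Closing a nil channel, or one that is already closed, makes the Go runtime panic immediately.
- A receive-only channel (<-chan) cannot be closed; the compiler rejects it.
- Channels, goroutines and GC: a goroutine blocked forever on a channel operation is never garbage-collected, which is exactly how a stuck producer leaks.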
For details, see the section "Detailed Explanations for Channel Operations", which says:
To make the explanations for channel operations simple and clear, in the remaining of this article, channels will be classified into three categories:
- nil channels.
- non-nil but closed channels.
- not-closed non-nil channels.
The following table simply summarizes the behaviors for all kinds of operations applying on nil, closed and not-closed non-nil channels.
| Operation | A Nil Channel | A Closed Channel | A Not-Closed Non-Nil Channel |
|---|---|---|---|
| Close | panic | panic | succeed to close (C) |
| Send Value To | block for ever | panic | block or succeed to send (B) |
| Receive Value From | block for ever | never block (D) | block or succeed to receive (A) |
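The table can be checked directly. This sketch uses a hypothetical helper, try, that recovers from a panic and reports whether one happened. The nil-channel send and receive cases are left out because they block forever, and a program doing only that would be killed by the runtime's deadlock detector.

```go
package main

import "fmt"

// try (hypothetical helper) runs op and reports whether it panicked.
func try(op func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	op()
	return false
}

func main() {
	var nilCh chan int
	fmt.Println(try(func() { close(nilCh) })) // true: closing a nil channel panics

	ch := make(chan int, 1)
	ch <- 42
	close(ch)
	fmt.Println(try(func() { close(ch) })) // true: closing a closed channel panics
	fmt.Println(try(func() { ch <- 1 }))   // true: sending on a closed channel panics

	// Receiving from a closed channel never blocks: buffered values drain
	// first, then the zero value arrives with ok == false.
	v, ok := <-ch
	fmt.Println(v, ok) // 42 true
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false
}
```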
I'm actually quite curious: Rust separates the sender from the receiver, so why doesn't Go…
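Go does not split a channel into two values the way Rust's std::sync::mpsc::channel returns a (Sender, Receiver) pair. It does have directional channel types, though: chan<- T can only send, <-chan T can only receive, and a bidirectional chan T converts implicitly to either. A small sketch (produce and consume are illustrative names):

```go
package main

import "fmt"

// produce may only send on out; receiving from out would not compile.
func produce(out chan<- int, nums ...int) {
	for _, n := range nums {
		out <- n
	}
	close(out) // allowed: closing belongs to the sending side
}

// consume may only receive from in; close(in) would not compile.
func consume(in <-chan int) int {
	sum := 0
	for n := range in {
		sum += n
	}
	return sum
}

func main() {
	ch := make(chan int)     // bidirectional
	go produce(ch, 1, 2, 3)  // implicitly converted to chan<- int
	fmt.Println(consume(ch)) // 6
}
```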
Explicit cancellation
When main decides to exit without receiving all the values from out, it must tell the goroutines in the upstream stages to abandon the values they’re trying to send. It does so by sending values on a channel called done. It sends two values since there are potentially two blocked senders:
Concretely: once the downstream stage has the data it needs, or has run into an error, it has to pass that news upstream.
```go
func main() {
```
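With done in place, main consumes just one value while merge has two input channels, so up to two output goroutines may be left blocked; that is why two values are sent on done. merge can then be implemented as: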
```go
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
```
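A runnable sketch of explicit cancellation, following the Go blog's version. main reads one value, then sends one value on done for each sender that might still be blocked. Each output goroutine in merge uses select to choose between sending downstream and receiving from done, and keeps draining its input afterwards so the upstream stages can also finish.

```go
package main

import (
	"fmt"
	"sync"
)

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// merge copies values from every input onto out, but each output goroutine
// gives up on a pending send as soon as it receives a value from done.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		for n := range c {
			select {
			case out <- n:
			case <-done: // abandon n; keep draining c so upstream can finish
			}
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out once every output goroutine has returned.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := gen(2, 3)

	// Fan out: two sq stages read from the same input channel.
	c1 := sq(in)
	c2 := sq(in)

	// Buffered so that main's signals never block.
	done := make(chan struct{}, 2)
	out := merge(done, c1, c2)
	fmt.Println(<-out) // 4 or 9

	// Tell the (at most two) remaining senders that we're leaving.
	done <- struct{}{}
	done <- struct{}{}
}
```

done has capacity 2 so that main's two sends never block, even when only one output goroutine is still around to consume a signal. Having to count the potentially blocked senders is the weak point of this design; the Go blog goes on to replace the counted sends with close(done), which every receiver observes.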