Go Concurrency Patterns

Pipeline

Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines

  • receive values from upstream via inbound channels
  • perform some function on that data, usually producing new values
  • send values downstream via outbound channels

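You are probably quite familiar with pipelines in the OS and use them every day. In fact, the 6.828 (2019) labs even have you implement prime with a similar pattern: https://pdos.csail.mit.edu/6.828/2019/labs/util.html

The idea of a pipeline here is to keep chaining channels through functions of the shape fn(in chan) out chan, and finally hand the last channel to a single consumer: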

chan1 := make(chan int)
go func() {
	for i := 2; i < 100; i++ {
		chan1 <- i
	}
	close(chan1)
}()
primeChan := Filter(chan1, IsPrime)
for prime := range primeChan {
	fmt.Println("Recv prime: ", prime)
}
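
The snippet above relies on Filter and IsPrime, which this note does not show. A minimal sketch of what they might look like, assuming Filter takes an inbound channel plus a predicate and returns a new outbound channel. Both signatures are guesses inferred from the call Filter(chan1, IsPrime), not the author's code:

```go
package main

import "fmt"

// IsPrime reports whether n is prime, using trial division.
// Hypothetical helper: the note does not show its definition.
func IsPrime(n int) bool {
	if n < 2 {
		return false
	}
	for d := 2; d*d <= n; d++ {
		if n%d == 0 {
			return false
		}
	}
	return true
}

// Filter is one pipeline stage: it forwards the values from in that satisfy
// pred, and closes its outbound channel once in is closed and drained.
// Hypothetical signature, inferred from the call site only.
func Filter(in <-chan int, pred func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if pred(v) {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 2; i < 20; i++ {
			src <- i
		}
	}()
	for p := range Filter(src, IsPrime) {
		fmt.Println("Recv prime:", p)
	}
}
```

Because Filter returns a channel of the same element type it consumes, stages of this shape can be chained one after another.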

This even lets you nest stages in a rather functional-looking style (well, not really):

func main() {
	// Set up the pipeline and consume the output.
	for n := range sq(sq(gen(2, 3))) {
		fmt.Println(n) // 16 then 81
	}
}
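
The gen and sq stages used above are not defined at this point in the note. A sketch of how they could be written, assuming each stage starts one goroutine and closes its outbound channel when it has nothing left to send. The collect helper is hypothetical and only there to make the result easy to inspect:

```go
package main

import "fmt"

// gen emits the given integers on a new channel, then closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq receives integers from in and emits their squares.
// It closes its outbound channel once in is closed and drained.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// collect drains a channel into a slice (hypothetical helper).
func collect(in <-chan int) []int {
	var res []int
	for n := range in {
		res = append(res, n)
	}
	return res
}

func main() {
	// A single chain of stages preserves order: 2 -> 4 -> 16, 3 -> 9 -> 81.
	fmt.Println(collect(sq(sq(gen(2, 3)))))
}
```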

Fan-in/fan-out

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.

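A chan is really like a rather uncanny (read: awesome) multi-producer, multi-consumer (MPMC) queue. The sq and gen stages above all follow the same shape:

  • take zero or one inbound channel
  • apply a series of operations
  • emit one outbound channel

The blog post gives an example that uses a merge function: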

func main() {
	in := gen(2, 3)

	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(in)
	c2 := sq(in)

	// Consume the merged output from c1 and c2.
	for n := range merge(c1, c2) {
		fmt.Println(n) // 4 then 9, or 9 then 4
	}
}

This main function does both fan-out (two sq stages read from in) and fan-in (merge). merge itself is implemented as:

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed, then calls wg.Done.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

Note some of the problems mentioned in the previous post, such as error handling and goroutine leaks:

There is a pattern to our pipeline functions:

  • stages close their outbound channels when all the send operations are done.
  • stages keep receiving values from inbound channels until those channels are closed.

This pattern allows each receiving stage to be written as a range loop and ensures that all goroutines exit once all values have been successfully sent downstream.

But in real pipelines, stages don’t always receive all the inbound values. Sometimes this is by design: the receiver may only need a subset of values to make progress. More often, a stage exits early because an inbound value represents an error in an earlier stage. In either case the receiver should not have to wait for the remaining values to arrive, and we want earlier stages to stop producing values that later stages don’t need.

func main() {
	in := gen(2, 3)
	c1 := sq(in)
	c2 := sq(in)

	// Consume the first value from the output.
	out := merge(c1, c2)
	fmt.Println(<-out) // 4 or 9
	return
	// Since we didn't receive the second value from out,
	// one of the output goroutines is hung attempting to send it.
}

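With a consumer like this, the producing goroutine blocks forever on its send, which is a goroutine leak. To some extent, this can be worked around with a buffered channel plus close.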

func gen(nums ...int) <-chan int {
	out := make(chan int, len(nums))
	for _, n := range nums {
		out <- n
	}
	close(out)
	return out
}
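
To make the leak and the buffer workaround observable, here is a rough sketch that counts goroutines with runtime.NumGoroutine before and after consuming only the first value. The buf parameter on merge, the leaked helper, and the 100 ms settling delay are assumptions made for this illustration and are not part of the original pipeline:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// gen is the buffered version from the text: no goroutine is needed.
func gen(nums ...int) <-chan int {
	out := make(chan int, len(nums))
	for _, n := range nums {
		out <- n
	}
	close(out)
	return out
}

// sq squares every value it receives from in.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge fans the channels in cs into one channel with a buffer of size buf.
// The buf parameter is an addition for this demo.
func merge(buf int, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int, buf)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// leaked runs the pipeline, receives only the first value, and returns how
// many more goroutines exist afterwards than before.
func leaked(buf int) int {
	before := runtime.NumGoroutine()
	in := gen(2, 3)
	out := merge(buf, sq(in), sq(in))
	<-out
	time.Sleep(100 * time.Millisecond) // let unblocked goroutines exit
	return runtime.NumGoroutine() - before
}

func main() {
	// Unbuffered: one sender stays blocked on out, and so does the
	// goroutine waiting on the WaitGroup to close out.
	fmt.Println("unbuffered merge, stuck goroutines:", leaked(0))
	// Buffer of 1: the unreceived value fits in the buffer, so all exit.
	fmt.Println("buffered merge, stuck goroutines:", leaked(1))
}
```

The buffer only helps because its size happens to cover the number of values the consumer leaves unread. If the upstream produces more, or the consumer reads fewer, the senders block again.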

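An aside on goroutines and channels

What rules govern channels and close?

  • A channel is FIFO.
  • Closing a channel that is nil or already closed makes the Go runtime panic.
  • close cannot be applied to a <-chan, that is, a receive-only channel.
  • GC of channels and goroutines: as long as goroutines are still blocked on a channel, neither the channel nor those goroutines can be garbage collected.

For details, see the section "Detailed Explanations for Channel Operations", which says: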

To make the explanations for channel operations simple and clear, in the remaining of this article, channels will be classified into three categories:

  1. nil channels.
  2. non-nil but closed channels.
  3. not-closed non-nil channels.

The following table simply summarizes the behaviors for all kinds of operations applying on nil, closed and not-closed non-nil channels.

Operation          | A Nil Channel  | A Closed Channel | A Not-Closed Non-Nil Channel
Close              | panic          | panic            | succeed to close (C)
Send Value To      | block for ever | panic            | block or succeed to send (B)
Receive Value From | block for ever | never block (D)  | block or succeed to receive (A)
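
A few rows of the table can be checked directly. The sketch below uses two hypothetical helpers: panics wraps a call in recover, and wouldBlock uses a select with a default case to probe a receive without actually blocking:

```go
package main

import "fmt"

// panics reports whether calling f panics (hypothetical helper).
func panics(f func()) (p bool) {
	defer func() {
		if recover() != nil {
			p = true
		}
	}()
	f()
	return false
}

// wouldBlock reports whether a receive from c cannot proceed right now.
// If a value is ready, it is consumed (hypothetical helper).
func wouldBlock(c <-chan int) bool {
	select {
	case <-c:
		return false
	default:
		return true
	}
}

func main() {
	var nilCh chan int // a nil channel

	closed := make(chan int, 1)
	closed <- 7
	close(closed) // closed, with one buffered value still inside

	fmt.Println("close nil panics:     ", panics(func() { close(nilCh) }))
	fmt.Println("close closed panics:  ", panics(func() { close(closed) }))
	fmt.Println("send to closed panics:", panics(func() { closed <- 1 }))
	fmt.Println("recv from nil blocks: ", wouldBlock(nilCh))

	// Receiving from a closed channel never blocks: buffered values come
	// out first, then the zero value with ok == false.
	v, ok := <-closed
	fmt.Println(v, ok)
	v, ok = <-closed
	fmt.Println(v, ok)
}
```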

I am actually rather curious: Rust distinguishes between a sender and a receiver, so why doesn't Go…

Explicit cancellation

When main decides to exit without receiving all the values from out, it must tell the goroutines in the upstream stages to abandon the values they’re trying to send. It does so by sending values on a channel called done. It sends two values since there are potentially two blocked senders:

Specifically, once the downstream has the data it needs, or has hit an error, it has to tell the upstream stages so.

func main() {
	in := gen(2, 3)

	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(in)
	c2 := sq(in)

	// Consume the first value from output.
	done := make(chan struct{})
	defer close(done)
	out := merge(done, c1, c2)
	fmt.Println(<-out) // 4 or 9
}

With the done flag added, only one value is needed here, while there are two input channels. merge can then be implemented as:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed or it receives a value
	// from done, then output calls wg.Done.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}