译 | 如何优雅地关闭 Go 中的工作 goroutine

原文:Go - graceful shutdown of worker goroutines


在这篇博文中,我们将看看 Go 程序的优雅关闭。这类 Go 程序有一些执行任务的工作 goroutine,要求在程序关闭之前,这些工作 goroutine 必须完成任务。

介绍

在一个最近的项目中,我们有一个使用场景:一个基于 Go 的微服务不断地消费另一个第三方库发出的事件。这些事件在调用外部服务之前,会进行一些处理。而外部服务处理每个请求的速度都相当慢,但另一方面,它能够处理许多并发请求。因此,我们实现了一个简单的 worker 池,将输入事件扇出为几个并发执行的 goroutine。

总的来说,它看起来像这样:

figure 1

然而,我们需要保证在该微服务关闭的时候,当前任何正在运行的对外部服务的请求必须完成,并且请求结果在我们的内部后端持久化。

worker 池和终止信号处理

worker 池模式是一个有名的关于 worker 池的 Go 模式。此外,还有大量关于如何进行基于 SIGTERM 通知的优雅关闭的例子。但我们意识到,我们的一些需求使得使用场景有点更复杂。

当程序接收到 SIGTERM 或者 SIGINT 信号(例如,因容器编排器缩容到一定数目的副本数而产生的)时,在终止整个程序之前,必须允许当前任何工作中的 worker goroutine 完成它们长期运行的工作。

让事情稍微复杂些的是,我们对生产者端的库没有任何控制权。一开始我们会注册一个回调函数,每当生产端的库有了一个(我们需要的)事件,就会调用这个回调函数。该库会处于阻塞状态,直到回调函数结束执行。然后,当有更多事件产生时,库会再次调用这个函数。

worker-pool 的诸多 goroutine 通过使用标准的“对 channel 进行 range 操作”结构,来不断处理事件,例如:

1
2
3
4
5
func workerFunc() {
for event := range jobsChan { // 阻塞直到接收到一个事件,或者该 channel 被关闭。
// handle the event...
}
}

这意味着,让一个 worker “结束”最干净的方式是关闭名为 “jobsChan” 的 channel。

在生产者端进行关闭

你首先学到的关于在 Go 中关闭 channel 的第一件事情之一是,如果向已关闭的 channel 发送数据,程序就会 panic。这归结于一个非常简单的规则:

“总是在生产者端关闭一个 channel(Always close a channel on the producer side)”

不管怎样,什么是生产者端呢?嗯,一般是那个将事件发送到 channel 里的 goroutine

1
2
3
func callbackFunc(event int) {
jobsChan<-event
}

上面是我们的回调函数 callbackFunc,我们将其注册到外部库中,外部库就会将事件传给我们。(为了让这些例子简单些,我将真实的事件替换为一个简单的整形,以作为负载。)

你要如何 安全地 保护上面的代码免于给已关闭的 channel 发送数据呢?一路沿着 Mutex、布尔型标志和 if 语句以确定是否一些_其他_ goroutine 关闭了 channel,以及控制是否应该允许发送数据,这并不简单。多留心潜在的竞争条件和不确定行为。

我们的解决方法是引入一个中间 channel 和一个内部的“消费者”,后者作为回调和任务 channel 之间的代理:

figure 2

消费者函数看起来像这样:

1
2
3
4
5
6
7
8
9
10
11
12
func startConsumer(ctx context.Context) {
// Loop until a ctx.Done() is received. Note that select{} blocks until either case happens
for {
select {
case event := <-intermediateChan:
jobsChan <- event
case _ <- ctx.Done():
close(jobsChan)
return // exit this function so we don't consume anything more from the intermediate chan
}
}
}

好了,等下。这个 “select” 和 “ctx.Done()” 是啥?

恕我直言,select 语句是 Go 最神奇的东西之一。它允许多个 channel 的等待和协同。在这种情况下,我们或者会从中间 channel 那里收到事件,然后将其传到 jobsChan,又或者会从 context.Context 接收到取消信号。

关闭 jobsChan 之后的 return 语句将让我们离开 for 循环和函数,这确保了 不会有新事件被传递给 jobsChan,并且不会从 intermediateChan 消费到 任何事件

所以,要么是传递事件到 jobsChan(worker 从这里消费),要么在作为生产者的 同一个 goroutine 中 关闭 jobsChan。

关闭 jobsChan 意味着消费端的所有 worker 将会停止遍历 jobsChan:

1
2
3
for event := range jobsChan { // <- on the close(jobsChan), all goroutines waiting for jobs here will exit the for-loop
// handle the event...
}

发出取消信号

等待 Go 程序退出是一种有名的模式:

1
2
3
4
5
6
7
8
func main() {
... rest of program ...

termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan // Blocks here until either SIGINT or SIGTERM is received.
// 接下来呢?
}

在 “接下来呢?” 这一部分,捕获到 SIGINT 或者 SIGTERM 后,主 goroutine 恢复执行。我们需要告诉将事件从 intermediateChan 传到 jobsChan 的消费者,跨 goroutine 边界关闭 jobsChan。

再次,使用 Mutex 和条件语句来解决这个问题,技术上是可行的,但是相当难搞并且容易出错。作为替代,我们会利用前面提及的 context.Context 的取消支持。

func main() 的某个地方,我们设置了一个带取消支持的根 background context:

1
2
3
4
5
6
7
8
9
10
11
func main() {
ctx, cancelFunc := context.WithCancel(ctx.Background())
// ... some omitted code ...

go startConsumer(ctx) // pass the cancellable context to the consumer function

// ... some more omitted code ...
<-termChan

cancelFunc() // call the cancelfunc to notify the consumer it's time to shut stuff down.
}

这就是 < -ctx.Done() 这一 select case 如何被调用的,它开始优雅拆卸 channel 和 worker。

使用 WaitGroup

上面这个方法只有一个问题:调用 cancelFunc() 后,程序会立即退出,这意味着,正在动态调用中的工作 goroutine 将没有时间执行完毕,这使得我们系统中的处理有可能处于中间态。

我们需要停止关闭,直到所有的 worker 都报告说它们完成了工作。现在,我们进入 sync.WaitGroup,它允许我们等待任意数目的 goroutine 结束!

当启动 worker 时,我们传递一个指向在 func main() 中创建的 WaitGroup 的指针:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const numberOfWorkers = 4

func main() {
// ... omitted ...
wg := &sync.WaitGroup{}
wg.Add(numberOfWorkers)

// Start [workerPoolSize] workers
for i := 0; i < workerPoolSize; i++ {
go workerFunc(wg)
}

// ... more omitted stuff ...

<-termChan // wait for SIGINT / SIGTERM
cancelFunc() // send the shutdown signal through the context.Context
wg.Wait() // program will wait here until all worker goroutines have reported that they're done
fmt.Println("Workers done, shutting down!")
}

这会稍微改变我们的 worker 启动函数:

1
2
3
4
5
6
func workerFunc(wg *sync.WaitGroup) {
defer wg.Done() // Mark this goroutine as done! once the function exits
for event := range jobsChan {
// handle the event...
}
}

wg.Done() 将 waitgroup 减一,一旦内部计数器变成 0,那么主 goroutine 将继续执行 wg.Wait() 之下的语句。这就完成了优雅关闭!

运行

最终程序的源代码在下一个部分。在此其中,我添加了一些日志,这样就能看看该过程发生了什么。

下面是一个带有 4 个工作 goroutine 的程序的执行输出,这里,我使用 Ctrl+C 来停止程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ go run main.go 
Worker 3 starting
Worker 2 starting
Worker 1 starting
Worker 0 starting
Worker 3 finished processing job 0
Worker 0 finished processing job 3
^C********************************* <-- HERE I PRESS CTRL+C
Shutdown signal received
*********************************
Worker 3 finished processing job 4
Worker 2 finished processing job 1
Worker 1 finished processing job 2
Consumer received cancellation signal, closing jobsChan! <-- Here, the consumer receives the <-ctx.Done()
Worker 3 finished processing job 6
Worker 0 finished processing job 5
Worker 1 finished processing job 8
Worker 2 finished processing job 7
Worker 0 finished processing job 10
Worker 0 interrupted <-- Worker 0 has finished job #10, 3 left
Worker 2 finished processing job 12
Worker 2 interrupted <-- Worker 2 has finished job #12, 2 left
Worker 3 finished processing job 9
Worker 3 interrupted <-- Worker 3 has finished job #9, 1 left
Worker 1 finished processing job 11
Worker 1 interrupted <-- Worker 1 has finished job #11, all done
All workers done, shutting down!

有人可能会观察到,消费者接收到 < -ctx.Done() 的时间点实际上是不确定的,这是因为 Go 运行时调度 channel 上的通信到 select 语句的方法。Go 规范是这样说的:

1
“如果可以处理一个或多个通信,那么选择进行处理的那个 chanel 是通过统一的伪随机选择的。(If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection)”。

这就是为什么即使在按下 CTRL+C 之后,任务也可以被传给 worker。

另一个特别的事情是,似乎即使在关闭了 jobsChan _之后_,任务(任务 9-12)还是被传给 worker 了。恩,它们实际是在该 channel 被关闭 _之前_ 被传给 worker 的。这个现象会发生是因为我们使用了一个带有 4 个“槽” 的缓存 channel。这意味着,假定我们第三方生产者以比我们的 worker 可以处理的速度更快地不断传递新事件,如果所有四个 worker 都从 channel 中消费了一个任务并且处理它们,那么该 channel 里就可能会有四个新的事件正等待被消费。关闭 channel 并不会影响那些已经缓存到 channel 里的数据 —— Go 允许消费者消费它们。

如果我们将 jobsChan 修改为无缓存的:

1
jobsChan := make(chan int)

然后再次运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ go run main.go
.... omitted for brevity ....
^C*********************************
Shutdown signal received
*********************************
Worker 3 finished processing job 3
Worker 3 started job 5
Worker 0 finished processing job 4
Worker 0 started job 6
Consumer received cancellation signal, closing jobsChan! <-- again, it may take some time until the consumer is handed <-ctx.Done()
Consumer closed jobsChan
Worker 1 finished processing job 1 <-- From here on, we see that each worker finishes exactly one job before being interrupted.
Worker 1 interrupted
Worker 2 finished processing job 2
Worker 2 interrupted
Worker 0 finished processing job 6
Worker 0 interrupted
Worker 3 finished processing job 5
Worker 3 interrupted
All workers done, shutting down!

这一次,在 channel 关闭后,我们就没有看到任何“不期望的”任务被 worker 消费了。然而,让 channel 缓存跟 worker 数相同的数据,是在不必要拖慢生产端的情况下,让 worker 保持处理数据的常见优化手法。

完整的程序

上面的代码片段在某些地方进行了简化,以使得它们尽可能简洁。带有某些结构以封装和模拟第三方生产者的完整程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)

const workerPoolSize = 4

func main() {
// 创建消费者
consumer := Consumer{
ingestChan: make(chan int, 1),
jobsChan: make(chan int, workerPoolSize),
}

// 模拟外部库:每秒发送 10 个事件
producer := Producer{callbackFunc: consumer.callbackFunc}
go producer.start()

// 设置取消 context 和 waitgroup
ctx, cancelFunc := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

// 传递取消 context 以启动消费者
go consumer.startConsumer(ctx)

// 启动 worker,并添加 [workerPoolSize] 到 WaitGroup
wg.Add(workerPoolSize)
for i := 0; i < workerPoolSize; i++ {
go consumer.workerFunc(wg, i)
}

// 处理终止信号,并等待 termChan 信号
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

<-termChan // 这里阻塞直到接收到信号

// 处理关闭
fmt.Println("*********************************\nShutdown signal received\n*********************************")
cancelFunc() // 向 context.Context 发送取消信号
wg.Wait() // 这里阻塞直至所有 worker 完成

fmt.Println("All workers done, shutting down!")
}

消费者结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// -- 从这里起,下面是 Consumer!
type Consumer struct {
ingestChan chan int
jobsChan chan int
}

// 每当外部库传递一个事件给我们,就会调用 callbackFunc。
func (c Consumer) callbackFunc(event int) {
c.ingestChan <- event
}

// workerFunc 启动一个 worker 函数,它会遍历 jobsChan,直到该 channel 关闭。
func (c Consumer) workerFunc(wg *sync.WaitGroup, index int) {
defer wg.Done()

fmt.Printf("Worker %d starting\n", index)
for eventIndex := range c.jobsChan {
// 模拟工作执行 1 ~ 3 秒
fmt.Printf("Worker %d started job %d\n", index, eventIndex)
time.Sleep(time.Millisecond * time.Duration(1000+rand.Intn(2000)))
fmt.Printf("Worker %d finished processing job %d\n", index, eventIndex)
}
fmt.Printf("Worker %d interrupted\n", index)
}

// startConsumer 作为 ingestChan 和 jobsChan 之间的代理,使用 select 语句以支持优雅关闭。
func (c Consumer) startConsumer(ctx context.Context) {
for {
select {
case job := <-c.ingestChan:
c.jobsChan <- job
case <-ctx.Done():
fmt.Println("Consumer received cancellation signal, closing jobsChan!")
close(c.jobsChan)
fmt.Println("Consumer closed jobsChan")
return
}
}
}

最后,模拟外部库的生产者结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
// -- Producer 模拟一个外部库,每 100ms 有新数据时simulates an external library that invokes the 
// 它会调用注册的回调函数。
type Producer struct {
callbackFunc func(event int)
}
func (p Producer) start() {
eventIndex := 0
for {
p.callbackFunc(eventIndex)
eventIndex++
time.Sleep(time.Millisecond * 100)
}
}

总结

我希望这篇小博文提供了一个简单的例子,说明了基于 goroutine 的 worker 池,以及如何使用基于 context 的取消、WaitGroup 和生产端 channel 关闭,来优雅关闭这些 goroutine。