Go语言提供了三种并发控制的解决方案:
- Channel:使用channel控制自协程。
- WaitGroup:使用信号量机制控制子协程。
- Context:使用上下文控制子协程。
channel实现简单,清晰易懂;WaitGroup可以动态调整子协程个数;Context可以控制子协程派生出来的孙子协程。
channel
在chan中对channel的介绍中,channel一般用于协程之间的通信,不过channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程。
package main
import (
"time"
"fmt"
)
func Process(ch chan int) {
time.Sleep(time.Second)
ch <- 1
}
func main() {
channels := make([]chan int, 10)
for i := 0; i < 10; i++ {
channels[i] = make(chan int)
go Process(channels[i])
}
for i, ch := range channels {
<- ch
fmt.Println("Routine", i, "quit!")
}
}主协程启动N个子协程执行任务,并通过channel通信来进行管理。每个子协程执行结束都向chan写入一个数据,主协程接收到全部子协程执行完毕之后再结束。
使用channel来控制子协程的优点就是实现简单,缺点就是当需要大量创建协程时就需要有相同数据的channel,而且对于子协程继续派生出来的协程不方便控制。
WaitGroup
WaitGroup是等待一组goroutine结束。如果一个goroutine需要等待其他几个goroutine全部完成后再执行,利用WaitGroup可与很方便实现。
package main
import (
"fmt"
"time"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(1 *time.Second)
fmt.Println("Goroutine 1 finished")
wg.Done()
}()
go func() {
time.Sleep(2 *time.Second)
fmt.Println("Goroutine 2 finished")
wg.Done()
}()
wg.Wait()
fmt.Printf("All Goroutine finished")
}wg内部维护了一个计数器:
- 启动goroutine前将计数器通过Add(2)将计数器设置为待启动的goroutine个数。
- 启动goroutine后,使用Wait()方法阻塞自己,等待计数器变为0.
- 每个goroutine执行结束后通过Done()方法将计数器减1.
- 计数器变为0后,阻塞的goroutine被唤醒。
WaitGroup实现中使用了信号量,信号量就是unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。
- 当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1.
- 当信号量为0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒。
在src/sync/waitgroup.go:WaitGroup定义了数据结构:
type WaitGroup struct {
state1 [3]uint32
}这个长度为3的数组其实是两个计数器和一个信号量:

- counter:当前还未执行结束的goroutine计数器
- wait count:等待goroutine-group结束的goroutine数量,即有多少个等候者。
- semaphore:信号量。
WaitGroup对外提供三个接口:
- Add(delta int):将delta值加到count中
- Wait():waiter递增1,并阻塞等待信号量semaphore
- Done():counter递减1,按照waiter数值释放响应次数信号量。
Add(delta int)
Add做了两件事,一是把delta值累加到counter中,因为delta可以为负值,也就是说counter有可能变成0或负值,所以第二件事就是当counter值变为0时,根据waiter数值释放等量的信号量,把等待的goroutine全部唤醒,如果counter变为负值,则panic。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state() //获取state和semaphore的地址指针
state := aotmic.AddUint64(statep, uint64(delta) << 32)
v := int32(state >> 32) // 获取counter值
w := uint32(state) //获取waiter值
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// counter > 0说明不需要释放信号量,直接退出
// waiter = 0,说明没有等待着,也不需要释放信号量,直接退出
if v > 0 || w == 0 {
return
}
// 将counter置为0,再释放waiter个数的信号量
*statep = 0
for ; w !=0; w-- {
runtime_Semrelease(semap, false)
}
}Wait()
wait方法累加waiter,阻塞等待信号量
func (wg *WaitGroup) Wait() {
statep, semap := wg.state() //获取state和semaphore地址指针
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// counter = 0,说明所有goroutine都退出了,不需要等待,直接返回
if v == 0 {
return
}
// 使用CAS累加waiter,累加可能会失效,失败后通过for loop下次充实
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semaphore(semap)
return
}
}
}Done
只负责将counter减1,Add()可以接受负值,所以Done实际上只是调用了Add(-1)。
func (wg *WaitGroup) Done() {
wg.Add(-1)
}- Add的操作必须早于Wait,否则会panic
- Add设置的值必须与实际等待的goroutine个数一致,否则会panic。
context
context对于派生goroutine有更强的控制力,可以控制多级goroutine。它可以控制一组呈树状结构的goroutine,每个goroutine拥有相同的上下文。
典型的应用场景如上图,goroutine派生出子goroutine,而子goroutine又继续派生出新的goroutine。这种情况下goroutine的个数不容易确定,使用WaitGroup不容易,而使用context就可以很容易实现。
接口定义
context实际上只是一个接口,在src/context/context.go:Context定义:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}- Deadline:返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok等于false,此时deadline为一个初始值的time.Time值。
- Done:返回一个channel,需要在select-case中使用,当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;当context还未关闭时,Done()返回nil。
- Err:描述context关闭的原因,关闭原因由context实现控制,不需要用户设置。当context关闭后,Err返回context的关闭原因;context还未关闭,Err返回nil。
- Value:用于在树状分布的goroutine间传递消息。可以根据key值查询map中的value。
空context
context中定义了一个空的context,名为emptyCtx,用于context的根节点,空的context只是简单实现了Context,本身不包含任何值,仅用于其他context的父节点。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}除了emptyCtx外,还有cancelCtx, timerCtx和 valueCtx三种,正是基于这三种context实例,实现了上述4个方法的context,即cancelCtx,timerCtx和valueCtx。分别用下面的方法进行创建:
- WithCancel()
- WithDeadline()
- WithTimeout()
- WithValue()
WithDeadline和WithTimeout创建的都是timerCtx类型的上下文。
cancelCtx
在src/context/context.go:cancelCtx定义了该类型:
type cancelCtx struct {
Context
mu sync.Mutex
done chan struct{}
children map[canceler]struct{}
err error
}children记录了由此context派生的所有child,此context被cancel的时候会把所有的child都cancel掉。
cancelCtx与deadline和value无关,只需要实现Done和Err外露接口就可以了。 查看其Done方法的实现
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}cancleCtx.done会在context被cancel时关闭,这样就能够从返回的channel中读取到内容了。
对于cancelCtx来说最重要的就是cancel接口的实现了,其作用是关闭自己和后代
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock()
c.err = err
close(c.done)
for child := range c.children {
child.cancel(false, err)
}
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}WithCancel()返回的第二个用于cancel context的正是此cancel()。其实现如下:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c) // 将自身添加到父节点
return &c, func() {c.cancel(true, Canceled)}
}将自身添加到父节点的过程:
- 如果父节点也支持cancel,那么父节点肯定由children成员,把新的context添加到children里即可。
- 如果父节点不支持cancel,就继续向上查询,直到找到一个支持cancel的节点,把新context添加到其children即可。
- 如果所有父节点都不支持cancel,那么启动一个协程等待父节点结束,然后再把当前context结束。
timerCtx
在src/context/context.go:timerCtx中有定义:
type timerCtx struct {
cancelCtx
timer *time.Timer
deadline time.Time
}timerCtx在cancelCtx的基础上增加了deadline用于标示自动cancel的最终时间,而timer就是要给触发自动cancel的定时器。 由此,衍生出WithDeadline()和WithTimeout(),这两者的实现原理是一行的,只是使用的语境不同。
- deadline:指定最后期限,类似于绝对时间。
- timeout:指定最长存活时间,类似于相对时间。
timerCtx在cancelCtx的基础上实现了Deadline(),重写了cancel方法。
- Deadline()方法就是简单的返回timerCtx.deadline,而timerCtx.deadline是通过WithDeadline()或WithTimeout()方法设置的。
- cancel()的实现与cancelCtx基本一致,不过需要额外把timer关闭。
WithDeadline()和WithTimeout()都是创建一个timerCtx实例,并启动定时器,在定时器到期后自动cancel本context。
valueCtx
valueCtx在Context的基础上添加了一个key-value,用于在协程间传递一些消息。
type valueCtx struct {
Context
key, val interface{}
}只需要实现Value接口
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}当前context找不到key的时候,就会向父节点查找,如果查找不到则最终返回interface{}。
总结
- Context仅仅是一个接口定义,根据实现不同,可以衍生出不同的context类型。
- cancelCtx实现了Context接口,通过WithCancel()创建cancelCtx实例。
- timerCtx实现了Context接口,通过WithDeadline()和WithTimeout()创建timerCtx实例。
- valueCtx实现了Context接口,通过WithValue()创建valueCtx实例。
- 三种context实例可以互为父节点,从而组合成不同的应用形式。