Go语言提供了三种并发控制的解决方案:

  • Channel:使用channel控制自协程。
  • WaitGroup:使用信号量机制控制子协程。
  • Context:使用上下文控制子协程。

channel实现简单,清晰易懂;WaitGroup可以动态调整子协程个数;Context可以控制子协程派生出来的孙子协程。

channel

chan中对channel的介绍中,channel一般用于协程之间的通信,不过channel也可以用于并发控制。比如主协程启动N个子协程,主协程等待所有子协程退出后再继续后续流程。

package main
 
import (
	"time"
	"fmt"
)
func Process(ch chan int) {
	time.Sleep(time.Second)
	ch <- 1
}
func main() {
	channels := make([]chan int, 10)
	for i := 0; i < 10; i++ {
		channels[i] = make(chan int)
		go Process(channels[i])
	}
	for i, ch := range channels {
		<- ch
		fmt.Println("Routine", i, "quit!")
	}
}

主协程启动N个子协程执行任务,并通过channel通信来进行管理。每个子协程执行结束都向chan写入一个数据,主协程接收到全部子协程执行完毕之后再结束。

使用channel来控制子协程的优点就是实现简单缺点就是当需要大量创建协程时就需要有相同数据的channel,而且对于子协程继续派生出来的协程不方便控制

WaitGroup

WaitGroup是等待一组goroutine结束。如果一个goroutine需要等待其他几个goroutine全部完成后再执行,利用WaitGroup可与很方便实现。

package main
import (
	"fmt"
	"time"
	"sync"
)
func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		time.Sleep(1 *time.Second)
		fmt.Println("Goroutine 1 finished")
		wg.Done()
	}()
	go func() {
		time.Sleep(2 *time.Second)
		fmt.Println("Goroutine 2 finished")
		wg.Done()
	}()
	wg.Wait()
	fmt.Printf("All Goroutine finished")
}

wg内部维护了一个计数器:

  1. 启动goroutine前将计数器通过Add(2)将计数器设置为待启动的goroutine个数。
  2. 启动goroutine后,使用Wait()方法阻塞自己,等待计数器变为0.
  3. 每个goroutine执行结束后通过Done()方法将计数器减1.
  4. 计数器变为0后,阻塞的goroutine被唤醒。

WaitGroup实现中使用了信号量,信号量就是unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。

  • 当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1.
  • 当信号量为0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒。

src/sync/waitgroup.go:WaitGroup定义了数据结构:

type WaitGroup struct {
	state1 [3]uint32
}

这个长度为3的数组其实是两个计数器和一个信号量:

  • counter:当前还未执行结束的goroutine计数器
  • wait count:等待goroutine-group结束的goroutine数量,即有多少个等候者。
  • semaphore:信号量。

WaitGroup对外提供三个接口:

  • Add(delta int):将delta值加到count中
  • Wait():waiter递增1,并阻塞等待信号量semaphore
  • Done():counter递减1,按照waiter数值释放响应次数信号量。

Add(delta int)

Add做了两件事,一是把delta值累加到counter中,因为delta可以为负值,也就是说counter有可能变成0或负值,所以第二件事就是当counter值变为0时,根据waiter数值释放等量的信号量,把等待的goroutine全部唤醒,如果counter变为负值,则panic。

func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state() //获取state和semaphore的地址指针
	state := aotmic.AddUint64(statep, uint64(delta) << 32)
	v := int32(state >> 32) // 获取counter值
	w := uint32(state) //获取waiter值
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// counter > 0说明不需要释放信号量,直接退出
	// waiter = 0,说明没有等待着,也不需要释放信号量,直接退出
	if v > 0 || w == 0 {
		return
	}
	// 将counter置为0,再释放waiter个数的信号量
	*statep = 0
	for ; w !=0; w-- {
		runtime_Semrelease(semap, false)
	}
}

Wait()

wait方法累加waiter,阻塞等待信号量

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state() //获取state和semaphore地址指针
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// counter = 0,说明所有goroutine都退出了,不需要等待,直接返回
		if v == 0 {
			return
		}
		// 使用CAS累加waiter,累加可能会失效,失败后通过for loop下次充实
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			runtime_Semaphore(semap)
			return
		}
	}
}

Done

只负责将counter减1,Add()可以接受负值,所以Done实际上只是调用了Add(-1)。

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
  • Add的操作必须早于Wait,否则会panic
  • Add设置的值必须与实际等待的goroutine个数一致,否则会panic。

context

context对于派生goroutine有更强的控制力,可以控制多级goroutine。它可以控制一组呈树状结构的goroutine,每个goroutine拥有相同的上下文。

典型的应用场景如上图,goroutine派生出子goroutine,而子goroutine又继续派生出新的goroutine。这种情况下goroutine的个数不容易确定,使用WaitGroup不容易,而使用context就可以很容易实现。

接口定义

context实际上只是一个接口,在src/context/context.go:Context定义:

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}
  • Deadline:返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok等于false,此时deadline为一个初始值的time.Time值。
  • Done:返回一个channel,需要在select-case中使用,当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;当context还未关闭时,Done()返回nil。
  • Err:描述context关闭的原因,关闭原因由context实现控制,不需要用户设置。当context关闭后,Err返回context的关闭原因;context还未关闭,Err返回nil。
  • Value:用于在树状分布的goroutine间传递消息。可以根据key值查询map中的value。

空context

context中定义了一个空的context,名为emptyCtx,用于context的根节点,空的context只是简单实现了Context,本身不包含任何值,仅用于其他context的父节点。

type emptyCtx int
 
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}
 
func (*emptyCtx) Done() <-chan struct{} {
	return nil
}
 
func (*emptyCtx) Err() error {
	return nil
}
 
func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

除了emptyCtx外,还有cancelCtx, timerCtx和 valueCtx三种,正是基于这三种context实例,实现了上述4个方法的context,即cancelCtx,timerCtx和valueCtx。分别用下面的方法进行创建:

  • WithCancel()
  • WithDeadline()
  • WithTimeout()
  • WithValue() WithDeadline和WithTimeout创建的都是timerCtx类型的上下文。

cancelCtx

src/context/context.go:cancelCtx定义了该类型:

type cancelCtx struct {
	Context
	mu sync.Mutex
	done chan struct{}
	children map[canceler]struct{}
	err error
}

children记录了由此context派生的所有child,此context被cancel的时候会把所有的child都cancel掉。

cancelCtx与deadline和value无关,只需要实现Done和Err外露接口就可以了。 查看其Done方法的实现

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

cancleCtx.done会在context被cancel时关闭,这样就能够从返回的channel中读取到内容了。

对于cancelCtx来说最重要的就是cancel接口的实现了,其作用是关闭自己和后代

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	c.mu.Lock()
	c.err = err
	close(c.done)
	for child := range c.children {
		child.cancel(false, err)
	}
	c.mu.Unlock()
	if removeFromParent {
		removeChild(c.Context, c)
	}
}

WithCancel()返回的第二个用于cancel context的正是此cancel()。其实现如下:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c) // 将自身添加到父节点
	return &c, func() {c.cancel(true, Canceled)}
}

将自身添加到父节点的过程:

  1. 如果父节点也支持cancel,那么父节点肯定由children成员,把新的context添加到children里即可。
  2. 如果父节点不支持cancel,就继续向上查询,直到找到一个支持cancel的节点,把新context添加到其children即可。
  3. 如果所有父节点都不支持cancel,那么启动一个协程等待父节点结束,然后再把当前context结束。

timerCtx

src/context/context.go:timerCtx中有定义:

type timerCtx struct {
	cancelCtx
	timer *time.Timer
	deadline time.Time
}

timerCtx在cancelCtx的基础上增加了deadline用于标示自动cancel的最终时间,而timer就是要给触发自动cancel的定时器。 由此,衍生出WithDeadline()和WithTimeout(),这两者的实现原理是一行的,只是使用的语境不同。

  • deadline:指定最后期限,类似于绝对时间。
  • timeout:指定最长存活时间,类似于相对时间。

timerCtx在cancelCtx的基础上实现了Deadline(),重写了cancel方法。

  • Deadline()方法就是简单的返回timerCtx.deadline,而timerCtx.deadline是通过WithDeadline()或WithTimeout()方法设置的。
  • cancel()的实现与cancelCtx基本一致,不过需要额外把timer关闭。

WithDeadline()和WithTimeout()都是创建一个timerCtx实例,并启动定时器,在定时器到期后自动cancel本context。

valueCtx

valueCtx在Context的基础上添加了一个key-value,用于在协程间传递一些消息。

type valueCtx struct {
	Context
	key, val interface{}
}

只需要实现Value接口

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

当前context找不到key的时候,就会向父节点查找,如果查找不到则最终返回interface{}

总结

  • Context仅仅是一个接口定义,根据实现不同,可以衍生出不同的context类型。
  • cancelCtx实现了Context接口,通过WithCancel()创建cancelCtx实例。
  • timerCtx实现了Context接口,通过WithDeadline()和WithTimeout()创建timerCtx实例。
  • valueCtx实现了Context接口,通过WithValue()创建valueCtx实例。
  • 三种context实例可以互为父节点,从而组合成不同的应用形式。