chan是go语言在语言层面提供的goroutine间的通信方式,比Unix管道更加易用也更加轻便。

数据结构

channel是由队列、类型信息、goroutine等待队列组成的,

type hchan struct {
	qcount uint // 当前队列中剩余元素个数
	dataqsiz uint // 环形队列长度,即可以存放的元素个数
	buf unsafe.Pointer // 环形队列指针
	elemsize uint16 // 每个元素的大小
	closed uint32 // 标识关闭状态
	elemtype *_type // 元素类型
	sendx uint // 队列下标,指示元素写入时存放到队列中的位置
	recvx uint // 队列下标,指示元素从队列的该位置读出
	recvq waitq // 等待读消息的goroutine队列
	sendq waitq // 等待写消息的goroutine队列
	lock mutex // 互斥锁,chan不允许并发读写
}

环形队列

chan内部使用环形队列来作为缓冲区存放数据

  • buf指向环形队列在内存中的地址
  • 环形队列的容量dataqsiz和当前存放的元素格式qcount,qcount小于dataqsiz就说明当前可以写入元素。
  • sendx是后续写入元素的位置,recvx表示从该位置读取数据。每次读或者写后,对应的变量都要后移。

等待队列

当chan没有缓冲区或者缓冲区为空的时候,读此chan的goroutine会被阻塞;当没有缓冲区或者缓冲区满的时候,向此chan写的当前goroutine会被阻塞。

recvq和sendq就是分别因为读和写这个chan而被阻塞goroutine的等待队列。

  • 因为读而阻塞的goroutine会被向chan写入数据的goroutine唤醒。
  • 因为写而阻塞的goroutine会被向chan读数据的goroutine唤醒。

类型信息

一个chan只能传递一种类型,这里使用elemsize表示类型的大小,使用elemtype用于数据传递过程中的赋值。

chan自身是保证goroutine并发安全的,所以它自己使用了锁lock来保证自己数据的同步。

那么现在就对channel的基本类型hchan的字段都进行了说明。

channel的读写

创建channel

创建channel的过程就是初始化hchan数据结构的过程,其中类型信息和缓冲区长度由make语句传入,buf的大小则由元素大小和缓冲区长度共同决定。

创建channel的伪代码如下:

func makechan(t *chantype, size int) *hchan {
	var c *hchan
	c = new(hchan)
	c.buf = malloc(元素类型大小 * size)
	c.elemsize = 元素类型大小
	c.elemtype = 元素类型
	c.dataqsiz = size
}

写channel

向一个channel中写数据的简单过程如下:

  1. 如果等待接受队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq中取出G,并把数据写入,最后把该G唤醒,结束发送过程。
  2. 如果缓冲区有空余位置,将数据写入缓冲区,结束发送过程。
  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒。

简单来说,就是写chan的时候有goroutine正在等待读取数据,那么直接取出一个读协程,然后写入数据将这个协程唤醒即可。如果没有协程在等待数据,那么就要判断是否缓冲区有空余,如果有空余,就可以直接写入缓冲区结束,如果没有空余,那么这个协程就要加入到sendq等待队列中,进入睡眠,等待数据被读取后唤醒。

读channel

  1. 如果sendq不为空,这时候有两种情况,没有缓冲区或者缓冲区已满
    1. 没有缓冲区,那么直接从sendq中取出G,把G中数据读出,最后把G唤醒就结束。
    2. 如果缓冲区已满,就直接从缓冲区的首部读取数据,把G中数据写入缓冲区尾部,把G唤醒,读取过程结束。
  2. 如果缓冲区有数据,则从缓冲区中取出数据,结束。
  3. 没有数据可读,那么就要将当前协程加入recvq中等待有数据写入后再唤醒。

关闭channel

关闭channel会唤醒recvq上的所有等待的G,对应的数据位置为nil;sendq中的G全部唤醒,但这些G会panic。

常见用法

单向channel

其实并不存在单向channel,只是对channel加上了使用限制,只能向其读或者写。 跟c语言的关键字const修饰函数参数为只读是一个道理。

select

使用select监控多个channel,当其中一个channel可读时,就从中读取数据。如:

package main
import(
	"fmt"
	"time"
)
func addNumberToChan(chanName chan int) {
	for {
		chanName <- 1
		time.Sleep(1 * time.Second)
	}
}
 
func main() {
	var chan1 = make(chan int, 10)
	var chan2 = make(chan int, 10)
	go addNumberToChan(chan1)
	go addNumberToChan(chan2)
	for {
		select {
		case e := <- chan1:
			fmt.Printf("Get element from chan1: %d\n", e)
		case e := <- chan2:
			fmt.Printf("Get element from chan2: %d\n", e)
		default:
			fmt.Printf("No element in chan1 and chan2")
			time.Sleep(1 *time.Second)
		}
	}
}

select的case语句从channel读取数据不会阻塞,就算channel中没有数据。这是由于case编译后调用读channel时会明确传入不阻塞的参数,读不到数据时不会将当前goroutine加入到等待队列中,而是直接返回。

range

可以向遍历数组一样持续从channel中读取数据,直到channel被关闭,当没有数据的时候会阻塞当前协程,与读channel中阻塞处理的机制一样

func chanRange(chanName chan int) {
	for e := range chanName {
		fmt.Printf("Get element from chan: %d\n", e)
	}
}

如果向这个channel写数据的协程退出了,系统检测到这种情况后会panic,否则range将会永远阻塞在这里