我们上篇Go语言面试必备 深入Go channel内部实现 1中介绍了channel的内部结构,它是一个hchan的结构体,使用一个环形队列来存储发送过来的数据。
本篇我们来讲讲channel的初始化和发送消息,接收消息,以及关闭channel。在开始之前,我们先抛出两个问题。
问题1
以下代码会输出什么?
func main() {
ch := make(chan int, 20)
go func() {
for a := range ch {
fmt.Println("from a: ", a)
}
}()
go func() {
for b := range ch {
fmt.Println("from b: ", b)
}
}()
for i:=0; i < 20; i++ {
ch <- i
time.Sleep(time.Millisecond*20)
}
time.Sleep(time.Second*30)
}
问题2
为什么在Go中对context进行cancel,会触发所有监听ctx.Done()消息的goroutine都会收到消息呢?
初始化channel
a := make(chan int, 10)
在编译的时候,golang的complier会使用src/runtime/chan中的makechan函数来编译。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 检查chan的单个元素内存大小,不能超过65536个字节。
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 这个内存是否对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 检查单个元素大小与数量size乘积得出整个buff内存大小,检查是否溢出uint
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 初始化hchan,按照mem来分配内存
var c *hchan
switch {
// 无缓冲的buffer,size是0,那么只需要分配一个hchanSize
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
// 元素类型不是指针的,直接分配内存,hchan本身大小+所有元素大小。
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 前文说过,buf是一个指向环形队列的buf,这个add就是将这块内存的首地址移动hchansize
// 得到环形队列所在的首地址。这是一块连续的内存。
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 有指针的,那就按照指针的存储内存分配,比如elem超过32KB,就直接从heap分配一块内存了。
// 同时设定该段内存的GC策略。
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 接下来初始化elemsize ,elemtype类型,环形队列大小dataqsize.
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
看到这里我们应该明白,make chan的时候,buffer大小应该按照自己的需求设定,不然真的就是开辟了那么大的内存出来了。所以我们在上篇中抛出的问题,关于初始化的时候为何不能随意地设计buffer的size大小,我相信你应该已经有答案了。
向channel发送消息
我们还是给源码加注释的方式,考虑篇幅,删除部分代码,以"..."来说明,外加部分特别说明。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)
// 如果channel已经被关闭了,还向其发消息,那就解锁并panic,这就是我们不能在关闭的channel上发消息的原因。
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// recvq 是接收消息的goroutine链表,先弹出一个来,如果存在,就直接发给他。
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 检测当前队列中的数据是否小于队列的容量,如果小于那么就把数据放到sendx所在位置中去,
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
// 环形队列中sendx是否等于容量了,如果是从头开始,环形。
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// qcount 我们说过,这是环形队列中的数据量
c.qcount++
unlock(&c.lock)
// 对于有buffer的channel,如果没有goroutine在接收,就是直接存储不阻塞,然后结束。
return true
}
if !block {
unlock(&c.lock)
return false
}
// 当缓冲队列buff没满的时候,我们已经直接发送数据了,chan直接返回了。
// 如果缓冲队列buff满了,那么我们就要阻塞住了。怎么阻塞呢
// 获取当前的goroutine g,然后将其组装成为sudog形式,然后将其塞到channel的sendq链表里,
// 前文我们说过,sendq是一个双向链表waitq,waitq的元素类型就是sudog。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 这里就是放到队列里了
// 更改当前goroutine的状态为parkingOnChan,并调用gopark让该gotoutine进入
// 等待状态,让出cpu执行权限。
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 这个不是很理解,已经让出执行了,还怎么让发送的消息保活,防止被gc呢?
KeepAlive(ep)
// 被唤醒了,唤醒的方式可以是close,也可以是数据被接收了,
// 这个我们在后续的close和recvchan中可以了解到
// 唤醒之后,我们可以看到这里并不需要再把数据存到buffer,而是直接释放资源sudog
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
// 从recvq中弹出的goroutine sg,我们直接将要发送的消息ep拷贝到sg.elem上
//(sendDirect),然后呢,我们再将阻塞的接收goroutine sg.g的参数变更,
// 并调用goready,将其唤醒。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
...
memmove(dst, src, t.size)
}
我们总结一下chansend流程
- 向已经关闭的channel发消息会panic
- 如果此时有goroutine正在阻塞读取,也就是在recvq这个链表有元素,那么此时,就是直接将其弹出,然后将待发送的消息直接拷贝到该goroutine上,之后将被阻塞的recv gourine唤醒。
- 如果环形队列还没有满,我们知道有缓冲的channel是发送数据可以立刻返回,所以此时就是将数据放到buff队列里sendx指定的位置。然后退出。
- 如果环形队列满了,然后发送消息的goroutine需要被阻塞,阻塞的方式就是获取当前的g组装一个sudog,然后放到hchan的双向链表sendq,并调用gopark使它进入等待,并让出cpu。
- 在消息被接收或者chan被关闭的时候,此goroutine会被唤醒,将其从sendq中移除,并释放内存,然后继续执行。
问题:
- 在gopark之后,才对要发送的消息ep进行保活KeepAlive,防止被GC,此时是怎么生效的呢?有了解的同学嘛?这是怎么回事呢?
最后
你学废了吗?有什么问题,下方留言哦。关注我,不走丢,下篇更精彩呦!