专业编程基础技术教程

网站首页 > 基础教程 正文

Go语言面试必备 深入Go Channel内部实现2

ccvgpt 2024-08-06 12:51:43 基础教程 11 ℃

我们上篇Go语言面试必备 深入Go channel内部实现 1中介绍了channel的内部结构,它是一个hchan的结构体,使用一个环形队列来存储发送过来的数据。

本篇我们来讲讲channel的初始化和发送消息,接收消息,以及关闭channel。在开始之前,我们先抛出两个问题。

Go语言面试必备 深入Go Channel内部实现2

问题1

以下代码会输出什么?

func main() {
   ch := make(chan int, 20)
   go func() {
      for a := range ch {
         fmt.Println("from a: ", a)
      }
   }()
   go func() {
      for b := range ch {
         fmt.Println("from b: ", b)
      }
   }()
   for i:=0; i < 20; i++ {
      ch <- i
      time.Sleep(time.Millisecond*20)
   }
   time.Sleep(time.Second*30)
}

问题2

为什么在Go中对context进行cancel,会触发所有监听ctx.Done()消息的goroutine都会收到消息呢?

初始化channel

a := make(chan int, 10) 

在编译的时候,golang的complier会使用src/runtime/chan中的makechan函数来编译。

func makechan(t *chantype, size int) *hchan {
  elem := t.elem

	// 检查chan的单个元素内存大小,不能超过65536个字节。
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
  // 这个内存是否对齐
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

  // 检查单个元素大小与数量size乘积得出整个buff内存大小,检查是否溢出uint
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// 初始化hchan,按照mem来分配内存
	var c *hchan
	switch {
  // 无缓冲的buffer,size是0,那么只需要分配一个hchanSize
  case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
  // 元素类型不是指针的,直接分配内存,hchan本身大小+所有元素大小。    
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    // 前文说过,buf是一个指向环形队列的buf,这个add就是将这块内存的首地址移动hchansize
    // 得到环形队列所在的首地址。这是一块连续的内存。
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
    // 有指针的,那就按照指针的存储内存分配,比如elem超过32KB,就直接从heap分配一块内存了。
    // 同时设定该段内存的GC策略。
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

  // 接下来初始化elemsize ,elemtype类型,环形队列大小dataqsize.
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

看到这里我们应该明白,make chan的时候,buffer大小应该按照自己的需求设定,不然真的就是开辟了那么大的内存出来了。所以我们在上篇中抛出的问题,关于初始化的时候为何不能随意地设计buffer的size大小,我相信你应该已经有答案了。

向channel发送消息

我们还是给源码加注释的方式,考虑篇幅,删除部分代码,以"..."来说明,外加部分特别说明。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...  
	lock(&c.lock)

  // 如果channel已经被关闭了,还向其发消息,那就解锁并panic,这就是我们不能在关闭的channel上发消息的原因。
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

  // recvq 是接收消息的goroutine链表,先弹出一个来,如果存在,就直接发给他。
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 检测当前队列中的数据是否小于队列的容量,如果小于那么就把数据放到sendx所在位置中去,
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
    // 环形队列中sendx是否等于容量了,如果是从头开始,环形。
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
    // qcount 我们说过,这是环形队列中的数据量
		c.qcount++
		unlock(&c.lock)
    // 对于有buffer的channel,如果没有goroutine在接收,就是直接存储不阻塞,然后结束。
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}
	
	// 当缓冲队列buff没满的时候,我们已经直接发送数据了,chan直接返回了。
  // 如果缓冲队列buff满了,那么我们就要阻塞住了。怎么阻塞呢
  // 获取当前的goroutine g,然后将其组装成为sudog形式,然后将其塞到channel的sendq链表里,
	// 前文我们说过,sendq是一个双向链表waitq,waitq的元素类型就是sudog。
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // 这里就是放到队列里了
  // 更改当前goroutine的状态为parkingOnChan,并调用gopark让该gotoutine进入
  // 等待状态,让出cpu执行权限。
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// 这个不是很理解,已经让出执行了,还怎么让发送的消息保活,防止被gc呢?
	KeepAlive(ep)

	// 被唤醒了,唤醒的方式可以是close,也可以是数据被接收了,
  // 这个我们在后续的close和recvchan中可以了解到
  // 唤醒之后,我们可以看到这里并不需要再把数据存到buffer,而是直接释放资源sudog
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
// 从recvq中弹出的goroutine sg,我们直接将要发送的消息ep拷贝到sg.elem上
//(sendDirect),然后呢,我们再将阻塞的接收goroutine sg.g的参数变更,
// 并调用goready,将其唤醒。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	 ...
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	dst := sg.elem
  ...
	memmove(dst, src, t.size)
}

我们总结一下chansend流程

  • 向已经关闭的channel发消息会panic
  • 如果此时有goroutine正在阻塞读取,也就是在recvq这个链表有元素,那么此时,就是直接将其弹出,然后将待发送的消息直接拷贝到该goroutine上,之后将被阻塞的recv gourine唤醒。
  • 如果环形队列还没有满,我们知道有缓冲的channel是发送数据可以立刻返回,所以此时就是将数据放到buff队列里sendx指定的位置。然后退出。
  • 如果环形队列满了,然后发送消息的goroutine需要被阻塞,阻塞的方式就是获取当前的g组装一个sudog,然后放到hchan的双向链表sendq,并调用gopark使它进入等待,并让出cpu。
  • 在消息被接收或者chan被关闭的时候,此goroutine会被唤醒,将其从sendq中移除,并释放内存,然后继续执行。

问题:

  • 在gopark之后,才对要发送的消息ep进行保活KeepAlive,防止被GC,此时是怎么生效的呢?有了解的同学嘛?这是怎么回事呢?

最后

你学废了吗?有什么问题,下方留言哦。关注我,不走丢,下篇更精彩呦!

Tags:

最近发表
标签列表