
Golang Language Series: Channel

  • Source analysis
    • Struct definition and constructor
    • Send operation
    • Receive operation
    • Close operation
    • select operation
  • Experiment
  • References

Channels are one of Go's distinctive features: concurrency is coordinated by passing messages between goroutines. Channels come in two flavours, buffered and unbuffered; a buffered channel stores its elements in a circular array and protects concurrent access with a mutex. Three operations are possible on a channel: receive, send, and close.
Receiving from a nil channel parks the current goroutine forever. Receiving from a closed channel returns a buffered element if any remain, otherwise the element type's zero value. Receiving from an open channel works as follows: if the channel is unbuffered and a goroutine is waiting in the send queue, the value is copied directly from that sender; if the buffer is non-empty, one value is taken from its head; otherwise the current goroutine blocks and joins the channel's receive wait queue. Sending to a nil channel likewise parks the goroutine, and sending to a closed channel panics. An open-channel send mirrors the receive: if a goroutine is waiting in the receive queue, the value is copied straight to it; if the buffer has free space, the value is appended; otherwise the sender blocks and joins the send wait queue. Closing a nil channel, or a channel that is already closed, also panics.
This article verifies these points by walking through the runtime source, and finally uses channels to solve a multi-goroutine printing problem.
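
A few of these rules can be checked with a short program (a minimal sketch for illustration; the channel and variable names are made up):

package main

import "fmt"

func main() {
	ch := make(chan int, 2) // buffered channel of capacity 2
	ch <- 1
	ch <- 2
	close(ch)

	// A closed channel can still be drained ...
	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2

	// ... and once empty it yields the zero value; ok reports false.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}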

Source analysis

The channel implementation lives in chan.go in the runtime package.

Struct definition and constructor

Let's start with the channel's struct definition:

type hchan struct {
	qcount   uint           // total data in the queue  // number of elements currently in the queue
	dataqsiz uint           // size of the circular queue  // capacity of the circular buffer
	buf      unsafe.Pointer // points to an array of dataqsiz elements  // start address of the array backing the circular buffer
	elemsize uint16  // size of a single element
	closed   uint32  // closed flag
	elemtype *_type // element type
	sendx    uint   // send index   // index of the slot the next send will write to
	recvx    uint   // receive index  // index of the slot the next receive will read from
	recvq    waitq  // list of recv waiters  // goroutines blocked waiting to receive from this channel
	sendq    waitq  // list of send waiters  // goroutines blocked waiting to send to this channel

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex    // mutex protecting concurrent access
}
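
The buf, sendx, recvx and qcount fields together form a fixed-size circular queue. The toy type below (my own illustration, not runtime code) mimics only that index arithmetic; the real hchan additionally handles locking, the wait queues and direct hand-off between goroutines:

package main

import "fmt"

// ring mirrors how hchan uses buf, sendx, recvx and qcount.
type ring struct {
	buf              []int
	sendx, recvx     uint
	qcount, dataqsiz uint
}

func (r *ring) send(v int) bool {
	if r.qcount == r.dataqsiz {
		return false // full: the runtime would block or hand the value to a waiting receiver
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz { // wrap around, as chansend does
		r.sendx = 0
	}
	r.qcount++
	return true
}

func (r *ring) recv() (int, bool) {
	if r.qcount == 0 {
		return 0, false // empty: the runtime would block instead
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz { // wrap around, as chanrecv does
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := &ring{buf: make([]int, 3), dataqsiz: 3}
	for i := 1; i <= 4; i++ {
		fmt.Println("send", i, "->", r.send(i)) // the 4th send fails: buffer full
	}
	v, _ := r.recv()
	fmt.Println("recv ->", v) // 1, taken from the head
}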

The send and receive wait queues (waitq) are linked lists; each node is a sudog that wraps a blocked goroutine. Their definitions:

type waitq struct {   // doubly linked list of goroutines blocked on this channel
	first *sudog
	last  *sudog
}
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g   // the blocked goroutine

	next *sudog  // next node in the list
	prev *sudog   // previous node in the list
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

Finally, the channel constructor:

func makechan(t *chantype, size int) *hchan {
	elem := t.Elem
	// parameter validation
	// compiler checks this but be safe.
	if elem.Size_ >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		// Unbuffered, or the element size is zero: allocate only the hchan header itself (hchanSize bytes, 96 on 64-bit platforms)
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.PtrBytes == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		// the stored elements contain no pointers
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.Size_)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
	}
	return c
}
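
Which branch of the switch in makechan is taken depends only on the capacity and the element type; a few make calls (my own examples, mapped onto the cases above) illustrate this:

package main

func main() {
	_ = make(chan struct{})  // mem == 0: zero-size element, only the hchan header is allocated
	_ = make(chan int, 8)    // elements contain no pointers: hchan and buf come from a single allocation
	_ = make(chan *int, 8)   // elements contain pointers: buf is a separate allocation the GC can scan
}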

Send operation

The relevant source code:

// chansend is the general channel send function. A plain `c <- x` in user code is compiled into a call to chansend1, which in turn calls chansend with block=true, i.e. the send blocks when it cannot proceed. Calling chansend with block=false gives a send that fails instead of blocking, which is what select uses.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {   // the channel is nil
		if !block {   // non-blocking: just return
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)   // blocking mode: park forever (never woken, usually surfaces as a runtime deadlock)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	// Non-blocking mode, the channel is not closed, and the send cannot proceed (buffer full, or no waiting receiver on an unbuffered channel): take the fast path and return false without the cost of locking
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)   // take the lock

	if c.closed != 0 {   // the fast path above ran without the lock, so another goroutine may have closed the channel in the meantime; check again
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// If the receive wait queue is non-empty, the buffer must be empty (or the channel is unbuffered): dequeue one waiting receiver and hand the value to it directly
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {   // the buffer is not full: copy the value into the circular queue
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	// Reaching here means the channel is unbuffered or its buffer is full, and no receiver is waiting
	if !block {  // non-blocking mode: release the lock and return false
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	// Block on this channel: enqueue ourselves on the send wait queue
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)   // park the current goroutine
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)  // keep the value being sent alive until the receiver has copied it out, so it is not garbage collected

	// someone woke us up.
	// we were woken up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)   // return the sudog to the pool (whoever woke us already dequeued it)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		// the channel was closed while we were blocked: panic
		panic(plainError("send on closed channel"))
	}
	return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// send handles the case where channel c has an empty buffer: sg is a goroutine blocked waiting to receive, and ep points to the value being sent this time
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0   // wrap around the circular queue
			}
			// The value is copied directly to the waiting receiver, so the buffer stays empty; these indices are only advanced here (race-detector builds) to pretend the value passed through the buffer and was immediately taken out by the woken receiver
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()   // release the channel lock held by this goroutine
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)   // wake up the waiting receiver
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.
// sendDirect copies the value directly onto the waiting receiver's stack
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}

To summarize the send path:

  1. First check whether the channel is nil (uninitialized). In non-blocking mode return false immediately; in blocking mode (which is what a plain c <- x uses) the goroutine parks forever, which usually ends in a runtime deadlock error
  2. In non-blocking mode, if the channel is full (for an unbuffered channel, "full" means there is no waiting receiver), return false right away. This avoids the cost of taking the lock and keeps the locked region as small as possible
  3. Take the lock and re-check whether the channel has been closed; if so, panic
  4. If the receive wait queue is non-empty, the buffer must be empty, so copy the value directly to a waiting receiver, wake it, release the lock, and return
  5. If the buffer is not full (buffered channel), copy the value into the circular queue, release the lock, and return
  6. Otherwise the channel is full: in non-blocking mode return false; in blocking mode mark the current goroutine as blocked and enqueue it on the channel's send wait queue
  7. If execution continues past gopark, the goroutine has been woken. There are two cases: either a receiver has taken this blocked sender's value, or the channel has been closed (closing wakes every waiting sender). In the first case return true; in the second case panic (both panic cases are reproduced in the sketch after this list)
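
The panic behaviour in steps 3 and 7 is easy to observe (a small sketch; the recover calls only keep the program alive long enough to print both messages):

package main

import "fmt"

func main() {
	// Step 3: sending on an already-closed channel panics.
	closed := make(chan int, 1)
	close(closed)
	func() {
		defer func() { fmt.Println("recovered:", recover()) }()
		closed <- 1 // panic: send on closed channel
	}()

	// Step 7: a sender parked on sendq is woken by close and then panics.
	done := make(chan struct{})
	ch := make(chan int) // unbuffered, nobody ever receives
	go func() {
		defer func() {
			fmt.Println("blocked sender recovered:", recover())
			close(done)
		}()
		ch <- 1 // parks on the send wait queue (or panics immediately if close wins the race)
	}()
	close(ch)
	<-done
}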

Receive operation

The relevant source code:

// empty reports whether a read from c would block (that is, the channel is
// empty).  It uses a single atomic read of mutable state.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code.
//
//go:nosplit
// The following two functions correspond to receives without and with the comma-ok form; the compiler picks one based on the user code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
// ep is the destination pointer; nil means the caller does not care about the received value. User code always receives in blocking mode. The first result (selected) exists to implement select; the second reports whether a real value was received (rather than a zero value from a closed channel)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {   // non-blocking mode: return immediately
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")  // never reached: the gopark above never returns
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {   // non-blocking mode and the channel is empty (for an unbuffered channel, "empty" means no waiting sender)
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {   // the channel is not closed
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {   // still empty
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)   // take the lock

	if c.closed != 0 {   // the channel has been closed
		if c.qcount == 0 {   // and holds no buffered data
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)   // zero *ep, i.e. return the zero value
			}
			return true, false
		}
		// The channel has been closed, but the channel's buffer have data.
	} else {   // not closed: if a blocked sender is waiting, copy the data from it directly
		// Just found waiting sender with not closed.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {   // buffered data is available; note the channel may also be closed at this point
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)  // copy the value out
		}
		typedmemclr(c.elemtype, qp)   // zero the slot in the circular queue
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
    // Reaching here means the channel is empty: in non-blocking mode return immediately, otherwise we have to wait

	if !block {   
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)   // enqueue on the receive wait queue
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)   // park the current goroutine

    // woken up
	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {   // unbuffered (synchronous) channel: copy the value directly from the waiting sender
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {   // buffered (asynchronous) channel
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
        // The sender's value cannot be handed straight to the receiver here: read the head of the buffer first, then append the blocked sender's value at the tail
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)   // wake up the blocked sender
}
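
A subtle point in the buffered branch above: when a receiver finds the buffer full and a sender blocked, it takes the value at the head of the buffer and stores the blocked sender's value at the tail, so FIFO order is preserved. A small sketch (the started channel is only a best-effort way to let the sender block first):

package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // buffer is now full

	started := make(chan struct{})
	go func() {
		close(started)
		ch <- 3 // blocks on the send wait queue while the buffer is still full
	}()
	<-started

	// Each receive takes the head of the buffer; the blocked sender's value
	// is appended at the tail, so the order stays 1, 2, 3.
	fmt.Println(<-ch, <-ch, <-ch) // 1 2 3
}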

Receiving from a channel mirrors the send path. To summarize:

  1. First check whether the channel is nil (uninitialized). In non-blocking mode return false immediately; in blocking mode (a plain <-c) the goroutine parks forever, which usually ends in a runtime deadlock error
  2. In non-blocking mode, if the channel is empty (for an unbuffered channel, "empty" means there is no waiting sender), return false right away to avoid the locking overhead
  3. Take the lock and re-check whether the channel has been closed. If it is closed and holds no buffered data, return the zero value; if buffered data remains, it is drained normally by the steps below
  4. If the send wait queue is non-empty, the buffer must be full (or the channel is unbuffered): for an unbuffered channel copy the value directly from the blocked sender, otherwise take the value at the head of the buffer and append the blocked sender's value at the tail; then wake that sender, release the lock, and return
  5. If the buffer is non-empty (buffered channel), copy one value out of the circular queue, release the lock, and return
  6. Otherwise the channel is empty: in non-blocking mode return false; in blocking mode mark the current goroutine as blocked and enqueue it on the channel's receive wait queue
  7. If execution continues past gopark, the goroutine has been woken and returns normally; the success flag reports whether a real value was delivered (true) or the channel was closed (false). The comma-ok sketch below shows this flag from user code
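
Step 7's success flag is what the second value of the comma-ok form reports: a blocked receiver woken by close sees ok == false (a minimal sketch):

package main

import "fmt"

func main() {
	ch := make(chan int) // unbuffered
	done := make(chan struct{})

	go func() {
		v, ok := <-ch      // parks on recvq until the channel is closed
		fmt.Println(v, ok) // 0 false: woken by closechan, success == false
		close(done)
	}()

	close(ch)
	<-done
}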

Close operation

func closechan(c *hchan) {
	if c == nil {   // closing a nil channel panics
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)   // take the lock
	if c.closed != 0 {   // closing an already-closed channel panics
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1   // mark the channel as closed

	var glist gList
    // wake every goroutine waiting to receive or to send
	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {   
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
    // wake all blocked senders; they will panic when they resume
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

Closing a channel is relatively simple; to summarize:

  1. Check whether the channel is nil; if so, panic
  2. Take the lock and check whether the channel has already been closed; if so, panic (both panic cases are reproduced in the sketch after this list)
  3. Wake every waiting receiver and every waiting sender (the woken senders panic)
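
Both panic cases can be reproduced with a few lines (a sketch; recover only keeps the program alive to show the messages):

package main

import "fmt"

func mustPanic(f func()) {
	defer func() { fmt.Println("recovered:", recover()) }()
	f()
}

func main() {
	var nilCh chan int
	mustPanic(func() { close(nilCh) }) // close of nil channel

	ch := make(chan int)
	close(ch)
	mustPanic(func() { close(ch) }) // close of closed channel
}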

select operation

If channels are viewed as a kind of I/O, then select is an I/O-multiplexing mechanism: it watches several channel operations that would each block on their own. It is built on the non-blocking forms of send and receive. A select with a single case plus a default clause is lowered by the compiler into an if-else around selectnbsend or selectnbrecv (i.e. chansend/chanrecv with block=false); the general multi-case select goes through runtime.selectgo, which polls the cases in a randomized order using the same non-blocking operations and only parks the goroutine when none of them is ready and there is no default. The lowering of the two-branch form is documented in the source:

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}

It also follows that if one of the watched channels has been closed, a receive case on it still fires, because for a closed channel chanrecv returns true as its first (selected) result.
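
The lowering described above is easy to see from user code: a select with one case and a default behaves exactly like a non-blocking send or receive (a minimal sketch):

package main

import "fmt"

func main() {
	ch := make(chan int, 1)

	// Non-blocking send: selectnbsend(ch, 1) under the hood.
	select {
	case ch <- 1:
		fmt.Println("sent")
	default:
		fmt.Println("channel full, not blocking")
	}

	// Non-blocking receive: selectnbrecv(&v, ch) under the hood.
	select {
	case v := <-ch:
		fmt.Println("received", v)
	default:
		fmt.Println("channel empty, not blocking")
	}
}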

Experiment

A question that comes up frequently in ByteDance interviews: three goroutines print "A", "B" and "C" respectively, and the program must output "ABCABCABC". The implementation below uses buffered channels of capacity 1. With unbuffered channels this particular program would deadlock: on the final round the C goroutine sends to AChannel after the A goroutine has already exited, so the send never finds a receiver, wg.Wait() never returns, and the Go runtime's deadlock detector aborts the program with "all goroutines are asleep - deadlock!".

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(3)
	AChannel := make(chan struct{}, 1)
	BChannel := make(chan struct{}, 1)
	CChannel := make(chan struct{}, 1)
	go func() {
		defer wg.Done()
		for i := 0; i < 3; i++ {
			<- AChannel
			fmt.Print("A")
			BChannel <- struct{}{}
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 3; i++ {
			<- BChannel
			fmt.Print("B")
			CChannel <- struct{}{}
		}
	}()

	go func() {
		defer wg.Done()
		for i := 0; i < 3; i++ {
			<- CChannel
			fmt.Print("C")
			AChannel <- struct{}{} 
		}
	}()
	AChannel <- struct{}{}
	wg.Wait()
}

References

  • Go 程序员面试笔试宝典
