当前位置: 首页 > article >正文

【Go底层】select原理

目录

  • 1、背景
  • 2、go版本
  • 3、 selectgo函数解释
    • 【1】函数参数解释
    • 【2】函数具体解释
      • 第一步:遍历pollorder,选出准备好的case
      • 第二步:将当前goroutine放到所有case通道中对应的收发队列上
      • 第三步:唤醒groutine
  • 4、总结

1、背景

select多路复用在go的异步和并发控制场景中非常好用,对于无case和只有单个case的情况,编译器在编译的时候就会对其做优化,无case就相当于调用了一个阻塞函数,单个case就相当于对一个通道进行读写操作,如果单个case中有default分支时,就相当于是一个if else逻辑,对于多个case的情况,是在运行时调用selectgo函数决定的,接下来我们就来研究一下selectgo函数。

2、go版本

$ go version
go version go1.21.4 windows/386

3、 selectgo函数解释

【1】函数参数解释

selectgo函数位于:src/runtime/select.go中,定义如下:

//cas0:case数组地址,按照往通道写数据在前,从通道读数据在后的排列顺序(编译时编译器优化行为操作的)
//nsends:往通道写数据的case数量
//nrecvs:从通道读数据的case数量
//block:是否阻塞
//返回值分别代表选中规定case位置和是否成功从通道接收数据,如果选中的是default,第一个返回值就返回-1
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

select中每一个case都对应一个scase结构,定义如下:

type scase struct {
	c    *hchan         //case对应的读或写通道
	elem unsafe.Pointer //指向要写入元素或存放读取元素的地址
}

【2】函数具体解释

selectgo函数中会遍历所有的case,为确保遍历case的随机性和安全性,有两个关键的顺序:pollorder和lockorder,不用关心其具体实现,明白其的作用就行。

pollorder:随机的case顺序,确保公平的处理每一个case。
lockorder:加锁的case顺序,确保并发安全。

计算出pollorder和lockorder顺序之后,会根据这2个顺序进行遍历分为了3步。

第一步:遍历pollorder,选出准备好的case

第一部分的代码如下:

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	...
	
	var casi int   //准备好的case位置
	var cas *scase //case对象
	var caseSuccess bool
	var caseReleaseTime int64 = -1
	var recvOK bool //如果是从通道读取数据,是否读取成功
	for _, casei := range pollorder { //遍历随机顺序的case
		casi = int(casei)   //case的位置
		cas = &scases[casi] //case对象
		c = cas.c //case通道

		if casi >= nsends { //前面讲过,写通道在前,读通道在后,所以这里是读通道case
			sg = c.sendq.dequeue() //取出往读通道写数据的协程队列中的第一个协程
			if sg != nil { //如果存在往通道写数据的协程
				goto recv  //从往通道写数据的协程中读取数据并返回case位置和读取结果
			}
			if c.qcount > 0 { //如果缓冲区还有数据
				goto bufrecv  //从缓冲区读取数据并返回case位置和读取结果
			}
			if c.closed != 0 { //如果通道已关闭
				goto rclose    //释放相关资源
			}
		} else { //写通道的case
			if raceenabled {
				racereadpc(c.raceaddr(), casePC(casi), chansendpc)
			}
			if c.closed != 0 { //如果通道已经关闭
				goto sclose    //直接panic
			}
			sg = c.recvq.dequeue() //从正在往通道读数据的协程队列中取得第一个
			if sg != nil { //如果往通道读数据的协程存在
				goto send  //发送数据到读通道的协程
			}
			if c.qcount < c.dataqsiz { //缓冲区还有位置
				goto bufsend
			}
		}
	}

	if !block { //如果不阻塞,也就是带default分支
		selunlock(scases, lockorder)
		casi = -1 //case位置为-1
		goto retc //直接返回,不用进入下一步
	}
	
	...
}

bufrecv标签:

	bufrecv:
	recvOK = true  //返回读数据成功
	qp = chanbuf(c, c.recvx) //缓冲区中要读取数据的地址
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp) //将读取的缓冲区数据拷贝到case中的elem位置
	}
	typedmemclr(c.elemtype, qp) //清理缓冲区被读的数据
	c.recvx++ //读取缓冲区的位置+1
	if c.recvx == c.dataqsiz { //下一个要读取缓冲区的位置如果等于缓冲区大小就将下次要读取的缓冲区位置置为0
		c.recvx = 0
	}
	c.qcount-- //缓冲区中元素个数-1
	selunlock(scases, lockorder)
	goto retc

bufsend标签:

	bufsend:
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) //将case中要写入的元素写到缓冲区
	c.sendx++ //写入缓冲区的位置+1
	if c.sendx == c.dataqsiz { //如果下次要写入缓冲区的位置等于缓冲区的大小就将缓冲区写入位置置为开头
		c.sendx = 0
	}
	c.qcount++ //缓冲区元素数量+1
	selunlock(scases, lockorder)
	goto retc	

recv标签:

recv:
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //从写通道的协程读取数据
	if debugSelect {
		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	}
	recvOK = true //返回成功读取
	goto retc

rclose标签:

rclose:
	selunlock(scases, lockorder)
	recvOK = false //从通道中读取数据失败
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem) //释放case中元素的空间
	}
	if raceenabled {
		raceacquire(c.raceaddr())
	}
	goto retc

send标签:

send:
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //发送数据到往通道读数据的协程
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	goto retc

retc标签:

retc:
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	return casi, recvOK  //返回case位置和是否从通道成功读取数据

sclose标签:

sclose:
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))

上面就是selectgo函数第一部分的逻辑,第一部分就是遍历一个随机的case顺序,如果有符合条件的case就返回case的位置并且返回读数据的结果,如果没有case符合条件但是有default分支就返回-1,如果没default分支就进入下一步。

第二步:将当前goroutine放到所有case通道中对应的收发队列上

第二部分的代码如下:

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	...

	gp = getg() //获取当前协程
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting
	for _, casei := range lockorder { //按照对case加锁的顺序遍历case
		casi = int(casei)   //case的位置
		cas = &scases[casi] //case对象
		c = cas.c  //case对象中的通道
		sg := acquireSudog() //初始化一个协程等待结构
		sg.g = gp //协程等待结构绑定协程
		sg.isSelect = true //表示该协程等待结构与select操作相关
		sg.elem = cas.elem 
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
		*nextp = sg
		nextp = &sg.waitlink

		if casi < nsends { //如果case上是往通道写数据,就将绑定当前协程的等待对象插入当前case通道的发送队列中
			c.sendq.enqueue(sg) 
		} else { //如果case上是往通道读数据,就将绑定当前协程的等待对象插入当前case通道的接收队列中
			c.recvq.enqueue(sg)
		}
	}

	...
}

第二部分就是将当前协程放到每个case中的通道对应的收发队列中去。

第三步:唤醒groutine

第三部分代码如下:

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	...

	sg = (*sudog)(gp.param) //被唤醒的协程等待结构
	gp.param = nil
	
	casi = -1  //case位置
	cas = nil  //case对象
	caseSuccess = false
	sglist = gp.waiting //lockorder顺序的协程等待结构队列,这里是队列中的第一个协程等待结构
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { //清空协程等待结构队列中元素便于进行垃圾回收
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
	gp.waiting = nil

	for _, casei := range lockorder { //根据对case的加锁顺序进行遍历
		k = &scases[casei] //当前case
		if sg == sglist {  //唤醒的协程等待结构是当前case的
			casi = int(casei) //唤醒的case位置
			cas = k //唤醒的case对象
			caseSuccess = sglist.success //往通道读取或写数据结果
			if sglist.releasetime > 0 {
				caseReleaseTime = sglist.releasetime
			}
		} else { //唤醒的协程等待结构不是当前case的
			c = k.c
			if int(casei) < nsends { //case为发送通道,就是释放当前case通道里sendq队列的协程等待结构对象
				c.sendq.dequeueSudoG(sglist)
			} else {  //case为读取通道,就是释放当前case通道里recvq队列的协程等待结构对象
				c.recvq.dequeueSudoG(sglist)
			}
		}
		sgnext = sglist.waitlink //下一个协程等待结构
		sglist.waitlink = nil
		releaseSudog(sglist) //释放上一个协程等待结构
		sglist = sgnext
	}

	...
}

第三部分就是某一个case上的协程等待结构被唤醒时,会先执行通道上对应的收发操作, 然后去将所有case上的协程等待结构释放掉。

4、总结

select虽然使用起来简单,但其实现逻辑还是比较复杂的,通过熟悉其实现,我们能理解对多个通道进行操作时候,可以为每一个通道创建一个协程去操作,这无疑增加了GC开销,但是使用select采用了多路复用的思想,将一个协程绑定在多个协程等待对象上,而且对case使用了随机顺序,确保每一个case都能公平的被执行。


http://www.kler.cn/a/421541.html

相关文章:

  • 模拟实现单链表 —— SingleLinkedList
  • 【Nativeshell】flutter的pc跨平台框架学习记录<二> 窗口间通信
  • 《以 C++为笔,绘就手势识别人机交互新画卷》
  • WEB开发: 丢掉包袱,拥抱ASP.NET CORE!
  • Maven、JAVAWeb、Servlet
  • VLC 播放的音视频数据处理流水线搭建
  • 自由学习记录(28)
  • 8 Bellman Ford算法SPFA
  • 全面解析Astra+深度相机模块:组件、功能与应用
  • 初次chronyd安装使用
  • Day 32 动态规划part01
  • 探索 SpringBoot 于 MVC 模式下高校办公室行政事务管理系统的设计与实现
  • 常见排序算法总结 (三) - 归并排序与归并分治
  • 网络安全防范技术
  • C语言基本知识2.6%g的用法
  • 【AI系统】LLVM 前端和优化层
  • 大数据新视界 -- 大数据大厂之 Hive 数据压缩:优化存储与传输的关键(上)(19/ 30)
  • Navicat连接SQL Server及SpringBoot连接SQL Server(jtds)
  • ESP32-S3模组上跑通ES8388(13)
  • Scala的模式匹配(6)
  • 【C++】LeetCode:LCR 026. 重排链表
  • Android 使用OpenGLES + MediaPlayer 获取视频截图
  • 华为服务器使用U盘重装系统
  • JavaScript(一)
  • 【开发语言】层次状态机(HSM)介绍
  • 【学习Go编程】