【Go 基础】channel
Go 基础
channel
什么是channel,为什么它可以做到线程安全
Go 的设计思想就是:不要通过共享内存来通信,而是通过通信来共享内存。 前者就是传统的加锁,后者就是 channel。也即,channel 的主要目的就是在多任务间传递数据的,本身就是安全的。
- channel 是 Go 中的一个核心类型,它可以看作是一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯。
- channel 也可以理解为一个先进先出队列,通过管道进行通信;
- 发送一个数据到 channel 和从 channel 中接收一个数据都是原子性的;
channel的生命周期状态有哪些
channel 存在 3 种状态:
- nil:未初始化的状态,只进行了声明,或者手动赋值为 nil
- active:正常的 channel,可以进行读写;
- closed:已关闭,注意已关闭的 channel,它的值也不是 nil
针对不同状态的 channel,进行关闭,发送数据以及接收数据,会有不同的情况:
操作 | 一个零值 nil 通道 | 一个非零值但已关闭通道 | 一个非零值且未关闭通道 |
---|---|---|---|
关闭 | 产生恐慌 | 产生恐慌 | 成功关闭 |
发送数据 | 永久阻塞 | 产生恐慌 | 阻塞或者成功发送 |
接收数据 | 永久阻塞 | 永不阻塞(会立即返回零值) | 阻塞或者成功接收 |
channel 的类型
channel 通常有以下三种类型:
- 同步 channel:不需要缓冲区,发送方会直接将数据交给接收方;
- 异步 channel:基于环形缓存的传统生产者消费者模型;
- chan struct{}:这是专门用于协程间通信的标准信号,因为 struct{} 不占用内存空间,所以用的比较多;
Goroutine 和 channel 的作用分别是什么
这里可以先简单说下,进程、线程以及协程之间的关系。
进程是内存资源和 CPU 调度的执行单元。为了有效利用多核处理器的优势,将进程进一步细分,允许一个进程中存在多个线程,这多个线程还是共享同一片内存空间,但 CPU 调度的最小单元变成了线程。
而协程,可以看作是轻量级线程。但是,和线程不同的是,线程的切换是由操作系统控制的,而协程的切换是由用户控制的。
Go 中的 Goroutine 就是协程,可以实现并行,多个协程可以在多个处理器同时跑。而协程同一时刻只能在一个处理器上跑。多个 Goroutine 之间的通信就是通过 channel,而协程的通信是通过 yield 和 resume() 操作。
在 Golang 中 channel 是 goroutinues 之间进⾏通信的渠道。
可以把 channel 形象⽐喻为⼯⼚⾥的传送带,⼀头的⽣产者 goroutine 往传输带放东⻄,另⼀头的消费者 goroutinue 则从输送带取东⻄。channel 实际上是⼀个有类型的消息队列,遵循先进先出的特点。
goroutine 的使用
只需要在函数的调用前面加 go 关键字,就可以启动协程了:
func main() {
for i:=1;i<5;i++ {
go func(i int) {
fmt.Println(i)
}(i)
}
// 停歇5s,保证打印全部结束
time.Sleep(5*time.Second)
}
上面的代码中,启动了 5 个 goroutine,再加上 main 函数的主 goroutine,一共 6 个 goroutine。由于 goroutine 类似于“守护线程”,是异步执行的。如果主 goroutine 不等待,程序可能就不会有打印输出了。
channel 的使用
-
channel 的操作符号
ch <- data 表示 data 被发送给 channel ch ;
data <- ch 表示从 channel ch 取⼀个值,然后赋给 data;
-
阻塞式 channel
channel 默认是没有缓冲区的,也即,通信是阻塞的。send 操作必须等到有消费者 accept 才算完成。
func main() { ch1 := make(chan int) go pump(ch1) // pump hangs fmt.Println(<-ch1) // prints only 1 } func pump(ch chan int) { for i := 1; ; i++ { ch <- i } }
在函数 pump() ⾥的 channel 在接受到第⼀个元素后就被阻塞了,直到主 goroutinue 取⾛了数据。最终channel 阻塞在接受第⼆个元素,程序只打印 1。
没有缓冲的 channel 只能容纳⼀个元素,⽽带有缓冲 channel 则可以⾮阻塞容纳 N 个元素。发送数据到缓冲 channel 不会被阻塞,除⾮channel已满;同样的,从缓冲 channel 取数据也不会被阻塞,除⾮ channel 空了。
Go 中 channel 的实现
前文其实就一直有提到了:channel 是 Go 中 goroutines 之间的信息传递媒介,通过共享通信,来实现共享内存。
goroutine 通过使⽤ channel 传递数据,⼀个会向 Channel 中发送数据,另⼀个会从 Channel 中接收数据,它们两者能够独⽴运⾏并不存在直接关联,但是能通过 Channel 间接完成通信。
channel 的收发操作均遵循来先进先出的设计:
- 先从 channel 读取数据的 goroutine 会先接收到数据
- 先往 channel 发送数据的 goroutine 会得到先发送数据的权利
channel 在 runtime 中的具体实现
在 runtime.hchan 中定义了 channel:
type hchan struct {
qcount uint // 当前队列⾥还剩余元素个数
dataqsiz uint // 环形队列⻓度,即缓冲区的⼤⼩,即make(chan T,N)中的N
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的⼤⼩
closed uint32 // 标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调⽤close将其设置为1,通道关闭
elemtype *_type // 元素类型,⽤于数据传递过程中的赋值
sendx uint // 环形缓冲区的状态字段,它只是缓冲区的当前索引-⽀持数组,它可以从中发送数据
recvx uint // 环形缓冲区的状态字段,它只是缓冲区当前索引-⽀持数 组,它可以从中接受数据
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex // 互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作
}
type waitq struct {
first *sudog
last *sudog
}
其中,hchan 结构体中有五个字段是构建底层的循环队列:
- qcount:channel 中剩余元素的个数
- dataqsiz:channel 中循环队列的长度
- buf:channel 的缓冲区数据指针
- sendx:channel 的发送操作处理到的位置
- recvx:channel 的接收操作处理到的位置
elemsize 和 elemtype 分别表示当前 channel 能够收发的元素类型和大小。
sendq 和 recvq 存储了当前 channel 由于缓冲区不足而阻塞的 goroutine 列表,这些等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构。
waitq 表示一个在等待队列中的 goroutine,该结构体存储了阻塞的相关信息以及两个分别指向前后 runtime.sudog 的指针。
创建 channel
runtime.makechan 和 runtime.makechan64 会根据传入的参数类型和缓冲区大小创建一个新的 channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况。
我们以 makechan 函数为例:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
channel 中根据收发元素的类型和缓冲区的大小初始化 runtime.hchan 结构题和缓冲区:
arena 区域就是我们所谓的堆区,Go 动态分配的内存都是在这个区域,它把内存分割成 8KB 大小的页,一些页组合起来称为 mspan。
bitmap 区域标识 arena 区域哪些地址保存了对象,并且用 4bit 标志位表示对象是否包含指针、GC 标记信息。bitmap 中一个 byte 大小的内存对应 arena 区域中 4 个指针大小(指针大小为 8B)的内存,所以 bitmap 区域的大小是 512GB/(4*8B)=16GB。
此外,我们还可以看到 bitmap 的高地址部分指向 arena 区域的低地址部分,这里 bitmap 的地址是由高地址向低地址增长的。
spans 区域存放 mspan(是一些 arena 分割的页组合起来的内存管理基本单元)的指针,每个指针对应一页,所以 spans 区域的大小就是 512GB/8KB*8B=512MB。
除以 8KB 是计算 arena 区域的页数,而最后乘以 8 是计算 spans 区域所有指针的大小。创建 mspan 的时候,按页填充对应的 spans 区域,在回收 object 时,根据地址很容易就能找到它所属的 mspan。