Go channel关闭方法
channel关闭原则
1、不能在消费端关闭channel(基础原则,单生产者或多生产者均不能在消费端关闭);
2、多个生产者时,不能对channel执行关闭;
3、只有在唯一或最后唯一剩下的生产者协程中关闭channel,之后通知消费者
没有值可以读,才能确保向一个已经关闭的channel中不再发送数据;
暴力关闭channel
强行在消费端或多个生产者端关闭channel会产生pannic,可以使用recover机制接收异常避免崩溃;
生产端:
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
// The return result can be altered
// in a defer function call.
closed = true
}
}()
ch <- value // panic if ch is closed
return false // <=> closed = false; return
}
消费端:
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()
// assume ch != nil here.
close(ch) // panic if ch is closed
return true // <=> justClosed = true; return
}
不同生产者消费者关闭channel情况
1.单生产者单(多)消费者
直接在生产者端close();
2.多生产者单消费者
不能在生产者端直接close(),需要新建一个信号channel,通知发送端停止发送数据;channel在没有go协程引用时会自动关闭,不用显式关闭;
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.
// stopCh为添加的信号channel,它的数据来源是dataCh的接受端发出的数据,它的数据
// 是在dataCh的生产端进行消费;
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
// The first select here is to try to exit the goroutine
// as early as possible. In fact, it is not essential
// for this example, so it can be omitted.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first branch in the
// second select may be still not selected for some
// loops if the send to dataCh is also unblocked.
// But this is acceptable, so the first select
// can be omitted.
// ?对于某些loop,dataCh的生产端塞入数据,即使stopCh已经关闭,第二个select的
// 第一个分支仍然可能不能被选择(select如果多个条件同时满足条件,会随机选择);
select {
case <- stopCh:
return
case dataCh <- rand.Intn(MaxRandomNumber):
}
}
}()
}
// the receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == MaxRandomNumber-1 {
// The receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
3.多生产者多消费者
不能让接受端或发送端关闭channel,甚至都不能让接受者关闭一个退出信号来通知生产者停止生产,因为多消费者会导致接受者多次执行close(),相当于多个生产端关闭channel,违反了channel关闭原则,但可以引入一个额外的协调者来关闭附加的退出信号channel。
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// The channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.
var stoppedBy string
// moderator
go func() {
stoppedBy = <- toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// Here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// The first select here is to try to exit the goroutine
// as early as possible. This select blocks with one
// receive operation case and one default branches will
// be optimized as a try-receive operation by the
// official Go compiler.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first branch in the
// second select may be still not selected for some
// loops (and for ever in theory) if the send to
// dataCh is also unblocked.
// This is why the first select block is needed.
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// Same as the sender goroutine, the first select here
// is to try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first branch in the
// second select may be still not selected for some
// loops (and for ever in theory) if the receive from
// dataCh is also unblocked.
// This is why the first select block is needed.
select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// The same trick is used to notify
// the moderator to close the
// additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
Context结束多个协程
package main
import (
"context"
"fmt"
"sync"
"time"
)
func worker(ctx context.Context, id int, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d canceled\n", id)
return
default:
// 执行协程的工作任务
fmt.Printf("Worker %d working\n", id)
time.Sleep(time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// 启动多个协程
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(ctx, i, &wg)
}
// 主程序等待一段时间后取消所有协程
time.Sleep(time.Second * 3)
cancel()
// 等待所有协程完成
wg.Wait()
fmt.Println("Main program finished")
}