当前位置: 首页 > article >正文

Go channel关闭方法

channel关闭原则

1、不能在消费端关闭channel(基础原则,单生产者或多生产者均不能在消费端关闭);

2、多个生产者时,不能对channel执行关闭;

3、只有在唯一或最后唯一剩下的生产者协程中关闭channel,之后通知消费者
   没有值可以读,才能确保向一个已经关闭的channel中不再发送数据;

暴力关闭channel

强行在消费端或多个生产者端关闭channel会产生pannic,可以使用recover机制接收异常避免崩溃;

生产端:
func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
        if recover() != nil {
            // The return result can be altered 
            // in a defer function call.
            closed = true
        }
    }()
    
    ch <- value // panic if ch is closed
    return false // <=> closed = false; return
}

消费端:
func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            justClosed = false
        }
    }()
    
    // assume ch != nil here.
    close(ch) // panic if ch is closed
    return true // <=> justClosed = true; return
}

不同生产者消费者关闭channel情况

1.单生产者单(多)消费者

直接在生产者端close();

2.多生产者单消费者

不能在生产者端直接close(),需要新建一个信号channel,通知发送端停止发送数据;channel在没有go协程引用时会自动关闭,不用显式关闭;

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the receiver of channel dataCh.
	// Its reveivers are the senders of channel dataCh.
    // stopCh为添加的信号channel,它的数据来源是dataCh的接受端发出的数据,它的数据    
    // 是在dataCh的生产端进行消费;
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// The first select here is to try to exit the goroutine
				// as early as possible. In fact, it is not essential
				// for this example, so it can be omitted.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops if the send to dataCh is also unblocked.
				// But this is acceptable, so the first select
				// can be omitted.
                
                // ?对于某些loop,dataCh的生产端塞入数据,即使stopCh已经关闭,第二个select的           
                // 第一个分支仍然可能不能被选择(select如果多个条件同时满足条件,会随机选择);
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(MaxRandomNumber):
				}
			}
		}()
	}
	
	// the receiver
	go func() {
		defer wgReceivers.Done()
		
		for value := range dataCh {
			if value == MaxRandomNumber-1 {
				// The receiver of the dataCh channel is
				// also the sender of the stopCh cahnnel.
				// It is safe to close the stop channel here.
				close(stopCh)
				return
			}
			
			log.Println(value)
		}
	}()
	
	// ...
	wgReceivers.Wait()
}
3.多生产者多消费者

不能让接受端或发送端关闭channel,甚至都不能让接受者关闭一个退出信号来通知生产者停止生产,因为多消费者会导致接受者多次执行close(),相当于多个生产端关闭channel,违反了channel关闭原则,但可以引入一个额外的协调者来关闭附加的退出信号channel。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 10
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh is an additional signal channel.
		// Its sender is the moderator goroutine shown below.
		// Its reveivers are all senders and receivers of dataCh.
	toStop := make(chan string, 1)
		// The channel toStop is used to notify the moderator
		// to close the additional signal channel (stopCh).
		// Its senders are any senders and receivers of dataCh.
		// Its reveiver is the moderator goroutine shown below.
	
	var stoppedBy string
	
	// moderator
	go func() {
		stoppedBy = <- toStop
		close(stopCh)
	}()
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(MaxRandomNumber)
				if value == 0 {
					// Here, a trick is used to notify the moderator
					// to close the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}
				
				// The first select here is to try to exit the goroutine
				// as early as possible. This select blocks with one
				// receive operation case and one default branches will
				// be optimized as a try-receive operation by the
				// official Go compiler.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the send to
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}
	
	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()
			
			for {
				// Same as the sender goroutine, the first select here
				// is to try to exit the goroutine as early as possible.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the receive from
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == MaxRandomNumber-1 {
						// The same trick is used to notify
						// the moderator to close the
						// additional signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}
					
					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}
	
	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
Context结束多个协程
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func worker(ctx context.Context, id int, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d canceled\n", id)
			return
		default:
			// 执行协程的工作任务
			fmt.Printf("Worker %d working\n", id)
			time.Sleep(time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	// 启动多个协程
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go worker(ctx, i, &wg)
	}

	// 主程序等待一段时间后取消所有协程
	time.Sleep(time.Second * 3)
	cancel()

	// 等待所有协程完成
	wg.Wait()

	fmt.Println("Main program finished")
}


http://www.kler.cn/a/512302.html

相关文章:

  • @LoadBalanced注解的实现原理
  • PHP语言的语法糖
  • 集合帖:前缀和及差分模板题 ← “洛谷 P5638:光骓者的荣耀” + “洛谷 P3397:地毯”
  • AUTOSAR从入门到精通-线控底盘技术
  • ipmitool设置带外账号权限
  • 深入探索 Vue.js 组件开发中的最新技术:Teleport 和 Suspense 的使用
  • JAVA-IO模型的理解(BIO、NIO)
  • 在VSCode中使用Jupyter Notebook
  • Centos 8 交换空间管理
  • LeetCodeHOT100:60. n个骰子的点数、4. 寻找两个正序数组的中位数
  • 以“智慧建造”为理念,综合应用云、大、物、移、智等数字化技术的智慧工地云平台源码
  • 愿景是什么?
  • JSON-stringify和parse
  • 48V电气架构全面科普和解析:下一代智能电动汽车核心驱动
  • Android 空包签名(详细版)
  • AI刷题-病毒在封闭空间中的传播时间
  • 企业级流程架构设计思路-基于价值链的流程架构
  • 数据结构(二)栈/队列和二叉树/堆
  • centos虚拟机异常关闭,导致数据出现问题
  • 【2024年度个人生活与博客事业的融合与平衡总结】
  • 深入解析 C++17 中的 u8 字符字面量:提升 Unicode 处理能力
  • 大模型LLM-微调 RAG
  • Java-数据结构-二叉树习题(1)
  • AUTOSAR OS模块详解(三) Alarm
  • 我想通过python语言,学习数据结构和算法该如何入手?
  • 低代码运维与管理服务