当前位置: 首页 > article >正文

Golang--协程和管道

1、概念

程序:
是为完成特定任务、用某种语言编写的一组指令的集合,是一段静态的代码。(程序是静态)

进程:
程序的一次执行过程。正在运行的一个程序,进程作为资源分配的单位,在内存中会为每个进程分配不同的内存区域,是一个动的过程 ,进程有它自身的产生、存在和消亡的过程。(进程是动态的)

线程:
进程可进一步细化为线程,是一个程序内部的一个执行路径。若一个进程同一时间并执行多个线程,就是支持多线程的。 
单核:并发执行
多核:并发执行和并行执行

协程:

又称为微线程,纤程,协程是一种用户态的轻量级线程

作用:在执行A函数的时候,可以随时中断,去执行B函数,然后中断继续执行A函数(可以自动切换),注意这一切换过程并不是函数调用(没有调用语句),过程很像多线程,然而协程中只有一个线程在执行(协程的本质是个单线程)

对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就将寄存器上下文和栈保存到某个其他地方,然后切换到另外一个任务去计算。在任务切回来的时候,恢复先前保存的寄存器上下文和栈,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而会更多得将cpu执行权限分配给我们的线程(线程是cpu控制的,而协程是程序自身控制的,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

2、启动一个协程 

案例:
编写一个程序,完成如下功能:

  1. 在主线程中,开启一个goroutine(协程),该goroutine每隔1秒输出"hello golang"
  2. 在主线程中也每隔一秒输出"hello world",输出10次后退出程序
  3. 要求主线程和goroutine同时执行
package main
import (
	"fmt"
	"strconv"
	"time"
)

func test(){
	for i := 1; i <= 10; i++{
		fmt.Println("hello golang" + strconv.Itoa(i))
		//阻塞一秒
		time.Sleep(time.Second * 1)// 1s
	}
}

func main(){   //主线程
	//开启一个协程
	go test()

	for i := 1; i <= 10; i++{
		fmt.Println("hello world" + strconv.Itoa(i))
		//阻塞一秒
		time.Sleep(time.Second * 1)// 1s
	}
}

开启协程:使用go关键字,例如:go test()

执行流程:
注意:如果主线程退出,协程还没执行结束,协程也会提前结束(主死从随)

3、启动多个协程

package main
import (
	"fmt"
	"strconv"
	"time"
)

func test1(){
	for i := 1; i <= 10; i++{
		fmt.Println("hello golang" + strconv.Itoa(i))
		//阻塞一秒
		time.Sleep(time.Second * 1)// 1s
	}
}

func main(){   //主线程

	//开启一个协程--使用匿名函数
	//匿名函数 + 外部变量 = 闭包
	for i := 1; i <= 5; i++{
		go func(n int){
			fmt.Println(n)
		}(i)
	}

	//开启一个协程--使用普通函数
	go test1()

	for i := 1; i <= 10; i++{
		fmt.Println("hello world" + strconv.Itoa(i))
		//阻塞一秒
		time.Sleep(time.Second * 1)// 1s
	}
}

4、使用WaitGroup控制协程退出

WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量,每个被等待的线程在结束时应调用Done方法。同时主线程里可以调用Wait方法阻塞至所有线程结束。用于解决主线程在子协程结束后自动结束,防止主线程退出导致协程被迫退出,


package main
import (
	"fmt"
	"sync" // 并发包
	"time"
	"strconv"
)

var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化--类似计数器

func test1(){
	//协程执行完毕,协程数量-1
	defer wg.Done()
	for i := 1; i <= 10; i++{
		fmt.Println("hello golang" + strconv.Itoa(i))
		//阻塞一秒
		time.Sleep(time.Second * 2)// 2s
	}
	// //协程执行完毕,协程数量-1
	// wg.Done()
}

func main(){   //主线程
	//开启一个协程--使用普通函数
	wg.Add(1)  // 协程数量+1
	go test1() // 开启协程
	//主线程一直阻塞,等待协程执行完毕--直到协程执行完毕才会继续执行主线程
	wg.Wait()
	
	for i := 1; i <= 10; i++{
		fmt.Println("hello world" + strconv.Itoa(i))
		//阻塞一秒
		time.Sleep(time.Second * 1)// 1s
	}
}

5、多个协程操纵同一数据案例(使用互斥锁同步协程

错误案例:

package main
import (
	"fmt"
	"sync" // 并发包
)

var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化

//定义一个变量:
var cnt int

func iAdd(){
	defer wg.Done()
	for i := 1; i <= 10000; i++{
		cnt = cnt + 1
	}
}
func iSub(){
	defer wg.Done()
	for i := 1; i <= 10000; i++{
		cnt = cnt - 1
	}
}

func main(){   //主线程
	//开启一个协程--使用普通函数
	wg.Add(2)  // 协程数量+2
	go iAdd() // 开启协程
	go iSub()
	wg.Wait()

	fmt.Println(cnt) //结果:理论上cnt是0,但实际上不是
}

两个协程进行的顺序是不一定的,比如取cnt这个值时,两个协程进行的取值和运算操作顺序的关系,可能导致对原本正确的结果进行了覆盖,导致iAdd函数的偏移量和iSub函数的偏移量之和不等于0,实际结果是个不确定的值。

5.1 互斥锁 

正确案例:使用互斥锁同步协程

解决上面问题的方案:
确保一个协程在执行逻辑的时候另外的协程不执行---->利用锁的机制---->互斥锁

互斥锁:
Mutex为互斥锁,Lock()加锁,Unlock()解锁,使用Lock()加锁后,便不能再次对其进行加锁,直到利用Unlock()解锁对其解锁后,才能再次加锁,适用于读写不确定场景,即读写次数没有明显的区别-----性能效率相对来说偏低

package main
import (
	"fmt"
	"sync" // 并发包
)

var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化
var lock sync.Mutex //加入互斥锁

//定义一个变量:
var cnt int

func iAdd(){
	defer wg.Done()
	for i := 1; i <= 10000; i++{
		lock.Lock() // 加锁
		cnt = cnt + 1
		lock.Unlock() // 解锁
	}
}
func iSub(){
	defer wg.Done()
	for i := 1; i <= 10000; i++{
		lock.Lock() // 加锁
		cnt = cnt - 1
		lock.Unlock() // 解锁

	}
}

func main(){   //主线程
	wg.Add(2) 
	go iAdd()
	go iSub()
	wg.Wait()

	fmt.Println(cnt) //结果:理论上cnt是0,但实际上不是
}


5.2 读写锁 

读写锁:

RWMutex是一个读写锁,其经常用于读次数远远多于写次数的场景。
在读的时候,数据之间不产生影响,写和读之间才会产生影响

package main
import (
	"fmt"
	"sync" // 并发包
	"time"
)

var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化
var lock sync.RWMutex //加入读写锁

func read(){
	defer wg.Done()
	//如果只是读取数据,那么这个锁不产生任何影响,但是如果是写入数据,那么就会产生阻塞
	lock.RLock()
	fmt.Println("尝试读取数据中...")
	time.Sleep(time.Second)
	fmt.Println("读取数据成功!")
	lock.RUnlock()
}
func write(){
	defer wg.Done()
	lock.Lock()
	fmt.Println("尝试修改数据中...")
	time.Sleep(time.Second * 2)
	fmt.Println("修改数据成功!")
	lock.Unlock()
}

func main(){   //主线程
	//场景:读多写少
	wg.Add(1)
	go write()
	for i := 1; i < 20; i++{
		wg.Add(1)
		go read()
	}
	wg.Wait()
}


的过程中,锁生效,但是读可以并发读,锁没有影响

6、管道

6.1 管道概念

管道:

  • 管道本质就是一个数据结构--队列
  • 数据特性:先进先出
  • 自身线程安全,多协程访问时,不需要加锁,channel本身就是线程安全的
  • 管道有类型的,如:一个string类型的管道只能存放string类型数据

6.2 管道的定义

定义:

var 变量名 chan 数据类型

  • chan是管道关键字
  • 数据类型指的是管道的类型,指里面放入数据的类型,int类型的管道只能写入整数int
  • 管道是引用类型,必须初始化才能写入数据,即make后才能使用
package main
import (
	"fmt"
)

func main(){
	//定义管道、声明管道 ---> 定义一个int类型的管道
	var intChan chan int
	//通过make初始化:管道可以存放3个int类型的数据
	intChan = make(chan int,3)

	//证明管道是引用类型
	fmt.Printf("intChan的值:%v\n",intChan) //intChan的值:0xc0000ac080

	//向管道中写入数据
	intChan <- 10
	intChan <- 20
	//输出管道的长度
	fmt.Printf("管道的实际长度:%v,管道的容量:%v\n",len(intChan),cap(intChan)) // 管道的实际长度:2,管道的容量:3

	//从管道中读取数据
	num := <-intChan
	fmt.Println(num) // 10

	//关闭管道---> 不能向关闭的管道中写入数据
	close(intChan)
	// intChan <- 100 // 报错:panic: send on closed channel
	num,flag := <-intChan
	fmt.Println(num,flag) // 20 true
}

6.3 管道的关闭

管道的关闭:
使用内置函数close可以关闭管道,当管道关闭后,就不能再向管道写数据了,但是仍然可以从该管道读取数据。


例子:上面代码的

6.4 管道的遍历

管道的遍历:

管道支持for-range的方式进行遍历,注意:

  • 在遍历时,如果管道没有关闭,则会出现deadlock(死锁)的错误
  • 在遍历时,如果管道已经关闭,则会正常遍历数据,遍历完后,就会退出遍历 
package main
import (
	"fmt"
)

func main(){
	//定义管道、声明管道 ---> 定义一个int类型的管道
	var intChan chan int = make(chan int,100)

	for i := 1; i <= 100; i++{
		intChan <- i
	}
	//遍历前,如果没有关闭管道,那么会出现死锁 -- deadlock
	close(intChan)
	//for-range读取管道中的数据
	for v := range intChan{
		fmt.Printf("%v ",v)
	}
}

6.5 协程和管道协同工作案例

案例需求:

请完成协程和管道协同工作的案例,具体要求:

  1. 开启一个writeData协程,向管道中写入50个整数
  2. 开启一个readData协程,从管道中读取writeData写入的数据
  3. 注意:writeData和readData操作的是同一个管道
  4. 主线程需要等待writeData和readData协程都完成工作才能退出
package main
import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化

func writeData(intChan chan int){
	defer wg.Done() // 协程执行完毕,协程数量-1
	for i := 1; i <= 50; i++{
		fmt.Printf("写入数据:%v\n",i)
		intChan<- i		
		time.Sleep(time.Second * 1)
	}
	close(intChan)
}

func readData(intChan chan int){
	defer wg.Done() // 协程执行完毕,协程数量-1
	for v := range intChan{
		fmt.Printf("读取数据:%v\n",v)
		time.Sleep(time.Second * 1)
	}
}

func main(){
	//init
	var intChan chan int = make(chan int,100)
	//开启线程
	wg.Add(2)
	go writeData(intChan)
	go readData(intChan)
	wg.Wait()
}

6.6 声明只读只写管道

管道可以声明为只读或者只写性质:

package main

import (
	"fmt"
)

func main(){
	//默认情况下,管道是双向的 --> 可读可写
	var intChan1 chan int = make(chan int, 10)
	intChan1 <- 10
	fmt.Println(intChan1)

	//声明为只写 --> 只能写,不能读
	var intChan2 chan <- int = make(chan int, 5)
	intChan2 <- 10
	fmt.Println(intChan2)
	//fmt.Println(<- intChan2) // 报错:invalid operation: <-intChan2 (receive from send-only type chan<- int)

	//声明为只读--> 只能读,不能写
	var intChan3 <- chan int = make(chan int, 5)
	fmt.Println(intChan3)
	//intChan3 <- 10 // 报错:invalid operation: intChan3 <- 10 (send to receive-only type <-chan int)
}

6.7 管道的阻塞

阻塞的情况:

package main
import (
	"fmt"
	"sync"
	//"time"
)

var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化

func writeData(intChan chan int){
	defer wg.Done() // 协程执行完毕,协程数量-1
	for i := 1; i <= 10; i++{
		fmt.Printf("写入数据:%v\n",i)
		intChan<- i		
		//time.Sleep(time.Second * 1)
	}
	close(intChan)
}

func readData(intChan chan int){
	defer wg.Done() // 协程执行完毕,协程数量-1
	for v := range intChan{
		fmt.Printf("读取数据:%v\n",v)
		//time.Sleep(time.Second * 1)
	}
}

func main(){
	//init
	var intChan chan int = make(chan int,5)
	//开启线程
	wg.Add(1)
	go writeData(intChan)
	//go readData(intChan)
	wg.Wait()
}
  • 在Go语言中,如果一个管道(channel)只被写入数据而没有被读取,那么最终会导致管道阻塞。这是因为Go语言中的管道是同步的,即写入操作会等待读取操作完成后才能继续如果没有读取操作来接收数据,写入操作就会一直等待,从而导致程序阻塞
  • 在代码片段中,如果writeData函数一直向intChan管道写入数据,而没有readData函数来读取这些数据,那么writeData函数中的intChan<- i操作会在管道缓冲区满后阻塞。如果缓冲区大小是5,那么在写入5个数据后,writeData函数就会阻塞,直到有读取操作来接收数据。
  • 如果程序中没有其他地方读取这个管道,那么writeData函数会一直阻塞,这可能会导致程序死锁。为了避免这种情况,通常应该确保在创建管道时,有相应的读取操作来处理写入的数据。
  • 在实际编程中,应该根据程序的逻辑和需求来设计管道的读写操作,确保数据能够被正确处理,避免出现阻塞或死锁的情况。如果确实需要一个只写而不读的管道,那么应该考虑使用其他机制来处理数据,或者在程序中添加适当的逻辑来处理这种情况。

确保有相应的读取操作(不管读写速度是否一致(例如读的速度小于写的速度),只要有读取操作就不会阻塞):

package main
import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup // 并发包的变量, 只定义无需初始化

func writeData(intChan chan int){
	defer wg.Done() // 协程执行完毕,协程数量-1
	for i := 1; i <= 10; i++{
		fmt.Printf("写入数据:%v\n",i)
		intChan<- i		
		time.Sleep(time.Second * 1)
	}
	close(intChan)
}

func readData(intChan chan int){
	defer wg.Done() // 协程执行完毕,协程数量-1
	for v := range intChan{
		fmt.Printf("读取数据:%v\n",v)
		time.Sleep(time.Second * 5)
	}
}

func main(){
	//init
	var intChan chan int = make(chan int,5)
	//开启线程
	wg.Add(2)
	go writeData(intChan)
	go readData(intChan)
	wg.Wait()
}

6.8 select功能

select功能:在Go语言中,select语句用于处理多个通道(channel)的操作。它类似于switch语句,但是专门用于处理通道的发送和接收操作select语句会监听多个通道的操作,一旦其中一个通道准备好进行发送或接收操作,select就会执行相应的分支。

在这个语法中,每个case分支对应一个通道操作。如果多个case分支同时满足条件,Go语言会随机选择一个分支执行如果没有任何分支满足条件,并且存在default分支,那么就会执行default分支。如果没有default分支,select语句会阻塞,直到有某个分支满足条件
 

  • case后面必须进行的是io操作,不能是等值,随机去选择一个io操作
  • default防止select被阻塞住,加入default
package main
import (
	"fmt"
	"time"
)


func main(){
	//chan int
	var intChan chan int = make(chan int, 5)
	go func(){
		time.Sleep(time.Second * 4)
		intChan <- 10
	}()
	//chan string
	var strChan chan string = make(chan string, 5)
	go func(){
		time.Sleep(time.Second * 2)
		strChan <- "hello world"
	}()
	//chan float32
	var floChan chan float32 = make(chan float32, 5)
	go func(){
		time.Sleep(time.Second * 1)
		floChan <- 1.1111
	}()

	//select--> 多路复用 --> 哪个管道有数据就执行哪个管道
	select{
		case v := <- intChan:
			fmt.Printf("select:读取intChan数据:%v\n",v)
		case v := <- strChan:
			fmt.Printf("select:读取strChan数据:%v\n",v)
		case <- floChan:
			fmt.Printf("floChan管道") //执行
		// default:
		// 	fmt.Println("防止select阻塞\n")
	}
}

6.9  defer + recover机制处理错误

问题原因:多个协程工作,其中一个协程出现panic,导致程序奔溃

解决方法:利用defer + recover捕获panic进行处理,即使协程出现问题,主线程仍然不受影响可以继续执行

package main
import (
	"fmt"
	"time"
)

//输出数字
func printNum(){
	for i := 1; i <= 10; i++{
		fmt.Println(i)
		//time.Sleep(time.Second * 1)
	}
}

//除法
func devide(){
	defer func(){
		err := recover()
		if err != nil{
			fmt.Println("devide()发生错误:",err)
		}
	}()
	num1 := 100000
	num2 := 0
	result := num1/num2
	fmt.Println(result)
}

func main(){
	go printNum()
	go devide()

	time.Sleep(time.Second * 5)
}


http://www.kler.cn/a/384532.html

相关文章:

  • 解决:ros进行gazebo仿真,rviz没有显示传感器数据
  • 飞腾平台Arm ComputeLibrary编译安装指南
  • java list使用基本操作
  • 数据结构---二叉树(顺序结构),堆(上)
  • 【LeetCode】【算法】287. 寻找重复数
  • 微信小程序开发,诗词鉴赏app,诗词推荐实现(二)
  • 飞腾平台Arm ComputeLibrary编译安装指南
  • canal1.1.7使用canal-adapter进行mysql同步数据
  • springboot 传统应用程序,适配云原生改造
  • SSH僵尸主机挖矿木马预警
  • 群控系统服务端开发模式-应用开发-腾讯云上传工厂及七牛云上传工厂开发
  • 从Apache Atlas到Aloudata BIG,数据血缘解析有何改变?
  • redis时间优化
  • Redisson实现RedLock分布式锁同步
  • git提交冲突的原因及解决方案
  • 如何搭建 ELK【elasticsearch+logstash+kibana】日志分析系统
  • ctfshow-web入门-反序列化(web260-web264)
  • uniapp+vue基于微信小程序的健康饮食推荐系统 907m6
  • Linux系统的文件系统和日志和管理
  • Debian 12环境里部署nginx步骤记录
  • ssh通过跳板机免密登陆
  • 代谢组数据分析(二十一):通过MetaboAnalystR标准化构建sPLSDA预测模型
  • 爬虫学习5
  • duxapp放弃了redux,在duxapp状态实现方案
  • WordPress伪静态设置
  • HTML 块级元素和内联(行内)元素详解