当前位置: 首页 > article >正文

Golang——并发控制

本文介绍Go并发,同步,顺序执行,设计的一些常见的场景,顺序执行主要用channel实现。在这种同步信号的使用场景中,使用无缓冲通道,可以选择不关闭通道。

在这里插入图片描述

文章目录

    • 协程同步背景介绍
      • 无缓冲通道的作用
      • 为什么不需要关闭通道
      • 何时需要关闭通道
      • 总结
    • 常见的同步场景
      • 顺序执行10个Goroutine
      • 两个 Goroutines 的交替执行,交替打印偶数和奇数
      • 两个 server 的任意一个执行完成,就执行第三个
      • 两个 server 必须全部执行完成,再执行第三个。
      • 三个 server 的必须按 1 2 3 顺序执行
      • 一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)
      • 一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)

协程同步背景介绍

无缓冲通道(unbuffered channel)通常用于同步 Goroutine 的执行顺序。它的作用是让 Goroutines 在指定的时刻按顺序执行,而不是用来传递数据。因此,在这种同步信号传递的场景中,不需要关闭通道,程序依然能够正常运行。

无缓冲通道的作用

无缓冲通道的特性是:

  • 发送方必须等待接收方接收数据,才能继续发送下一个数据。
  • 它本质上是一个同步机制,用于协调不同 Goroutine 的执行顺序

这种同步机制不涉及数据传输,只是信号传递。

为什么不需要关闭通道

  • 同步控制:在你的代码中,通道仅仅用于同步 Goroutine 的执行,而不用于传输实际的数据。在同步场景下,通道充当的是“信号”的角色。接收方只关心信号的到来,处理完后就继续执行,通道是否关闭不会影响这一行为。

  • 没有资源泄漏:由于通道没有存储任何数据,而是仅用于发送和接收信号,通道的关闭不会带来资源泄漏。即使不关闭通道,程序也会正确地运行,因为没有对通道进行进一步的发送操作。

  • 避免复杂性:关闭通道通常用于“通知接收方数据已经完成传输”,而在这种情况下,你只是在等待信号,不需要关心通道的关闭状态。因此,不关闭通道反而使得程序逻辑更简洁,避免了复杂的资源管理。

  • 垃圾回收机制close():Go 的垃圾回收机制会自动处理那些不再使用的对象和数据结构,包括通道。所以即使没有显式关闭通道,程序结束时,未关闭的通道也会被垃圾回收。

何时需要关闭通道

在其他情况下,关闭通道是必要的,尤其是在以下几种场景:

  • 多接收方模式:如果你有多个接收方从同一个通道接收数据,关闭通道可以通知所有接收方没有更多的数据可以接收。

  • 通知没有更多数据:如果通道用于传输数据,关闭通道可以标识发送方不再有数据发送,这对于避免接收方阻塞和避免无限等待是非常重要的。

总结

无缓冲通道 的同步信号传递场景下:

  • 不关闭通道是合理的,因为通道的作用仅仅是控制同步,而不是用于传输数据。
  • 通道的关闭通常是在传输数据时,通知接收方“没有更多数据了”,而在同步信号的场景中,这种关闭操作并不必要。

所以,在 顺序执行的同步场景 中,如果你只是通过无缓冲通道传递信号,不关闭通道完全没有问题。

常见的同步场景

顺序执行10个Goroutine

这里选择了关闭channel,其实可以删掉close(),后续的例子不会再关闭channel。

package main

import (
	"fmt"
	"time"
)

func main() {
	// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序
	steps := make([]chan struct{}, 10)
	for i := 0; i < 10; i++ {
		steps[i] = make(chan struct{})
	}

	// 定义 10 个 Goroutine,按照顺序执行
	for i := 0; i < 10; i++ {
		go func(i int) {
			if i == 0 {
				// 第一个 Goroutine 不需要等待
				fmt.Println("Goroutine 1: Start")
				time.Sleep(1 * time.Second) // 模拟工作
				fmt.Println("Goroutine 1: Done")
				// 通知 Goroutine 2 可以开始
				steps[0] <- struct{}{}
				close(steps[0])
			} else {
				// 等待上一个 Goroutine 完成
				<-steps[i-1]
				fmt.Printf("Goroutine %d: Start\n", i+1)
				time.Sleep(1 * time.Second) // 模拟工作
				fmt.Printf("Goroutine %d: Done\n", i+1)
				// 通知下一个 Goroutine 可以开始
				if i < 9 {
					steps[i] <- struct{}{}
					close(steps[i])
				} else {
					// 最后一个 Goroutine,通知主线程完成
					steps[9] <- struct{}{}
					close(steps[9])
				}
			}
		}(i)
	}

	// 等待最后一个 Goroutine 完成
	<-steps[9]
	fmt.Println("All Goroutines Finished!")
}

输出:

Goroutine 1: Start
Goroutine 1: Done
Goroutine 2: Start
Goroutine 2: Done
Goroutine 3: Start
Goroutine 3: Done
Goroutine 4: Start
Goroutine 4: Done
Goroutine 5: Start
Goroutine 5: Done
Goroutine 6: Start
Goroutine 6: Done
Goroutine 7: Start
Goroutine 7: Done
Goroutine 8: Start
Goroutine 8: Done
Goroutine 9: Start
Goroutine 9: Done
Goroutine 10: Start
Goroutine 10: Done
All Goroutines Finished!

两个 Goroutines 的交替执行,交替打印偶数和奇数

package main

import (
	"fmt"
)

func main() {
	// 定义两个channel,分别用于控制打印顺序
	ch1 := make(chan struct{})
	ch2 := make(chan struct{})
	done := make(chan struct{}) // 用于通知主协程完成

	// 启动第一个goroutine,负责打印偶数
	go func() {
		for i := 0; i <= 9; i += 2 {
			<-ch1             // 等待信号
			fmt.Println(i)    // 打印当前偶数
			ch2 <- struct{}{} // 通知另一个goroutine
		}
	}()

	// 启动第二个goroutine,负责打印奇数
	go func() {
		for i := 1; i <= 9; i += 2 {
			<-ch2          // 等待信号
			fmt.Println(i) // 打印当前奇数
			if i == 9 {
				done <- struct{}{} // 打印完成通知主协程
			} else {
				ch1 <- struct{}{} // 通知另一个goroutine
			}
		}
	}()

	// 主协程启动打印过程
	ch1 <- struct{}{} // 先给ch1发送信号,开始打印偶数

	// 主协程等待所有任务完成
	<-done
}

输出:

0
1
2
3
4
5
6
7
8
9

两个 server 的任意一个执行完成,就执行第三个

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package main

import (
	"fmt"
	"time"
)

func server1(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server1"
}
func server2(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server2"
}
func main() {
	for i := 0; i < 5; i++ {
		output1 := make(chan string)
		output2 := make(chan string)
		go server1(output1)
		go server2(output2)
		select {
		case s1 := <-output1:
			fmt.Println(s1, "server3")
		case s2 := <-output2:
			fmt.Println(s2, "server3")
		}
		fmt.Println("--------------")
	}
	fmt.Println("5组执行完成")
}

输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。

server1 server3
--------------
server2 server3
--------------
server1 server3
--------------
server1 server3
--------------
server1 server3
--------------
5组执行完成

说明:
select如果加上default会直接命中default,不会等待两个通道。

两个 server 必须全部执行完成,再执行第三个。

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package main

import (
	"fmt"
	"time"
)

func server1(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server1"
}
func server2(ch chan string) {
	time.Sleep(1 * time.Second)
	ch <- "server2"
}
func main() {
	for i := 0; i < 5; i++ {
		output1 := make(chan string)
		output2 := make(chan string)
		go server1(output1)
		go server2(output2)
		for i := 0; i < 2; i++ {
			select {
			case s1 := <-output1:
				fmt.Println(s1)
			case s2 := <-output2:
				fmt.Println(s2)
			}
		}
		fmt.Println("server3")
		fmt.Println("--------------")
	}
	fmt.Println("5组执行完成")
}

上面使用了for+select控制2个server完成,也可以使用WaitGroup,如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

func server1(wg *sync.WaitGroup) {
	defer wg.Done() // 在函数退出时通知 WaitGroup

	time.Sleep(1 * time.Second)
	fmt.Println("server1")
}
func server2(wg *sync.WaitGroup) {
	defer wg.Done() // 在函数退出时通知 WaitGroup
	time.Sleep(1 * time.Second)
	fmt.Println("server2")
}
func main() {
	for i := 0; i < 5; i++ {
		wg := sync.WaitGroup{}
		wg.Add(2)
		go server1(&wg)
		go server2(&wg)
		wg.Wait()

		fmt.Println("server3")
		fmt.Println("--------------")
	}
	fmt.Println("5组执行完成")
}

输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。

server1
server2
server3
--------------
server1
server2
server3
--------------
server2
server1
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成

三个 server 的必须按 1 2 3 顺序执行

这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。

package main

import (
	"fmt"
	"sync"
	"time"
)

func server1(ch chan string, wg *sync.WaitGroup) {
	defer wg.Done()             // 在函数退出时通知 WaitGroup
	time.Sleep(1 * time.Second) // 模拟工作
	ch <- "server1"
}

func server2(ch chan string, wg *sync.WaitGroup) {
	defer wg.Done()             // 在函数退出时通知 WaitGroup
	time.Sleep(1 * time.Second) // 模拟工作
	ch <- "server2"
}

func main() {
	for i := 0; i < 5; i++ {
		output1 := make(chan string)
		output2 := make(chan string)
		var wg sync.WaitGroup

		// 启动 server1 和 server2
		wg.Add(2) // 等待两个 Goroutine 完成
		go server1(output1, &wg)
		go server2(output2, &wg)

		// 确保按顺序输出 server1, server2, server3
		s1 := <-output1
		s2 := <-output2
		fmt.Println(s1)
		fmt.Println(s2)
		fmt.Println("server3")
		fmt.Println("--------------")
	}

	// 完成5组任务
	fmt.Println("5组执行完成")
}

输出:

server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成

一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)

此处生产者生产的每个消息,只会有一个消费者消费,并发消费。

package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 1; i <= 5; i++ {
		fmt.Printf("Produced: %d\n", i)
		ch <- i // 向通道发送数据
		time.Sleep(time.Second)
	}
	close(ch) // 生产者完成后关闭通道
}

func consumer(id int, ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range ch { // 从通道读取任务直到通道关闭
		fmt.Printf("Consumer %d processing task: %d\n", id, task)
		time.Sleep(2 * time.Second) // 模拟消费任务的延时
	}
}

func main() {
	var wg sync.WaitGroup
	tasks := make(chan int, 10) // 定义缓冲区大小为10的任务通道

	// 启动多个消费者
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go consumer(i, tasks, &wg)
	}

	// 启动生产者
	wg.Add(1)
	go producer(tasks, &wg)

	// 等待所有消费者完成工作
	wg.Wait()
	fmt.Println("All tasks completed.")
}

输出:

Produced: 1
Consumer 2 processing task: 1
Produced: 2
Consumer 1 processing task: 2
Produced: 3
Consumer 3 processing task: 3
Produced: 4
Consumer 2 processing task: 4
Produced: 5
Consumer 1 processing task: 5
All tasks completed.

一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)

此处生产者生产的每个消息,每个消费者都会消费。
方案:

  1. 我们应该设计一个 多播机制(广播模式),即每个任务都会被多个消费者消费。最简单的方式是使用 复制通道(通过多个 goroutine 消费同一个任务通道)。

  2. 可以通过 使用多个通道,每个消费者都从这些通道中接收任务,或者使用 sync.WaitGroup 等方式来确保每个消费者都能够完成任务处理。

package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(channels []chan int, wg *sync.WaitGroup) {
	defer wg.Done() // 确保 producer 完成时通知 WaitGroup

	// 生产 5 个任务
	for i := 1; i <= 5; i++ {
		fmt.Printf("Produced: %d\n", i)
		// 向每个通道发送任务
		for _, ch := range channels {
			ch <- i
		}
		time.Sleep(time.Second)
	}

	// 所有任务发送完毕后,关闭每个通道
	for _, ch := range channels {
		close(ch)
	}
}

func consumer(id int, ch chan int, wg *sync.WaitGroup) {
	defer wg.Done() // 确保 consumer 完成时通知 WaitGroup

	// 从通道中接收任务并处理
	for task := range ch {
		fmt.Printf("Consumer %d processing task: %d\n", id, task)
		time.Sleep(2 * time.Second) // 模拟任务处理延迟
	}
}

func main() {
	var wg sync.WaitGroup
	numConsumers := 3
	channels := make([]chan int, numConsumers)

	// 创建多个消费者通道
	for i := 0; i < numConsumers; i++ {
		channels[i] = make(chan int, 10)
	}

	// 启动多个消费者
	for i := 1; i <= numConsumers; i++ {
		wg.Add(1)
		go consumer(i, channels[i-1], &wg)
	}

	// 启动生产者
	wg.Add(1)
	go producer(channels, &wg)

	// 等待所有消费者和生产者完成工作
	wg.Wait()
	fmt.Println("All tasks completed.")
}

输出:

Produced: 1
Consumer 1 processing task: 1
Consumer 2 processing task: 1
Consumer 3 processing task: 1
Produced: 2
Produced: 3
Consumer 1 processing task: 2
Consumer 2 processing task: 2
Consumer 3 processing task: 2
Produced: 4
Produced: 5
Consumer 2 processing task: 3
Consumer 1 processing task: 3
Consumer 3 processing task: 3
Consumer 1 processing task: 4
Consumer 2 processing task: 4
Consumer 3 processing task: 4
Consumer 3 processing task: 5
Consumer 2 processing task: 5
Consumer 1 processing task: 5
All tasks completed.


http://www.kler.cn/a/504107.html

相关文章:

  • 【ROS2】数据记录(ros2 bag)详解
  • NVIDIA CUDA Linux 官方安装指南
  • bochs+gdb调试linux0.11环境搭建
  • CAPL与CAN总线通信
  • 主数据系统建设模式分析
  • 当当网热销书籍数据采集与可视化分析
  • macos遇到You have not agreed to the Xcode license agreements.
  • SpringBoot之OriginTrackedPropertiesLoader类源码学习
  • 网管平台(进阶篇):路由器的管理实践
  • 华三S6520交换机配置console和ssh
  • 【数据结构学习笔记】19:跳表(Skip List)
  • 浅谈计算机网络02 | SDN控制平面
  • 一个使用 Golang 编写的新一代网络爬虫框架,支持JS动态内容爬取
  • 【漫话机器学习系列】047.指数型线性单元(Exponential Linear Units,ELU)
  • 1.4 给应用添加service,执行扩容和滚动更新
  • TDSQL 内存占用解析一例
  • Golang|单机并发缓存
  • 24. 【.NET 8 实战--孢子记账--从单体到微服务】--记账模块--预算扣除、退回、补充
  • 华为2024嵌入式研发面试题
  • Adobe与MIT推出自回归实时视频生成技术CausVid。AI可以边生成视频边实时播放!
  • Oracle 终止正在执行的SQL
  • 下载导出Tomcat上的excle文档,浏览器上显示下载
  • Web前端------HTML块级和行内标签之块级标签
  • kube-prometheus监控Linux主机
  • 关于H5复制ios没有效果
  • JavaScript系列(25)--性能优化技术详解