当前位置: 首页 > article >正文

14 go语言(golang) - 并发编程goroutine和channel

前言

并发编程是指在计算机程序中同时执行多个计算任务的技术。这种编程方式旨在利用多核处理器的计算能力,提高程序的执行效率和响应速度。而Go 语言的并发编程主要依赖于两个核心概念:Goroutine 和 Channel。

Goroutine

Goroutine 是 Go 语言中的一种轻量级线程管理机制。它是 Go 并发编程的核心特性之一,允许开发者以非常简单和高效的方式实现并发操作。

1、特点

  1. 轻量级:与传统的操作系统线程相比,goroutine 非常轻量。一个典型的 goroutine 只占用几 KB 的内存,而不是 MB 级别。这使得在同一时间可以运行成千上万个 goroutine,而不会对系统资源造成太大压力。

  2. 调度管理:Go 运行时包含了自己的调度器,用于管理 goroutines 的执行。这个调度器会将多个 goroutines 映射到少数几个 OS 线程上,从而有效地利用多核处理器。

  3. 简单启动:启动一个新的 goroutine 非常简单,只需要在函数调用前加上 go 关键字即可。

  4. 自动栈增长:goroutines 使用的是可增长的栈,这意味着它们开始时使用很小的内存,并根据需要动态扩展。这种特性进一步提高了其效率和灵活性。

  5. 与 Channel 配合使用:虽然 goroutines 可以独立工作,但通常会与 Channels 一起使用,以便在不同的 goroutines 间进行通信和同步。

2、示例

package main

import (
	"fmt"
	"testing"
)

func Test1(t *testing.T) {
	for i := 0; i < 10; i++ {
		go fmt.Printf("go线程 %d \n", i)
	}

	fmt.Println("主流程")
}

输出:

=== RUN   Test1
主流程
go线程 2 
go线程 3 
go线程 5 
--- PASS: Test1 (0.00s)
go线程 0 
go线程 7 
go线程 6 
go线程 8 
PASS
go线程 4 

多执行几次发现会少了一些打印,如 1和9线程

改进:

func Test2(t *testing.T) {
	for i := 0; i < 10; i++ {
		go fmt.Printf("go线程 %d \n", i)
	}

	time.Sleep(1 * time.Second)
	fmt.Println("主流程")
}

输出:

=== RUN   Test2
go线程 9 
go线程 0 
go线程 1 
go线程 2 
go线程 3 
go线程 4 
go线程 7 
go线程 8 
go线程 5 
go线程 6 
主流程
--- PASS: Test2 (1.00s)
PASS

3、waitGroup

考虑如何优雅的等待子线程执行完成,而不是简单的让主线程等待,常用的方法是使用 sync.WaitGroupWaitGroup 提供了一种简单而有效的方式来等待一组 goroutines 完成它们的工作,而不需要显式地让主线程休眠。它通过一个计数器来跟踪 goroutines 的数量。你可以增加或减少这个计数器,并且可以阻塞直到计数器变为零,这表示所有被跟踪的 goroutines 都已完成。

基本步骤

  1. 创建 WaitGroup 实例:在你的程序中创建一个 WaitGroup 实例。
  2. 增加计数:每启动一个新的 goroutine,就调用 Add(1) 来增加 WaitGroup 的计数。
  3. 标记完成:在每个 goroutine 内部,任务完成时调用 Done() 来减少 WaitGroup 的计数。
  4. 等待所有任务完成:在主线程中调用 Wait() 方法,它会阻塞直到所有被追踪的任务都标记为完成。
// 模拟工作的函数
func exec(num int, wg *sync.WaitGroup) {
	defer wg.Done() // 确保工作结束后通知 WaitGroup
	go fmt.Printf("go线程 %d \n", num)
}

func Test3(t *testing.T) {
	group := sync.WaitGroup{}

	// 启动多个goroutine并使用WaitGroup追踪它们
	for i := 0; i < 10; i++ {
		group.Add(1) // 增加WaitGroup计数
		go exec(i, &group)
	}

	group.Wait() // // 阻塞主线程,直到所有线程都执行完毕

	fmt.Println("主流程")
}

注意事项

  • 确保 Done 调用:始终确保每个启动的 goroutine 在其逻辑结束时调用了 Done() 方法。这通常通过使用关键字 defer 来实现,以确保即使发生错误也能正确递减。
  • 避免重复 Add 和 Done 操作:不要对同一组操作多次进行 Add 或 Done 调用,否则可能导致不一致状态和死锁。

Channel

在 Go 语言中,Channel 是一种用于 goroutine 之间进行通信的机制。它可以让一个 goroutine 发送特定类型的值到 Channel 中,然后另一个 goroutine 从该 Channel 接收值,从而实现数据的安全传递和同步。

1、基本特性

  1. 类型化:Channel 是类型化的,这意味着你需要指定要传输的数据类型。
  2. 阻塞行为
    • 发送阻塞:当一个 goroutine 向 Channel 发送数据时,如果没有其他 goroutine 正在等待接收这个数据,那么发送操作会被阻塞,直到有接收者。
    • 接收阻塞:同样地,当一个 goroutine 尝试从 Channel 接收数据时,如果没有其他 goroutine 正在向这个 Channel 发送数据,那么接收操作会被阻塞,直到有新的数据可用。
  3. 方向性:Channel 可以是双向的,也可以是单向(只读或只写)的。

2、创建和使用 Channel

func Test4(t *testing.T) {
	ch := make(chan int)

	for i := 0; i < 10; i++ {
		go send(i, ch)
	}

	for i := 0; i < 10; i++ {
		value := <-ch
		fmt.Printf("接受到消息:%d\n", value)
		time.Sleep(time.Second)
	}
}

func send(i int, ch chan int) {
	fmt.Printf("-- 准备发送:%d\n", i)
	ch <- i
	fmt.Printf("-- 发送成功!%d\n", i) // 这行打印会被阻塞
}

输出:

=== RUN   Test4
-- 准备发送:9
-- 发送成功!9
接受到消息:9
-- 准备发送:2
-- 准备发送:6
-- 准备发送:1
-- 准备发送:8
-- 准备发送:3
-- 准备发送:4
-- 准备发送:5
-- 准备发送:0
-- 准备发送:7
接受到消息:2
-- 发送成功!2
接受到消息:6
-- 发送成功!6
接受到消息:1
-- 发送成功!1
接受到消息:8
-- 发送成功!8
接受到消息:3
-- 发送成功!3
接受到消息:4
-- 发送成功!4
接受到消息:5
-- 发送成功!5
接受到消息:0
-- 发送成功!0
接受到消息:7
-- 发送成功!7
--- PASS: Test4 (10.01s)
PASS

3、带缓冲区的 Channel

带缓冲区的 Channels 可以存储一定数量的数据,而不会立即导致 sender 或 receiver 被阻塞。创建带缓冲区 Channels 时,可以指定其容量。

func Test5(t *testing.T) {
  
	ch := make(chan int, 5)

	for i := 0; i < 10; i++ {
		go send(i, ch)
	}

	for i := 0; i < 10; i++ {
		value := <-ch
		fmt.Printf("接受到消息:%d\n", value)
		time.Sleep(time.Second)
	}
}

输出:

=== RUN   Test5
-- 准备发送:9
-- 发送成功!9
接受到消息:9
-- 准备发送:0
-- 发送成功!0
-- 准备发送:1
-- 发送成功!1
-- 准备发送:2
-- 发送成功!2
-- 准备发送:3
-- 发送成功!3
-- 准备发送:7
-- 发送成功!7
-- 准备发送:8
-- 准备发送:6
-- 准备发送:4
-- 准备发送:5
接受到消息:0
-- 发送成功!8
接受到消息:1
-- 发送成功!6
接受到消息:2
-- 发送成功!4
接受到消息:3
-- 发送成功!5
接受到消息:7
接受到消息:8
接受到消息:6
接受到消息:4
接受到消息:5
--- PASS: Test5 (10.01s)
PASS

长度(Length)

  • 定义:Channel 的长度是指当前 Channel 中已存储的数据元素的数量。
  • 获取方式:可以使用内置函数 len(ch) 来获取 Channel 的当前长度。

容量(Capacity)

  • 定义:Channel 的容量是指它最多可以容纳多少个数据元素。对于无缓冲的 channel,容量为零;对于带缓冲的 channel,容量是在创建时指定的。
  • 获取方式:可以使用内置函数 cap(ch) 来获取 Channel 的总容量。
func TestChannelCapAndLen(t *testing.T) {
	ch := make(chan string, 3)
	ch <- "a"
	ch <- "b"
	fmt.Println("容量为:", cap(ch))
	fmt.Println("长度为:", len(ch))
	fmt.Println("读取一个元素:", <-ch)
	fmt.Println("新的长度为:", len(ch))
}

输出:

=== RUN   TestChannelCapAndLen
容量为: 3
长度为: 2
读取一个元素: a
新的长度为: 1
--- PASS: Test9 (0.00s)
PASS

4、单向 Channel

单向 Channel 用于限制函数对 channel 的访问权限,只能用于 send 或 receive 操作。这种限制可以帮助避免错误地使用 channel。

func sendOnly(i int, ch chan<- int) {
	fmt.Printf("-- 准备发送:%d\n", i)
	ch <- i
	fmt.Printf("-- 发送成功!%d\n", i)
}

func receiveOnly(ch <-chan int) {
	value := <-ch
	fmt.Printf("接受到消息:%d\n", value)
	time.Sleep(time.Second)
}

func Test6(t *testing.T) {
	ch := make(chan int)

	for i := 0; i < 10; i++ {
		go sendOnly(i, ch)
	}

	for i := 0; i < 10; i++ {
		receiveOnly(ch)
	}
}

5、关闭Channel

如果channel没有关闭,消费者仍然在等待数据,则可能导致死锁

func Test7(t *testing.T) {
	ch := make(chan int)

	for i := 0; i < 10; i++ {
		go sendOnly(i, ch)
	}

	// 消费者仍然在等待数据,则可能导致死锁
	for value := range ch {
		fmt.Printf("接受到消息:%d\n", value)
		time.Sleep(time.Second)
	}
}

报错:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
testing.(*T).Run(0xc00010c4e0, {0x3d35a17?, 0x125836780012db50?}, 0x3da35f0)
	/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1751 +0x3ab
testing.runTests.func1(0xc00010c4e0)
	/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:2168 +0x37
testing.tRunner(0xc00010c4e0, 0xc00012dc70)
	/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1690 +0xf4
testing.runTests(0xc000010030, {0x3e7cbe0, 0x9, 0x9}, {0x3c86030?, 0x3c85c9a?, 0x0?})
	/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:2166 +0x43d
testing.(*M).Run(0xc00007a0a0)
	/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:2034 +0x64a
main.main()
	_testmain.go:61 +0x9b

goroutine 5 [chan receive]:
awesomeProject/_28goroutine.Test7(0xc00010c680?)
	/Users/fangyirui/GolandProjects/awesomeProject/_28goroutine/1_test.go:114 +0x105
testing.tRunner(0xc00010c680, 0x3da35f0)
	/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1690 +0xf4
created by testing.(*T).Run in goroutine 1
	/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1743 +0x390

关闭 Channel 是一个重要的操作,它通知接收者不会再有新的数据发送到这个 Channel 上。

  1. 通知完成:通过关闭一个 Channel,可以向接收者表明没有更多的数据会被发送。这对于需要知道何时停止读取数据的消费者来说非常有用。
  2. 避免死锁:如果所有发送者都退出了,而消费者仍然在等待数据,则可能导致死锁。通过显式地关闭 Channel,可以避免这种情况。

改进:使用内置函数 close 来关闭一个 channel

func sendOnlyV2(i int, ch chan<- int, wg *sync.WaitGroup) {
	fmt.Printf("-- 准备发送:%d\n", i)
	ch <- i
	fmt.Printf("-- 发送成功!%d\n", i)
	wg.Done()
}

func Test8(t *testing.T) {
	wg := sync.WaitGroup{}
	ch := make(chan int)

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go sendOnlyV2(i, ch, &wg)
	}

	// 启动另一个goroutine来等待所有send操作完成后再关闭channel,不然channel关闭后还没发送完数据会报错
	go func() {
		wg.Wait()
		// 发送完毕,关闭channel
		close(ch)
		fmt.Println("channel 已经关闭")
	}()

	for value := range ch {
		fmt.Printf("接受到消息:%d\n", value)
		time.Sleep(time.Second)
	}
}

http://www.kler.cn/a/405807.html

相关文章:

  • JavaWeb之综合案例
  • Unity3D基于ECS的游戏逻辑线程详解
  • Spring Web入门练习
  • HarmonyOs DevEco Studio小技巧31--卡片的生命周期与卡片的开发
  • 7天掌握SQL - 第三天:MySQL实践与索引优化
  • 嵌入式系统中QT实现网络通信方法
  • java http body的格式 ‌application/x-www-form-urlencoded‌不支持文件上传
  • 【C#设计模式(13)——代理模式(Proxy Pattern)】
  • vue生命周期 (创建阶段 | 挂载阶段 | 更新阶段 | 销毁阶段 )
  • http 流量接入 Dubbo 后端服务
  • 系统调用介绍
  • 资源控制器--laravel进阶篇
  • 我的创作纪念日——创作懿佰贰拾捌天
  • 什么是事务?事务有哪些特性?
  • CSS3_BFC(十二)
  • 医药企业的终端市场营销策略
  • HTTP keep-alive和TCP keepalive详解
  • npm/cnpm的使用
  • Day24 回溯算法part03
  • 什么是AOT技术?Java语言不同类型的JIT或AOT编译器?
  • android 实现答题功能
  • 046 购物车
  • Go语言基本类型转换
  • git使用(一)
  • 金融数据中心容灾“大咖说” | 美创科技赋能“灾备一体化”建设
  • STM32 ADC 读取模拟量