# 6. Go语言并发编程
# 6.1 并发基础简述
进程 & 线程 & 协程
**进程:**直观点说,保存在硬盘上的程序运行以后,会在内存空间里形成一个独立的内存体,这个内存体有自己独立的地址空间,有自己的堆,上级挂靠单位是操作系统。操作系统会以进程为单位,分配系统资源(CPU时间片、内存等资源),进程是资源分配的最小单位。
**线程:**有时被称为轻量级进程(Lightweight Process,LWP),是操作系统调度(CPU调度)执行的最小单位。
结合下图来理解进程和线程之间的关系:
这副图是一个双向多车道的道路图,假如我们把整条道路看成是一个“进程”的话,那么图中由白色虚线分隔开来的各个车道就是进程中的各个“线程”了。
- 这些线程(车道)共享了进程(道路)的公共资源(土地资源)
- 这些线程(车道)必须依赖于进程(道路),也就是说,线程不能脱离于进程而存在(就像离开了道路,车道也就没有意义了)
- 这些线程(车道)之间可以并发执行(各个车道你走你的,我走我的),也可以互相同步(某些车道在交通灯亮时禁止继续前行或转弯,必须等待其它车道的车辆通行完毕)
- 这些线程(车道)之间依靠代码逻辑(交通灯)来控制运行,一旦代码逻辑控制有误(死锁,多个线程同时竞争唯一资源),那么线程将陷入混乱,无序之中
- 这些线程(车道)之间谁先运行是未知的,只有在线程刚好被分配到CPU时间片(交通灯变化)的那一刻才能知道
协程: 英文Coroutines,是一种基于线程之上,但又比线程更加轻量级的存在,这种由程序员自己写程序来管理的轻量级线程叫做『用户空间线程』,具有对内核来说不可见的特性。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。协程在子程序内部是可中断的,然后转而执行别的子程序,在适当的时候再返回来接着执行。
协程的特点在于是一个线程执行,那和多线程比,协程有何优势?
- 极高的执行效率:因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显;
- 不需要多线程的锁机制:因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
并发和并行
并发: 英文叫作 concurrency。它是指同一时刻只能有一条指令执行,但是多个线程的对应的指令被快速轮换地执行。比如:一个处理器,它先执行线程 A 的指令一段时间,再执行线程 B 的指令一段时间,再切回到线程 A 执行一段时间。由于处理器执行指令的速度和切换的速度非常非常快,人完全感知不到计算机在这个过程中有多个线程切换上下文执行的操作,这就使得宏观上看起来多个线程在同时运行。但微观上只是这个处理器在连续不断地在多个线程之间切换和执行,每个线程的执行一定会占用这个处理器一个时间片段,同一时刻,其实只有一个线程在执行。
并行: 英文叫作 parallel。它是指同一时刻,有多条指令在多个处理器上同时执行,并行必须要依赖于多个处理器。不论是从宏观上还是微观上,多个线程都是在同一时刻一起执行的。并行只能在多处理器系统中存在,如果我们的计算机处理器只有一个核,那就不可能实现并行。
并行与并发如下图关系:
# 6.2 Goroutine
goroutine 是 Go语言中的轻量级线程实现,由 Go 运行时(runtime)管理。Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。
Go 程序从 main 包的 main() 函数开始,在程序启动时,Go 程序就会为 main() 函数创建一个默认的 goroutine。
创建 goroutine 的写法如下, 使用 go 关键字创建 goroutine 时,被调用函数的返回值会被忽略:
go 函数名( 参数列表 )
- 函数名:要调用的函数名。
- 参数列表:调用函数需要传入的参数。
创建 单个 goroutine
示例:
import "fmt"
//创建 sayHello 函数
func sayHello() {
fmt.Println("Hello")
}
func main() {
go sayHello()
fmt.Println("main end")
}
编译运行的结果不一,有时候返回
Hello
main end
有时候直接返回 : main end
,这是什么原因呢?
在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束,所以我们要想办法让main函数等一等hello函数,最简单粗暴的方式就是让main程序sleep,修改示例如下:
import (
"fmt"
"time"
)
//创建 sayHello 函数
func sayHello() {
fmt.Println("Hello")
}
func main() {
go sayHello()
fmt.Println("main end")
time.Sleep(1)
}
执行上面的代码你会发现,这一次先打印 Hello
,然后紧接着打印 main end
。
go 关键字后也可以为匿名函数或闭包启动 goroutine,使用匿名函数或闭包创建 goroutine 时,除了将函数定义部分写在 go 的后面之外,还需要加上匿名函数的调用参数,格式如下:
go func( 参数列表 ){
函数体
}( 调用参数列表 )
其中:
- 参数列表:函数体内的参数变量列表。
- 函数体:匿名函数的代码。
- 调用参数列表:启动 goroutine 时,需要向匿名函数传递的调用参数。
示例:
import (
"fmt"
"time"
)
func main() {
go func(i int) {
for {
i++
fmt.Println("i:", i)
if i > 10 {
break
}
}
}(0)
time.Sleep(time.Second)
}
运行结果:
i: 1
i: 2
i: 3
i: 4
i: 5
i: 6
i: 7
i: 8
i: 9
i: 10
i: 11
# 6.3 调整并发的运行性能
在 Go语言程序运行时(runtime)实现了一个小型的任务调度器。这套调度器的工作原理类似于操作系统调度线程,Go 程序调度器可以高效地将 CPU 资源分配给每一个任务。传统逻辑中,开发者需要维护线程池中线程与 CPU 核心数量的对应关系。同样的,Go 地中也可以通过 runtime.GOMAXPROCS() 函数做到,格式为:
runtime.GOMAXPROCS(逻辑CPU数量)
这里的逻辑CPU数量可以有如下几种数值:
- <1:不修改任何数值。
- =1:单核心执行。
- >1:多核并发执行。
一般情况下,可以使用 runtime.NumCPU() 查询 CPU 数量,并使用 runtime.GOMAXPROCS() 函数进行设置,例如:
runtime.GOMAXPROCS(runtime.NumCPU())
Go 1.5 版本之前,默认使用的是单核心执行。从 Go 1.5 版本开始,默认执行上面语句以便让代码并发执行,最大效率地利用 CPU。
GOMAXPROCS 同时也是一个环境变量,在应用程序启动前设置环境变量也可以起到相同的作用。
# 6.4 通道Channel
如果说 goroutine 是 Go语言程序的并发体的话,那么 channels 就是它们之间的通信机制。一个 channels 是一个通信机制,它可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。
Go语言提倡使用通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。
通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。
# 6.4.1 声明与创建通道
channel是一种类型,一种引用类型。声明通道类型的格式如下:
var 变量 chan 元素类型
chan 类型的空值是 nil,声明后需要配合 make 后才能使用。
通道是引用类型,需要使用 make 进行创建,格式如下:
通道实例 := make(chan 数据类型)
- 数据类型:通道内传输的元素类型。
- 通道实例:通过make创建的通道句柄
声明与创建通道代码示例:
func main() {
var chan1 chan int //声明int类型通道chan1
fmt.Println(chan1)
chan1 = make(chan int) //创建chan1通道,并分配初始化地址
fmt.Println(chan1)
chan2 := make(chan string) //创建string类型通道chan2
fmt.Println(chan2)
}
编译运行结果:
<nil>
0xc000040060
0xc0000400c0
创建通道后,想要关闭通道可以使用 closed
函数即可:
close(ch)
# 6.4.2 使用通道发送和接收数据
通道创建后,就可以使用通道进行发送和接收操作。
通道发送数据
将数据通过通道发送的格式为:
通道变量 <- 数据
示例:
func main() { chan1 := make(chan interface{}) //定义数据为任意数据的通道chan1 chan1 <- 1 //将数据1 发送到通道 chan1中 chan1 <- 2 //将数据2 发送到通道 chan1中 }
把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示:
fatal error: all goroutines are asleep - deadlock!
通道接收数据
通道接收同样使用
<-
操作符,接收数据可以有以下写法:- 阻塞式接收数据
阻塞模式接收数据时,将接收变量作为
<-
操作符的左值,格式如下:data := <-ch
执行该语句时将会阻塞,直到接收到数据并赋值给 data 变量。
非阻塞式接收数据
使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下:
data, ok := <-ch
非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。
接收任意数据
阻塞接收数据后,忽略从通道返回的数据,格式如下:
<-ch
执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。
循环接收
通道的数据接收可以借用 for range 语句进行多个元素的接收操作,格式如下:
for data := range ch { }
通道 ch 是可以进行遍历的,遍历的结果就是接收到的数据。数据类型就是通道的数据类型。通过 for 遍历获得的变量只有一个,即上面例子中的 data。
【示例一】通道发送和接收数据:
func main() { chan1 := make(chan interface{}) //定义数据为任意数据的通道chan1 go func() { fmt.Println("start goroutine") chan1 <- 1 //将数据1发送到通道 chan1中 fmt.Println("exit goroutine") }() fmt.Println("wait goroutine") data := <-chan1 //接收通道chan1 数据 fmt.Println(data) }
编译结果:
wait goroutine start goroutine exit goroutine 1
【示例二】 通道循环接收数据
func main() { ch := make(chan interface{}) //定义数据为任意数据的通道ch go func() { for i := 0; i <= 10; i++ { ch <- i //往通道ch发送数据 time.Sleep(time.Second) } }() for data := range ch { //循环接收通道ch数据 fmt.Println(data) if data == 10 { break //退出关闭chan通道,否则会报错 } } fmt.Println("end") }
编译运行结果:
0 1 2 3 4 5 6 7 8 9 10 end
# 6.4.3 单向通道
单向 channel 就是只能用于写入或者只能用于读取数据。单项 channel主要用途是用来约束其他代码的行为。一般单项 channel 多使用在 函数的参数定义以及接口定义上。
单项channel定义格式:
var 通道变量 chan<- 变量类型 //只能写入数据
var 通道变量 <-chan 变量类型 //只能读取数据
或者使用make函数直接定义:
通道变量 := make(chan<- 变量类型) //只能写入数据
通道变量 := make(<-chan 变量类型) //只能读取数据
定义只写通道,强制读取通道数据报错代码示例:
func main() {
ch := make(chan<- int) //定义只写通道
go func() {
ch <- 1
}()
data := <-ch //读取通道ch数据,此时会报错
fmt.Println(data)
}
上面的例子中,ch只能写入数据,如果尝试 data := <-ch
读取数据,将会出现如下报错:
invalid operation: <-ch (receive from send-only type chan<- int)
双向通道传入,转为单项通道使用示例:
import (
"fmt"
"time"
)
//定义写入函数,参数为只写的int型通道
func write(ch chan<- int) {
for i := 0; i < 10; i++ {
fmt.Println("write data", i)
ch <- i
}
}
//定义读取函数,参数为只读的int型通道
func read(ch <-chan int) {
for data := range ch {
fmt.Println("read data, data:", data)
}
}
func main() {
ch := make(chan int) //定义通道ch
go write(ch) //启动写入 goroutine
go read(ch) //启动读取 goroutine
time.Sleep(time.Second) //增加睡眠时间,等待goroutine执行完毕
close(ch)
}
# 6.4.4 无缓冲通道与缓冲通道
了解无缓冲通道与缓冲通道前,先了解下通道(channel)的三种操作:
- send:表示sender端的goroutine向channel中投放数据
- receive:表示receiver端的goroutine从channel中读取数据
- close:表示关闭channel
- 关闭channel后,send操作将导致painc
- 关闭channel后,recv操作将返回对应类型的0值以及一个状态码false
- close并非强制需要使用close(ch)来关闭channel,在某些时候可以自动被关闭
- 如果使用close(),建议条件允许的情况下加上defer
- 只在sender端上显式使用close()关闭channel。因为关闭通道意味着没有数据再需要发送
# 6.4.4.1 无缓冲通道
无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。也叫阻塞或者同步通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。
无缓存通道定于格式:
make(chan 变量类型)
无缓冲的通道特性:
- sender端向channel中send一个数据,然后阻塞,直到receiver端将此数据receive
- receiver端一直阻塞,直到sender端向channel发送了一个数据
使用无缓冲的通道在 goroutine 之间同步数据,有点像接力赛,一方必须等待另一方到达才能起步,不然会一直在堵塞状态,我们用模拟接力赛代码来展示无缓冲通道的流程:
import (
"fmt"
"sync"
)
var wgg sync.WaitGroup //wgg 用来等待程序结束
//Runner 模拟接力比赛中的一位跑步者
func Runner(baton chan int) {
var newRunner int
runner := <-baton
fmt.Printf("第 %d 位 接力员接棒了\n", runner)
if runner == 4 {
fmt.Printf("总计 %d 接力员接力, 接力比赛结束\n", runner)
wgg.Done() //结束比赛等待
return
} else {
newRunner = runner + 1
fmt.Printf("第 %d 位接力员等待中...\n", newRunner)
go Runner(baton)
}
fmt.Printf("第 %d 位接力员完成接棒,将接力棒传给 %d 号接力员\n",
runner,
newRunner)
baton <- newRunner
}
func main() {
baton := make(chan int) //创建无缓存通道
wgg.Add(1)
go Runner(baton) //第一位跑步者开始接力
baton <- 1 //开始比赛
wgg.Wait() //等待比赛结束
}
编译运行结果:
第 1 位 接力员接棒了
第 2 位接力员等待中...
第 1 位接力员完成接棒,将接力棒传给 2 号接力员
第 2 位 接力员接棒了
第 3 位接力员等待中...
第 2 位接力员完成接棒,将接力棒传给 3 号接力员
第 3 位 接力员接棒了
第 4 位接力员等待中...
第 3 位接力员完成接棒,将接力棒传给 4 号接力员
第 4 位 接力员接棒了
总计 4 接力员接力, 接力比赛结束
# 6.4.4.2 缓冲通道
有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
创建带缓冲的通道格式:
通道实例 := make(chan 通道类型, 缓冲大小)
- 通道类型:和无缓冲通道用法一致,影响通道发送和接收的数据类型。
- 缓冲大小:决定通道最多可以保存的元素数量。
- 通道实例:被创建出的通道实例。
带缓冲通道在很多特性上和无缓冲通道是类似的。无缓冲通道可以看作是长度永远为 0 的带缓冲通道。因此根据这个特性,带缓冲通道在下面列举的情况下依然会发生阻塞:
- sender端可以向channel中send多个数据(只要channel容量未满),容量满之前不会阻塞,容量被填满时,sender端尝试再次发送数据时发生阻塞
- receiver端按照队列的方式(FIFO,先进先出)从buffered channel中按序receive其中数据
- 带缓冲通道为空时,尝试接收数据时发生阻塞。
下面通过一个例子中来理解带缓冲通道的用法,参见下面的代码:
func main() {
ch := make(chan int, 5) //创建容量为5的缓存通道
ch <- 1 //sender发送数据
ch <- 2 //sender发送数据
ch <- 3 //sender发送数据
fmt.Println(len(ch))
}
运行编译结果:
3
# 6.4.5 通道中的select
# 6.4.5.1 超时机制
Go语言没有提供直接的超时处理机制,但我们可以用 select 来设置超时。虽然 select 机制不是专门为超时而设计的,却能很方便的解决超时问题,因为 select 的特点是只要其中有一个 case 已经完成,程序就会继续往下执行,而不会考虑其他 case 的情况。
与 switch 语句相比,select 有比较多的限制,其中最大的一条限制就是每个 case 语句里必须是一个 IO 操作,大致的结构如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
在一个 select 语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。
如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有如下两种可能的情况:
- 如果给出了 default 语句,那么就会执行 default 语句,同时程序的执行会从 select 语句后的语句中恢复;
- 如果没有 default 语句,那么 select 语句将被阻塞,直到至少有一个通信可以进行下去。
channel使用select设置超时机制示例:
func main() {
ch := make(chan int) //数据接收发送 channel
quit := make(chan bool) //退出channel
//新开一个协程
go func(c chan int) {
for {
select {
case d := <-ch: //获取ch名称得channel数据
fmt.Println("get channel num is", d)
case <-time.After(3 * time.Second): //超时处理
quit <- true
fmt.Println("time out")
break
}
}
}(ch)
for i := 0; i < 10; i++ {
ch <- i
time.Sleep(1 * time.Second)
}
<-quit
time.Sleep(15 * time.Second)
fmt.Println("main over")
}
程序编译运行结果:
get channel num is 0
get channel num is 1
get channel num is 2
get channel num is 3
get channel num is 4
get channel num is 5
get channel num is 6
get channel num is 7
get channel num is 8
get channel num is 9
quit
main over
# 6.4.5.2 多路复用
多路复用是通信和网络中的一个专业术语。多路复用通常表示在一个信道上传输多路信号或数据流的过程和技术。
Go语言中提供了 select 关键字,可以同时响应多个通道的操作。select 的用法与 switch 语句非常类似,由 select 开始一个新的选择块,每个选择条件由 case 语句来描述。
代码示例:
import (
"fmt"
"os"
"time"
)
//倒计数函数
func countdown(c chan int) {
for i := 10; i >= 0; i-- {
c <- i
time.Sleep(1 * time.Second)
}
}
func cancel(c chan int) {
os.Stdin.Read(make([]byte, 1))
c <- -1
}
func main() {
fmt.Println("火箭即将发射,开始倒计数")
lunch := make(chan int)
abort := make(chan int)
go countdown(lunch)
go cancel(abort)
loop:
for {
select {
case d := <-lunch:
fmt.Println("发射倒计时:", d)
if d <= 0 {
fmt.Println("hrer")
break loop
}
case <-abort:
fmt.Println("火箭取消发射..")
goto x
}
}
time.Sleep(15 * time.Second)
fmt.Println("火箭已经发射!!")
return
x:
fmt.Println("确认发射取消!!")
}
上述代码是一个火箭发射倒计时程序,select可以接收取消和倒计时两个通道数据,实现了多路数据同时执行的效果。
# 6.5 等待组(sync.WaitGroup)
在前面的代码例子中,为了让主 goroutine 等待子goroutine 执行完,经常在main函数中使用 time.Sleep()
操作,这种做法是为了演示代码的一种方式。实际情况中这样做肯定是不合适的,所以要使用 本节主角 sync.WaitGroup 来实现并发任务的同步。
在 sync.WaitGroup(等待组)类型中,每个 sync.WaitGroup 值在内部维护着一个计数,此计数的初始默认值为零。
等待组有下面几个方法可用,如下表所示:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 等待组的计数器 +1 |
(wg * WaitGroup) Done() | 等待组的计数器 -1 |
(wg * WaitGroup) Wait() | 当等待组计数器不等于 0 时阻塞直到变 0 |
等待组内部拥有一个计数器,计数器的值可以通过上述方法调用实现计数器的增加和减少。
等待组实际操作代码示例:
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup //建立等待组
ch := make(chan int) //建立通道
wg.Add(1) //等待组数据+1
go func(c chan int) {
for j := 0; j <= 10; j++ {
ch <- j
}
}(ch)
for {
select {
case d := <-ch:
fmt.Println(d)
if d >= 10 {
wg.Done() //等待组数据-1
goto loop
}
}
}
loop:
fmt.Println("main exit...")
wg.Wait() //等待
}
需要注意 sync.WaitGroup
是一个结构体,传递的时候要传递指针。
# 6.6 并发安全与锁
有并发,就有资源竞争,如果两个或者多个 goroutine
在没有相互同步的情况下,访问某个共享的资源,比如同时对该资源进行读写时,就会处于相互竞争的状态,这就是并发中的资源竞争。
并发本身并不复杂,但是因为有了资源竞争的问题,就使得我们开发出好的并发程序变得复杂起来,因为会引起很多莫名其妙的问题。
# 6.6.1 死锁、活锁和饥饿的概念
死锁
死锁是指两个或两个以上的进程(或线程)在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
下面代码列举几种死锁情况:
- 由于无缓冲channel必须同时写和读才能执行,所以当单个gorountine顺序执行的时候,channel的此性质会造成死锁,代码如下:
func main() { ch := make(chan int) ch <- 1 //channel通道ch写入数据后,阻塞等待接收,导致下方ch读取数据无法执行 <-ch }
- 2个 以上的go程中, 使用同一个 channel 通信。 写入无缓冲channel先顺序执行的时候, 本质上跟死锁情况1是属于同种情况,代码如下:
func main() { ch := make(chan int) ch <- 1 // channel通道ch写入数据后,阻塞等待接收,导致下方gorountine无法继续执行 go func() { <-ch }() }
- 2个以上的go程中,使用多个 channel 通信。 A go 程 获取channel 1 的同时,尝试使用channel 2, 同一时刻,B go 程 获取channel 2 的同时,尝试使用channel 1 ,代码如下:
func main() { ch1 := make(chan int) ch2 := make(chan int) go func() { //匿名子go程 for { select { //这里互相等对方造成死锁 case <-ch1: //这里ch1有数据读出才会执行下一句 ch2 <- 1 } } }() for { //主go程 select { case <-ch2 : //这里ch2有数据读出才会执行下一句 ch1 <- 2 } } }
- 隐性死锁,一般有混用 channel 和 锁导致的,看如下代码:
package main import ( "fmt" "math/rand" "sync" "time" ) var rwMutex sync.RWMutex func readGo(in <-chan int, idx int){ for { rwMutex.RLock() // 以读模式加锁 num := <- in fmt.Printf("----第%d个读go程, 读入: %d\n", idx, num) rwMutex.RUnlock() // 以读模式解锁 } } func writeGo(out chan<- int, idx int){ for { // 生成随机数 num := rand.Intn(1000) rwMutex.Lock() // 以写模式加锁 out <- num fmt.Printf("第%d个写go程, 写入: %d\n", idx, num) time.Sleep(time.Millisecond * 300) rwMutex.Unlock() // 以写模式解锁 } } func main() { // 随机数种子 rand.Seed(time.Now().UnixNano()) ch := make(chan int) for i:=0; i<5; i++ { go readGo(ch, i+1) } for i:=0; i<5; i++ { go writeGo(ch, i+1) } time.Sleep(time.Second * 3) }
上面代码的结果会一直阻塞, 没有输出, 大家可以简单想一下出现这种情况的原因是什么?
代码看得仔细的应该都可以看出来, 这上面的代码中, 比如说读操作先抢到了CPU, 运行代码
rwMutex.RLock()
读加锁, 然后运行到num := <- in
时, 会要求写端同时在线, 否则就会发生阻塞, 但是这时写端不可能在线, 因为读加锁了. 所以就会一直在这发生阻塞.这也就是我们之前在死锁部分中提到的 隐性死锁 (不报错).
那么解决办法有两种: 一种是不混用, 另一种是使用条件变量。
活锁
活锁是另一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行,因为线程将不断重复同样的操作,而且总会失败。
活锁通常发生在处理事务消息中,如果不能成功处理某个消息,那么消息处理机制将回滚事务,并将它重新放到队列的开头。这样,错误的事务被一直回滚重复执行,这种形式的活锁通常是由过度的错误恢复代码造成的,因为它错误地将不可修复的错误认为是可修复的错误。
当多个相互协作的线程都对彼此进行相应而修改自己的状态,并使得任何一个线程都无法继续执行时,就导致了活锁。这就像两个过于礼貌的人在路上相遇,他们彼此让路,然后在另一条路上相遇,然后他们就一直这样避让下去。
要解决这种活锁问题,需要在重试机制中引入随机性。例如在网络上发送数据包,如果检测到冲突,都要停止并在一段时间后重发,如果都在 1 秒后重发,还是会冲突,所以引入随机性可以解决该类问题。
饥饿
饥饿是指一个可运行的进程尽管能继续执行,但被调度器无限期地忽视,而不能被调度执行的情况。
与死锁不同的是,饥饿锁在一段时间内,优先级低的线程最终还是会执行的,比如高优先级的线程执行完之后释放了资源。
活锁与饥饿是无关的,因为在活锁中,所有并发进程都是相同的,并且没有完成工作。更广泛地说,饥饿通常意味着有一个或多个贪婪的并发进程,它们不公平地阻止一个或多个并发进程,以尽可能有效地完成工作,或者阻止全部并发进程。
总结
不适用锁肯定会出问题。如果用了,虽然解了前面的问题,但是又出现了更多的新问题。
- 死锁:是因为错误的使用了锁,导致异常;
- 活锁:是饥饿的一种特殊情况,逻辑上感觉对,程序也一直在正常的跑,但就是效率低,逻辑上进行不下去;
- 饥饿:与锁使用的粒度有关,通过计数取样,可以判断进程的工作效率。
只要有共享资源的访问,必定要使其逻辑上进行顺序化和原子化,确保访问一致,这绕不开锁这个概念。
# 6.6.2 互斥锁(sync.Mutex) 和 读写互斥锁(sync.RWMutex)
Go语言包中的 sync 包提供了两种锁类型:sync.Mutex 和 sync.RWMutex。
sync.Mutex
Mutex 用于提供一种加锁机制(Locking Mechanism),保证同一时刻只有一个goroutine在临界区运行。每个资源都对应于一个可称为"互斥锁"的标记, 这个标记用来保证在任意时刻, 只能有一个协程(线程)访问该资源, 其它的协程只能等待。互斥锁是传统并发编程对共享资源进行访问控制的主要手段, 它由标准库
sync
中的Mutex
结构体类型表示。sync.Mutex
类型只有两个公开的指针方法, Lock 和 Unlock。 Lock锁定当前的共享资源, Unlock进行解锁。互斥锁定义如下:
var mutex sync.Mutex;
我们先看下面代码:
func main() { x := 0 for i := 0; i < 30; i++ { go func() { x = x + 1 }() } time.Sleep(2 * time.Second) fmt.Println(x) }
上面代码执行后,大部分情况下它都会打印30,但也可能出现很多不同的值,这是为什么呢? 但是由于有30个 goroutine 对
x
这个变量进行操作,可能会存在 goroutine1 执行完后x加1,但goroutine2拿到的x却是未加1的值,这样就造成了操作产生错误的数据。我们利用
sync.Mutex
来解决这类问题,优化后的代码如下:import ( "fmt" "sync" "time" ) func main() { var mutex sync.Mutex x := 0 for i := 0; i < 30; i++ { go func() { mutex.Lock() //加锁 x = x + 1 mutex.Unlock() //解锁 }() } time.Sleep(2 * time.Second) fmt.Println(x) }
经过优化的,此程序执行结果最终会都是 30 .
sync.RWMutex
互斥锁的本质是当一个goroutine访问的时候, 其它goroutine都不能访问. 这样在资源同步, 避免竞争的同时, 也降低了程序的并发性能, 程序由原来的并行执行变成了串行执行。其实, 当我们对一个不会变化的数据只做读操作的话, 是不存在资源竞争的问题的. 因为数据是不变的, 不管怎么读取, 多少goroutine同时读取, 都是可以的。所以问题不是出在读上, 主要是修改, 也就是写. 修改的数据要同步, 这样其它goroutine才可以感知到. 所以真正的互斥应该是读取和修改、修改和修改之间, 读和读是没有互斥操作的必要的.因此, 衍生出另外一种锁, 叫做读写锁。读写锁可以让多个读操作并发, 同时读取, 但是对于写操作是完全互斥的. 也就是说, 当一个goroutine进行写操作的时候, 其它goroutine既不能进行读操作, 也不能进行写操作.
Go中的读写锁由结构体类型
sync.RWMutex
表示. 此类型的方法集合中包含两对方法: RLock 和 RUnlock。代码示例:
var ( // 逻辑中使用的某个变量 count int // 与变量对应的使用互斥锁 countGuard sync.RWMutex ) func GetCount() int { // 锁定 countGuard.RLock() // 在函数退出时解除锁定 defer countGuard.RUnlock() return count }
# 6.6.3 sync.Map
Go语言中的 map 在并发情况下,只读是线程安全的,同时读写是线程不安全的。
下面来看下并发情况下读写 map 时会出现的问题,代码如下:
func main() {
var m map[int]int = map[int]int{}
go func() {
for {
m[1] = 1
}
}()
go func() {
for {
_ = m[1]
}
}()
time.Sleep(30 * time.Second)
}
运行代码会报错,输出如下:
fatal error: concurrent map read and map write
错误信息显示,并发的 map 读和 map 写,也就是说使用了两个并发函数不断地对 map 进行读和写而发生了竞态问题,map 内部会对这种并发操作进行检查并提前发现。
需要并发读写时,一般的做法是加锁,但这样性能并不高,我们加锁结果看看:
func main() {
var mutex sync.RWMutex
var m map[int]int = map[int]int{}
go func() {
for {
mutex.Lock()
m[1] = 1
mutex.RLock()
}
}()
go func() {
for {
mutex.RLock()
_ = m[1]
mutex.RUnlock()
}
}()
time.Sleep(30 * time.Second)
}
Go语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Map,sync.Map 和 map 不同,不是以语言原生形态提供,而是在 sync 包下的特殊结构。
sync.Map 有以下特性:
- 无须初始化,直接声明即可。
- sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用,Store 表示存储,Load 表示获取,Delete 表示删除。
- 使用 Range 配合一个回调函数进行遍历操作,通过回调函数返回内部遍历出来的值,Range 参数中回调函数的返回值在需要继续迭代遍历时,返回 true,终止迭代遍历时,返回 false。
并发安全的 sync.Map 演示代码如下:
import (
"fmt"
"sync"
)
func main() {
var scene sync.Map
// 将键值对保存到sync.Map
scene.Store("greece", 97)
scene.Store("london", 100)
scene.Store("egypt", 200)
// 从sync.Map中根据键取值
fmt.Println(scene.Load("london"))
// 根据键删除对应的键值对
scene.Delete("london")
// 遍历所有sync.Map中的键值对
scene.Range(func(k, v interface{}) bool {
fmt.Println("iterate:", k, v)
return true
})
}
代码输出如下:
100 true
iterate: egypt 200
iterate: greece 97
sync.Map 没有提供获取 map 数量的方法,替代方法是在获取 sync.Map 时遍历自行计算数量,sync.Map 为了保证并发安全有一些性能损失,因此在非并发情况下,使用 map 相比使用 sync.Map 会有更好的性能。