H2O生成——屏障
H2O生成是只有同时存在且仅存在的情况下两个氢原子和一个氧原子,才能生成下一个氢原子或者氧原子,如此循环往复
控制H、O顺序
channel版——O等H
如果只有两个goroutine的情况下,那么下面是可行的
H两次通过先是O授予的,而后由第一个H授予,第二个H则授予O通过
func TestChannel(t *testing.T) {
count := 100
hch := make(chan int, 1)
och := make(chan int)
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
for i := 0; i < 2*count; i++ {
HCh(hch, och)
}
wg.Done()
}()
go func() {
for i := 0; i < count; i++ {
OCh(hch, och)
}
wg.Done()
}()
wg.Wait()
}
var hInnerCh = make(chan int, 1)
func HCh(hch, och chan int) {
<-hch
fmt.Println("H")
select {
case hInnerCh <- 1:
hch <- 1
case <-hInnerCh:
och <- 1
}
}
func OCh(hch, och chan int) {
fmt.Println("O")
hch <- 1
<-och
}
然而如果多个goroutine同时生成H2O就会出现问题,会连续出现两个O或者4个H,这是因为O生成的时候没有屏障,所以会导致连续的O被生成了,继而导致H被连续生产
可以做一些小小的改变,就是生成顺序变成HHO,让O先堵塞,H去授予O通过
func TestChannelMultiGoroutines(t *testing.T) {
count := 100
hch := make(chan int, 1)
och := make(chan int)
hch <- 1
for i := 0; i < count; i++ {
go OChMul(hch, och)
go HChMul(hch, och)
go HChMul(hch, och)
}
time.Sleep(3 * time.Second)
}
var hInnerCh = make(chan int, 1)
func HChMul(hch, och chan int) {
<-hch
fmt.Println("H")
select {
case hInnerCh <- 1:
hch <- 1
case <-hInnerCh:
och <- 1
}
}
func OChMul(hch, och chan int) {
<-och
fmt.Println("O")
hch <- 1
}
O两次授予通过——仅用于H、O各一goroutine
此外让O必须授予两次通过才能打印,在H、O都分别有且仅有一个goroutine的情况下,也是可以的
在多goroutine时由于H授予O的通过可能分别给了两个O的goroutine,导致一直堵塞
func TestChannelTwoPassForO(t *testing.T) {
count := 100
hch := make(chan int, 2)
och := make(chan int, 2)
hch <- 1
hch <- 1
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
for i := 0; i < 2*count; i++ {
HChTwoPassForO(hch, och)
}
wg.Done()
}()
go func() {
for i := 0; i < count; i++ {
OChTwoPassForO(hch, och)
}
wg.Done()
}()
wg.Wait()
}
func HChTwoPassForO(hch, och chan int) {
<-hch
fmt.Println("H")
och <- 1
}
func OChTwoPassForO(hch, och chan int) {
<-och
<-och
fmt.Println("O")
hch <- 1
hch <- 1
}
CAS版——保障print也是原子的
以下是在H、O都仅有一个goroutine情况下进行的,通过两个cas中间print来保障print是原子的,也就是不要出现cas状态转移成功了,然后被另一个goroutine抢先print就会出现不一致状况
func TestCASStateTransition(t *testing.T) {
count := 100
wg := &sync.WaitGroup{}
wg.Add(2)
v := &atomic.Value{}
v.Store(0)
go func() {
for i := 0; i < 2*count; i++ {
HCAS(v)
}
wg.Done()
}()
go func() {
for i := 0; i < count; i++ {
OCAS(v)
}
wg.Done()
}()
wg.Wait()
}
func HCAS(v *atomic.Value) {
for {
for v.CompareAndSwap(2, 1) {
fmt.Println("H")
v.CompareAndSwap(1, 1)
return
}
for v.CompareAndSwap(1, 1) {
fmt.Println("H")
v.CompareAndSwap(1, 0)
return
}
}
}
func OCAS(v *atomic.Value) {
for !v.CompareAndSwap(0, 3) {
}
fmt.Println("O")
for !v.CompareAndSwap(3, 2) {
}
}
不过以上代码若H goroutine增多的情况下就不可靠了
这是因为多个goroutine可能都同时cas从1到1,所以只要让H的范围区间够大,保证不存在多个H goroutine同时cas一个都能成功
func TestCASStateTransitionMul(t *testing.T) {
count := 100
v := &atomic.Value{}
v.Store(0)
for i := 0; i < count; i++ {
go HCASMul(v)
go HCASMul(v)
go OCASMul(v)
}
time.Sleep(3*time.Second)
}
func HCASMul(v *atomic.Value) {
for {
for v.CompareAndSwap(4, 3) {
fmt.Println("H")
v.CompareAndSwap(3, 2)
return
}
for v.CompareAndSwap(2, 1) {
fmt.Println("H")
v.CompareAndSwap(1, 0)
return
}
}
}
func OCASMul(v *atomic.Value) {
for !v.CompareAndSwap(0, 5) {
}
fmt.Println("O")
for !v.CompareAndSwap(5, 4) {
}
}
Semaphore版——信号量控制资源
h2o生成这样的问题,天生适合信号量。h生成只要一个h通过资源,并会授予一个o通过资源,o生成需要两个o通过资源,并会释放两个h通过资源
func TestSem(t *testing.T) {
count := 100
hsem := semaphore.NewWeighted(2)
osem := semaphore.NewWeighted(2)
osem.Acquire(context.Background(), 2)
for i := 0; i < count; i++ {
go HSem(hsem, osem)
go HSem(hsem, osem)
go OSem(hsem, osem)
}
time.Sleep(3 * time.Second)
}
func HSem(hsem, osem *semaphore.Weighted) {
hsem.Acquire(context.Background(), 1)
fmt.Println("H")
osem.Release(1)
}
func OSem(hsem, osem *semaphore.Weighted) {
osem.Acquire(context.Background(), 2)
fmt.Println("O")
hsem.Release(2)
}
Cond版——条件变量控制资源
通过一个资源状态变量condState记录可用资源数量,O仅在condState为0,才供给为2,H仅在condState大于0时进行消费
func TestCond(t *testing.T) {
count := 100
wg := &sync.WaitGroup{}
wg.Add(3)
mu := &sync.Mutex{}
hc := sync.NewCond(mu)
oc := sync.NewCond(mu)
go func() {
for i := 0; i < count; i++ {
HCond(hc, oc)
}
wg.Done()
}()
go func() {
for i := 0; i < count; i++ {
HCond(hc, oc)
}
wg.Done()
}()
go func() {
for i := 0; i < count; i++ {
OCond(hc, oc)
}
wg.Done()
}()
wg.Wait()
}
var condState int
func HCond(hc, oc *sync.Cond) {
hc.L.Lock()
for condState == 0 {
hc.Wait()
}
fmt.Println("H")
condState--
oc.Broadcast()
hc.L.Unlock()
}
func OCond(hc, oc *sync.Cond) {
oc.L.Lock()
for condState > 0 {
oc.Wait()
}
fmt.Println("O")
condState = 2
hc.Signal()
oc.L.Unlock()
}
Mutex版——仿Cond
与Cond版类似,故不再赘述
func TestMutexMul(t *testing.T) {
count := 100
mu := &sync.Mutex{}
for i := 0; i < count; i++ {
go HMutex(mu)
go HMutex(mu)
go OMutex(mu)
}
time.Sleep(3 * time.Second)
}
var muState int
func HMutex(mu *sync.Mutex) {
mu.Lock()
for muState == 0 {
mu.Unlock()
mu.Lock()
}
fmt.Println("H")
muState--
mu.Unlock()
}
func OMutex(mu *sync.Mutex) {
mu.Lock()
for muState > 0 {
mu.Unlock()
mu.Lock()
}
fmt.Println("O")
muState = 2
mu.Unlock()
}
允许H同时执行
上面的都是严格控制H、O的顺序,比如一定是OHH或者HHO的顺序,可是H是可允许同时进行的,那么能不能设计这样的算法使得H能够同时进行
以下仅尝试采用golang特色结构channel来完成
channel+cas版——保障资源状态并发安全
采用了cas来保障资源状态并发安全,并通过channel来沟通、控制
保障资源状态并发安全不仅有cas,还可以加锁,甚至用额外channel来保障也是可以的
此外,对于上面的channel版,如果第一个H在执行实际生产过程之前就先授予了第二个H通过,那么真正串行的时间其实并不多,也可以认为是H同时执行的
func TestChannelCASMultiGoroutinesHC(t *testing.T) {
count := 100
hch := make(chan int, 2)
och := make(chan int, 1)
och <- 1
var state int32
for i := 0; i < count; i++ {
go OChHC(hch, och, &state)
go HChHC(hch, och, &state)
go HChHC(hch, och, &state)
}
time.Sleep(3 * time.Second)
}
func HChHC(hch, och chan int, state *int32) {
<-hch
fmt.Println("H")
atomic.AddInt32(state, -1)
if atomic.LoadInt32(state) == 0 {
och <- 1
}
}
func OChHC(hch, och chan int, state *int32) {
<-och
fmt.Println("O")
atomic.AddInt32(state, 2)
hch <- 1
hch <- 1
}
纯channel版——H内部状态控制+O两步通过
上面的方法是channel结合cas才达成目标,那能不能只用channel就达到目标呢?
回到控制H、O顺序中O两次授予通过方法,本身已经能允许H同时执行了。但由于H授予的两次通过可能会被O goroutine均摊,所以在多goroutine下确实无法预期输出。
那么是否存在方法保障第二次通过永远只能被第一次通过的O goroutine获取到呢?
此时可以为O引入两次不同channel och0、och1获取,第一个H输出完成授予och0通过,第二个输出完成授予och1通过,由于och1仅能在och0已经通过的情况下才会尝试去获取,因此不会发生均摊现象
func TestPureChannelMultiGoroutinesHC(t *testing.T) {
count := 100
hch := make(chan int, 2)
och0 := make(chan int, 1)
och1 := make(chan int, 1)
hch <- 1
hch <- 1
for i := 0; i < count; i++ {
go OPureChHC(hch, och0, och1)
go HPureChHC(hch, och0, och1)
go HPureChHC(hch, och0, och1)
}
time.Sleep(3 * time.Second)
}
var hStateCh = make(chan int, 1)
func HPureChHC(hch, och0, och1 chan int) {
<-hch
fmt.Println("H")
select {
case hStateCh <- 1:
och0 <- 1
case <-hStateCh:
och1 <- 1
}
}
func OPureChHC(hch, och0, och1 chan int) {
<-och0
<-och1
fmt.Println("O")
hch <- 1
hch <- 1
}