在前面的示例中,我们使用显式锁定互斥体来同步对多个goroutine的共享状态的访问。 另一个选项是使用goroutine
和通道的内置同步功能来实现相同的结果。这种基于通道的方法与Go的共享内存的想法一致,通过沟通,拥有每个数据的goroutine
恰好只有1
个。
在这个例子中,状态将由单个goroutine
拥有。这将保证数据不会因并发访问而损坏。为了读或写状态,其他goroutine
将发送消息到拥有的goroutine
并接收相应的回复。这些readOp
和writeOp
结构封装了这些请求,并拥有一个goroutine
响应的方法。
和以前一样,我们将计算执行的操作数。
读写通道将被其他goroutine
分别用来发出读和写请求。
这里是拥有状态的goroutine
,它是一个如前面示例中的映射,但现在对状态goroutine
是私有的。这个goroutine
在读取和写入通道时重复选择,在请求到达时响应请求。 通过首先执行所请求的操作,然后在响应信道上发送值以指示成功(以及在读取的情况下的期望值)来执行响应。
这里启动了100
个goroutine
来通过读取通道向状态拥有的goroutine
发出读取。每次读取都需要构造一个readOp
,通过读取通道发送readOp
,并通过提供的resp
通道接收结果。
也使用类似的方法开始10
个写操作。让goroutine
工作一秒钟。最后,捕获和报告操作计数。
运行程序显示,基于goroutine
的状态管理示例程序,完成了大约80,000
次操作。
对于这种特殊情况,基于goroutine
的方法比基于互斥的方法更多一些。它在某些情况下可能是有用的,例如,当有其他通道涉及或管理多个此类互斥体将容易出错。应该使用最自然的方法,有助于理解程序。
所有的示例代码,都放在
F:\worksp\golang
目录下。安装Go编程环境请参考:http://www.yiibai.com/go/go_environment.html
stateful-goroutines.go
的完整代码如下所示 -
package main
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
)
// In this example our state will be owned by a single
// goroutine. This will guarantee that the data is never
// corrupted with concurrent access. In order to read or
// write that state, other goroutines will send messages
// to the owning goroutine and receive corresponding
// replies. These `readOp` and `writeOp` `struct`s
// encapsulate those requests and a way for the owning
// goroutine to respond.
type readOp struct {
key int
resp chan int
}
type writeOp struct {
key int
val int
resp chan bool
}
func main() {
// As before we'll count how many operations we perform.
var readOps uint64 = 0
var writeOps uint64 = 0
// The `reads` and `writes` channels will be used by
// other goroutines to issue read and write requests,
// respectively.
reads := make(chan *readOp)
writes := make(chan *writeOp)
// Here is the goroutine that owns the `state`, which
// is a map as in the previous example but now private
// to the stateful goroutine. This goroutine repeatedly
// selects on the `reads` and `writes` channels,
// responding to requests as they arrive. A response
// is executed by first performing the requested
// operation and then sending a value on the response
// channel `resp` to indicate success (and the desired
// value in the case of `reads`).
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
write.resp <- true
}
}
}()
// This starts 100 goroutines to issue reads to the
// state-owning goroutine via the `reads` channel.
// Each read requires constructing a `readOp`, sending
// it over the `reads` channel, and the receiving the
// result over the provided `resp` channel.
for r := 0; r < 100; r++ {
go func() {
for {
read := &readOp{
key: rand.Intn(5),
resp: make(chan int)}
reads <- read
<-read.resp
atomic.AddUint64(&readOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
// We start 10 writes as well, using a similar
// approach.
for w := 0; w < 10; w++ {
go func() {
for {
write := &writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
writes <- write
<-write.resp
atomic.AddUint64(&writeOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
// Let the goroutines work for a second.
time.Sleep(time.Second)
// Finally, capture and report the op counts.
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
}
执行上面代码,将得到以下输出结果 -
F:\worksp\golang>go run mutexes.go
readOps: 84546
writeOps: 8473
state: map[0:99 3:3 4:62 1:18 2:89]