Goroutines和Channels
CSP:“顺序通信进程”(communicating sequential processes),简称 CSP。CSP 是一种现代的并发编程模型,在这种模型中,值在不同的运行实例(goroutine)之间传递。
一、Goroutines
- 定义:在 Go 中,每个并发的执行单元被称为一个 goroutine。
- go语句: 在函数或方法前加上关键字go,就形成了go语句。go语句使函数或方法在新建的goroutine中执行。
- 程序启动时,主函数(main 函数)在 main goroutine 中执行。当 main goroutine 退出(main 函数返回)时,整个程序随之结束,所有其它 goroutine 也会被直接终止。
// A plain call runs f to completion before the next statement runs;
// a go statement starts f in a new goroutine and continues immediately.
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
// main is a minimal concurrent TCP server. It listens on localhost:8000,
// the same address the netcat client example dials, and hands every
// accepted connection to handleConn in its own goroutine. A slow client
// therefore never blocks the accept loop. Accept errors are logged and the
// loop keeps serving; a Listen error is fatal.
func main() {
	listener, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Print(err)
			continue
		}
		go handleConn(conn) // handle connections concurrently
	}
}
二、Channels
- channel 是 goroutines之间通信的机制
- 通过make来创建一个channel
- 与 map 类似,channel 也是对底层数据结构的引用。作为函数参数传递时,复制的只是这个引用,调用者和被调用者操作的是同一个 channel。
- channel的零值为nil。
- 两个相同类型的 channel 可以用 == 进行比较:当它们引用同一个底层数据结构时结果为 true。channel 也可以和 nil 比较。
- 一个channel有发送和接收两个操作,都是通信行为。
- 发送将一个值从一个goroutine发送到另一个接收该值的goroutine。
- 发送和接收都使用 <- 运算符。发送时,<- 分隔 channel 和要发送的值(ch <- x);接收时,<- 放在 channel 之前(<-ch)。
- 在已关闭的 channel 上发送数据会引起 panic,但仍可以接收数据。接收时先依次收到关闭前已发送(缓冲中剩余)的数据;取完之后,接收操作立即返回元素类型的零值。可以用 v, ok := <-ch 来区分,ok 为 false 表示 channel 已关闭且已无数据。
// The basic channel operations (ch, x and y are placeholders):
ch <- x // a send statement
y := <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
close(ch) // close channel; any later send on ch panics
三、不带缓冲的channel
// The optional second argument to make is the buffer capacity: omitting it
// or passing 0 creates an unbuffered channel, a positive value creates a
// buffered channel with that capacity. (ch is assumed to be declared
// earlier as a chan int.)
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
- 在无缓存的 channel 上进行发送(或接收)操作,会使当前 goroutine 阻塞,直到另一个 goroutine 在同一个 channel 上执行对应的接收(或发送)操作。
- 无缓存的channel上的收发,导致两个goroutine进行一次同步操作。所以,无缓存channel也叫同步channel。
- 当我们说x事件既不是在y事件之前发生也不是在y事件之后发生,我们就说x事件和y事件是并发的。
// main is a netcat-style client. It dials localhost:8000 and copies
// standard input to the connection. A background goroutine copies
// everything the server sends to standard output. main returns only after
// that goroutine has finished, so the last server output is not lost.
func main() {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	done := make(chan struct{})
	go func() {
		io.Copy(os.Stdout, conn) // NOTE: ignoring errors
		log.Println("done")
		done <- struct{}{} // signal the main goroutine
	}()
	mustCopy(conn, os.Stdin)

	// Stop sending but keep receiving. A full Close here would also shut
	// the read side, so replies still in flight would be dropped and the
	// reader would fail instead of seeing EOF.
	// NOTE: this relies on the server closing its side once it sees our
	// EOF; otherwise main keeps waiting on done.
	if tcp, ok := conn.(*net.TCPConn); ok {
		if err := tcp.CloseWrite(); err != nil {
			log.Print(err)
		}
	} else {
		conn.Close() // no half-close available: close both directions
	}
	<-done       // wait for background goroutine to finish
	conn.Close() // release the read side; an error from a repeated Close is ignored
}
四、串联的 Channels (Pipeline)
- 用 channel 把多个 goroutine 串联起来:前一个 goroutine 通过 channel 输出的值,作为后一个 goroutine 的输入。这种串联就是管道(pipeline)。
- 在发送方不再发送数据时适时关闭 channel,可以让所有接收者得到通知(例如 for range 循环会因此结束)。
- 重复关闭一个chan会导致panic
- 关闭值为nil的chan会导致panic
- 关闭一个 chan 会触发广播机制:所有正在该 chan 上等待接收的 goroutine 都会被唤醒,并收到元素类型的零值。
// main runs an unbounded three-stage pipeline: one goroutine counts
// 0, 1, 2, ..., a second squares each number, and the main goroutine
// prints the squares. No channel is ever closed, so the program runs
// until it is killed.
func main() {
	naturals := make(chan int)
	squares := make(chan int)

	// Stage 1: emit the natural numbers.
	go func() {
		n := 0
		for {
			naturals <- n
			n++
		}
	}()

	// Stage 2: square each number received.
	go func() {
		for {
			v := <-naturals
			squares <- v * v
		}
	}()

	// Stage 3: print, in the main goroutine.
	for {
		sq := <-squares
		fmt.Println(sq)
	}
}
// Finite variant: the counter emits only 0..9999.
// Each stage closes its output channel when its input is exhausted. The
// close propagates down the pipeline, so the final loop ends and main
// returns.
func main() {
	naturals, squares := make(chan int), make(chan int)

	// Counter: send 0..9999, then signal completion by closing.
	go func() {
		for n := 0; n < 10000; n++ {
			naturals <- n
		}
		close(naturals)
	}()

	// Squarer: runs until naturals is closed, then closes squares.
	go func() {
		for {
			v, ok := <-naturals
			if !ok {
				break
			}
			squares <- v * v
		}
		close(squares)
	}()

	// Printer: runs in the main goroutine until squares is closed.
	for {
		sq, ok := <-squares
		if !ok {
			break
		}
		fmt.Println(sq)
	}
}
五、单方向的 Channel
- chan<- type, 表示只发送type类型数据的单向channel
- <-chan type, 表示只接收type类型数据的单向channel
- 双向 channel 可以隐式转换为单向 channel(例如作为参数传给 counter/squarer/printer 时),但没有任何语法能把单向 channel 转换回双向 channel。
// main wires counter -> squarer -> printer together with two unbuffered
// channels. The bidirectional channels are implicitly converted to the
// send-only/receive-only parameter types of each stage. printer runs in
// the main goroutine, so main returns once squares is closed.
func main() {
	naturals, squares := make(chan int), make(chan int)

	go counter(naturals)
	go squarer(naturals, squares)

	printer(squares)
}
// counter sends the integers 0 through 9999, in increasing order, on out.
// It then closes out so the receiver knows no more values are coming.
func counter(out chan<- int) {
	x := 0
	for x < 10000 {
		out <- x
		x++
	}
	close(out)
}
// squarer receives integers from in until it is closed, sends the square
// of each on out in the same order, and then closes out.
func squarer(in <-chan int, out chan<- int) {
	for {
		v, ok := <-in
		if !ok {
			break
		}
		out <- v * v
	}
	close(out)
}
// printer prints every value received from in, one per line, and returns
// once in has been closed.
func printer(in <-chan int) {
	for {
		v, ok := <-in
		if !ok {
			return
		}
		fmt.Println(v)
	}
}
六、带缓存的 Channels
- 带缓存的 channel 内部持有一个元素队列。
- 当队列为空时,接收方阻塞。当队列满时,发送方阻塞。
- 不要在单个 goroutine 中把带缓存的 channel 当作简单队列使用:一旦缓冲区满,发送操作会永远阻塞(没有其它 goroutine 来接收),导致死锁。这种场景应该使用 slice。
ch := make(chan string, 3)
ch <- "A"
ch <- "B"
ch <- "C"
// The buffer is now full. In a single goroutine a further send such as
//     ch <- "D"
// would block forever, because no goroutine is left to receive. If no
// other goroutine exists, the runtime aborts with a deadlock error. The
// send is therefore shown only as a comment.
fmt.Println(len(ch)) // 3: elements currently buffered
fmt.Println(<-ch)    // A: elements are received in FIFO order
// capacity
fmt.Println(cap(ch)) // 3
// size (one element has been received)
fmt.Println(len(ch)) // 2
package cake
import (
"fmt"
"math/rand"
"time"
)
// Shop holds the parameters of one cake-shop pipeline simulation: one
// baker, NumIcers icers and one inscriber, connected by buffered channels
// (see Work). Each stage's time is passed to work as a mean and a
// standard deviation.
type Shop struct {
// Verbose makes the baker, icers and inscriber print a line for each cake they handle.
Verbose bool
Cakes int // number of cakes to bake
BakeTime time.Duration // time to bake one cake
BakeStdDev time.Duration // standard deviation of baking time
BakeBuf int // buffer slots between baking and icing
NumIcers int // number of cooks doing icing
IceTime time.Duration // time to ice one cake
IceStdDev time.Duration // standard deviation of icing time
IceBuf int // buffer slots between icing and inscribing
InscribeTime time.Duration // time to inscribe one cake
InscribeStdDev time.Duration // standard deviation of inscribing time
}
// cake identifies a cake by its sequence number (0, 1, ...) within a run.
type cake int
// baker produces s.Cakes cakes numbered from 0. It simulates the baking
// time of each cake and then sends it on baked. Once every cake has been
// sent it closes baked, which ends the icers' range loops.
func (s *Shop) baker(baked chan<- cake) {
	for c := cake(0); c < cake(s.Cakes); c++ {
		if s.Verbose {
			fmt.Println("baking", c)
		}
		work(s.BakeTime, s.BakeStdDev)
		baked <- c
	}
	close(baked)
}
// icer takes baked cakes until baked is closed, simulates icing each one,
// and forwards it on iced. It does not close iced: several icers share
// that channel, and the inscriber counts cakes instead of waiting for a
// close.
func (s *Shop) icer(iced chan<- cake, baked <-chan cake) {
	for {
		c, ok := <-baked
		if !ok {
			return
		}
		if s.Verbose {
			fmt.Println("icing", c)
		}
		work(s.IceTime, s.IceStdDev)
		iced <- c
	}
}
// inscriber receives exactly s.Cakes cakes from iced and simulates
// inscribing each one. It counts instead of ranging because iced is never
// closed.
func (s *Shop) inscriber(iced <-chan cake) {
	for remaining := s.Cakes; remaining > 0; remaining-- {
		c := <-iced
		if s.Verbose {
			fmt.Println("inscribing", c)
		}
		work(s.InscribeTime, s.InscribeStdDev)
		if s.Verbose {
			fmt.Println("finished", c)
		}
	}
}
// Work runs the simulation 'runs' times.
//
// Each run wires one baker, the icers and one inscriber together through
// two buffered channels with BakeBuf and IceBuf slots. The inscriber runs
// in the calling goroutine and returns after Cakes cakes, so Work blocks
// until every run has finished.
func (s *Shop) Work(runs int) {
	// With zero icers nothing ever reaches the inscriber, so Work would
	// block forever whenever Cakes > 0. At least one icer is always started.
	icers := s.NumIcers
	if icers < 1 {
		icers = 1
	}
	for run := 0; run < runs; run++ {
		baked := make(chan cake, s.BakeBuf)
		iced := make(chan cake, s.IceBuf)
		go s.baker(baked)
		for i := 0; i < icers; i++ {
			go s.icer(iced, baked)
		}
		s.inscriber(iced)
	}
}
// work blocks the calling goroutine for a random period drawn from a
// normal distribution with mean d and standard deviation stddev. A
// negative draw makes time.Sleep return immediately.
func work(d, stddev time.Duration) {
	jitter := rand.NormFloat64() * float64(stddev)
	time.Sleep(d + time.Duration(jitter))
}
七、并发的循环
- 匿名函数中的循环变量快照问题:在 Go 1.22 之前,循环中启动的所有匿名函数共享同一个循环变量,goroutine 实际运行时读到的可能已是后续迭代的值。解决办法是把循环变量作为参数显式传入匿名函数(如下例中的 f)。
// makeThumbnails3 makes thumbnails of the specified files in parallel.
//
// It starts one goroutine per file. The file name is passed in as an
// argument so each goroutine gets its own copy rather than a shared loop
// variable. Each goroutine signals on done when it finishes, and the
// caller receives once per file before returning.
func makeThumbnails3(filenames []string) {
	done := make(chan struct{})
	for i := range filenames {
		go func(name string) {
			thumbnail.ImageFile(name) // NOTE: ignoring errors
			done <- struct{}{}
		}(filenames[i])
	}

	// Wait for goroutines to complete.
	for remaining := len(filenames); remaining > 0; remaining-- {
		<-done
	}
}
八、基于select的多路复用
- 多路复用格式
// General form of select (ch1, ch2, ch3 and y are placeholders).
// select waits until one of its communication cases can proceed and runs
// that case; if several are ready, one of them is chosen at random.
// The optional default case runs immediately when no other case is
// ready, which makes the select non-blocking.
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
// Simulates a rocket launch countdown.
// main calls launch after 10 seconds unless a read from standard input
// completes first (e.g. the user presses Enter, or stdin reaches EOF).
// In that case it prints an abort message and returns without launching.
func main() {
	abort := make(chan struct{})
	// Watch stdin in the background; any completed Read requests an abort.
	go func() {
		os.Stdin.Read(make([]byte, 1))
		abort <- struct{}{}
	}()

	fmt.Println("Commencing countdown, press enter to abort.")
	countdown := time.After(10 * time.Second)
	select {
	case <-abort:
		fmt.Println("launch abort!")
		return
	case <-countdown:
		// Countdown finished without an abort.
	}
	launch()
}
- 一个没有任何 case 的 select 语句,会永远等待下去
// A select with no cases blocks the current goroutine forever.
select {
}
- 对 nil 的 channel 进行发送或接收操作会永远阻塞。
- 在 select 语句中,操作 nil 的 channel 永远都不会被 select 到。
九、并发的退出
通过关闭channel来对所有goroutine进行广播
//!+1
// done is closed to broadcast cancellation to every goroutine that
// receives from it.
var done = make(chan struct{})

// cancelled reports, without blocking, whether done has been closed.
func cancelled() bool {
	select {
	case <-done:
		return true
	default:
	}
	return false
}
//!-1
// main reports the disk usage of the directory trees named on the command
// line, or "." if none are given. It prints running totals every 500ms and
// the final totals at the end.
//
// As soon as a read from stdin returns (a byte, EOF or an error), done is
// closed. main then drains fileSizes so the walkers can finish, and
// returns without printing final totals.
func main() {
	// Determine the initial directories.
	roots := os.Args[1:]
	if len(roots) == 0 {
		roots = []string{"."}
	}

	// Cancel traversal when input is detected.
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		close(done)
	}()

	// Traverse each root of the file tree in parallel. The closer goroutine
	// waits for every walkDir call, including the recursive ones, before
	// closing fileSizes.
	fileSizes := make(chan int64)
	var wg sync.WaitGroup
	for _, root := range roots {
		wg.Add(1)
		go walkDir(root, &wg, fileSizes)
	}
	go func() {
		wg.Wait()
		close(fileSizes)
	}()

	// Print the results periodically.
	ticks := time.Tick(500 * time.Millisecond)
	var nfiles, nbytes int64
	for running := true; running; {
		select {
		case <-done:
			// Drain fileSizes to allow existing goroutines to finish.
			for range fileSizes {
				// Do nothing.
			}
			return
		case size, ok := <-fileSizes:
			if ok {
				nfiles++
				nbytes += size
			} else {
				running = false // fileSizes was closed
			}
		case <-ticks:
			printDiskUsage(nfiles, nbytes)
		}
	}
	printDiskUsage(nfiles, nbytes) // final totals
}
// printDiskUsage prints the file count and the total size in gigabytes
// (1e9 bytes), formatted to one decimal place.
func printDiskUsage(nfiles, nbytes int64) {
	gigabytes := float64(nbytes) / 1e9
	fmt.Printf("%d files %.1f GB\n", nfiles, gigabytes)
}
// walkDir recursively walks the file tree rooted at dir and sends the size
// of each non-directory entry on fileSizes.
//
// The caller must already have called n.Add(1) for this call; walkDir calls
// n.Done on return. It calls n.Add(1) for every subdirectory goroutine it
// spawns. It returns immediately if cancellation was already requested
// when it starts.
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	if cancelled() {
		return
	}
	for _, entry := range dirents(dir) {
		if !entry.IsDir() {
			fileSizes <- entry.Size()
			continue
		}
		n.Add(1)
		go walkDir(filepath.Join(dir, entry.Name()), n, fileSizes)
	}
}
//!-4
// sema is a counting semaphore: a buffered channel of capacity 20 that
// limits how many directories are read concurrently.
var sema = make(chan struct{}, 20)

// dirents returns the entries of directory dir.
//
// It first acquires a token from sema. If done is closed, it may instead
// return nil without reading anything, since select picks either ready
// case. Open and read errors are reported on stderr. After a read error,
// whatever partial results Readdir returned are still returned.
func dirents(dir string) []os.FileInfo {
	select {
	case <-done:
		return nil // cancelled
	case sema <- struct{}{}: // acquire token
	}
	defer func() { <-sema }() // release token

	dirFile, openErr := os.Open(dir)
	if openErr != nil {
		fmt.Fprintf(os.Stderr, "du: %v\n", openErr)
		return nil
	}
	defer dirFile.Close()

	infos, readErr := dirFile.Readdir(0) // 0 => no limit; read all entries
	if readErr != nil {
		fmt.Fprintf(os.Stderr, "du: %v\n", readErr)
		// Don't return: Readdir may return partial results.
	}
	return infos
}