CSP: “顺序通信进程”(communicating sequential processes)或被简称为CSP。 CSP是一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递。

一、Goroutines

  1. 定义:在GO中,每个并发的执行单元被称为一个goroutine
  2. go语句: 在函数或方法前加上关键字go,就形成了go语句。go语句使函数或方法在新建的goroutine中执行。
  3. 程序启动时,主函数(main函数)会在main goroutine中执行。当main goroutine退出时,所有其它goroutine都会退出。
    f() // call f(); wait for it to return
    go f() // create a new goroutine that calls f(); don't wait
    func main() {
        l, err := net.Listen("tcp", "192.168.0.1:8000")
        if err != nil {
            log.Fatal(err)
        }

        for {
            conn, err := l.Accept()
            if err != nil {
                log.Print(err)
                continue
            }

            go handleConn(conn)
        }
    }

二、Channels

  1. channel 是 goroutines之间通信的机制
  2. 通过make来创建一个channel
  3. 与map类似,channel也是能底层数据结构的引用,作为函数参数传递时,是传递的引用。
  4. channel的零值为nil。
  5. channel之间可以用 == 进行比较。
  6. 一个channel有发送和接收两个操作,都是通信行为。
  7. 发送将一个值从一个goroutine发送到另一个接收该值的goroutine。
  8. 发送和接收都使用 <- 运算符号,发送时,<-分割channel和要发送的值。接收时,<-放在channel之前。
  9. 在关闭的channel上发送数据会引起panic, 但可以接收数据,如果被关闭的channel中没有数据,则得到零值。
    ch <- x     // a send statement
    y := <-ch   // a receive expression in an assignment statement
    <-ch        // a receive statement; result is discarded

    close(ch)   // close channel

三、不带缓冲的channel

    ch = make(chan int) // unbuffered channel
    ch = make(chan int, 0) // unbuffered channel
    ch = make(chan int, 3) // buffered channel with capacity 3
  1. 在无缓存的channel上进行收发操作,将会导致此goroutine阻塞,直到有另一个goroutine对该通道进行发收操作。
  2. 无缓存的channel上的收发,导致两个goroutine进行一次同步操作。所以,无缓存channel也叫同步channel。
  3. 当我们说x事件既不是在y事件之前发生也不是在y事件之后发生,我们就说x事件和y事件是并发的。
    func main() {
        conn, err := net.Dial("tcp", "localhost:8000")
        if err != nil {
            log.Fatal(err)
        }

        done := make(chan struct{})
        go func() {
            io.Copy(os.Stdout, conn) // NOTE: ignoring errors
            log.Println("done")
            done <- struct{}{} // signal the main goroutine
        }()
        mustCopy(conn, os.Stdin)
        conn.Close()
        <-done // wait for background goroutine to finish
    }

四、串联的 Channels (Pipeline)

  1. 一个 channels 的输出作为另一个 channels 的输入。这种串联就是管道(pipeline)。
  2. 适时关闭channel,可以使接收都得到通知。
  3. 重复关闭一个chan会导致panic
  4. 关闭值为nil的chan会导致panic
  5. 关闭一个chan会触发广播机制
    func main() {
        naturals := make(chan int)
        squares := make(chan int)

        go func() {
            for x := 0; ; x++ {
                naturals <- x
            }
        }()

        go func() {
            for {
                x := <-naturals
                squares <- x * x
            }
        }()

        for {
            fmt.Println(<-squares)
        }
    }

    // 有限个数的x
    func main() {
        naturals := make(chan int)
        squares := make(chan int)

        go func() {
            for x := 0; x < 10000; x++ {
                naturals <- x
            }

            close(naturals)
        }()

        go func() {
            for x := range naturals {
                squares <- x*x
            }

            close(squares)
        }()

        for x := range squares {
            fmt.Println(x)
        }
    }

五、单方向的 Channel

  1. chan<- type, 表示只发送type类型数据的单向channel
  2. <-chan type, 表示只接收type类型数据的单向channel
  3. 没有将单向型chan转换为双向chan的语法
    func main() {
        naturals := make(chan int)
        squares := make(chan int)

        go counter(naturals)
        go squarer(naturals, squares)
        printer(squares)
    }

    func counter(out chan<- int) {
        for x := 0; x < 10000; x++ {
            out <- x
        }

        close(out)
    }

    func squarer(in <-chan int, out chan<- int) {
        for x := range in {
            out <- x*x
        }

        close(out)
    }

    func printer(in <-chan int) {
        for x := range in {
            fmt.Println(x)
        }
    }

六、带缓存的 Channels

  1. 带缓存的 channel 内部持有一个元素队列。
  2. 当队列为空时,接收方阻塞。当队列满时,发送方阻塞。
  3. 禁止在一个 goroutine 中把带缓存的 channel 当简单队列使用。这时,要用 slice 。
    ch := make(chan string, 3)
    ch <- "A"
    ch <- "B"
    ch <- "C"

    //block
    ch <- "D"

    fmt.Println(<-ch)
    // capacity
    fmt.Println(cap(ch))    // 3
    // size
    fmt.Println(len(ch))    // 3

    package cake

    import (
        "fmt"
        "math/rand"
        "time"
    )

    type Shop struct {
        Verbose        bool
        Cakes          int           // number of cakes to bake
        BakeTime       time.Duration // time to bake one cake
        BakeStdDev     time.Duration // standard deviation of baking time
        BakeBuf        int           // buffer slots between baking and icing
        NumIcers       int           // number of cooks doing icing
        IceTime        time.Duration // time to ice one cake
        IceStdDev      time.Duration // standard deviation of icing time
        IceBuf         int           // buffer slots between icing and inscribing
        InscribeTime   time.Duration // time to inscribe one cake
        InscribeStdDev time.Duration // standard deviation of inscribing time
    }

    type cake int

    func (s *Shop) baker(baked chan<- cake) {
        for i := 0; i < s.Cakes; i++ {
            c := cake(i)
            if s.Verbose {
                fmt.Println("baking", c)
            }
            work(s.BakeTime, s.BakeStdDev)
            baked <- c
        }
        close(baked)
    }

    func (s *Shop) icer(iced chan<- cake, baked <-chan cake) {
        for c := range baked {
            if s.Verbose {
                fmt.Println("icing", c)
            }
            work(s.IceTime, s.IceStdDev)
            iced <- c
        }
    }

    func (s *Shop) inscriber(iced <-chan cake) {
        for i := 0; i < s.Cakes; i++ {
            c := <-iced
            if s.Verbose {
                fmt.Println("inscribing", c)
            }
            work(s.InscribeTime, s.InscribeStdDev)
            if s.Verbose {
                fmt.Println("finished", c)
            }
        }
    }

    // Work runs the simulation 'runs' times.
    func (s *Shop) Work(runs int) {
        for run := 0; run < runs; run++ {
            baked := make(chan cake, s.BakeBuf)
            iced := make(chan cake, s.IceBuf)
            go s.baker(baked)
            for i := 0; i < s.NumIcers; i++ {
                go s.icer(iced, baked)
            }
            s.inscriber(iced)
        }
    }

    // work blocks the calling goroutine for a period of time
    // that is normally distributed around d
    // with a standard deviation of stddev.
    func work(d, stddev time.Duration) {
        delay := d + time.Duration(rand.NormFloat64()*float64(stddev))
        time.Sleep(delay)
    }

七、并发的循环

  1. 匿名函数中的循环变量快照问题
    // makeThumbnails3 makes thumbnails of the specified files in parallel.
    func makeThumbnails3(filenames []string) {
        ch := make(chan struct{})
        for _, f := range filenames {
            go func(f string) {
                thumbnail.ImageFile(f) // NOTE: ignoring errors
                ch <- struct{}{}
            }(f)
        }
        // Wait for goroutines to complete.
        for range filenames {
            <-ch
        }
    }

八、基于select的多路复用

  1. 多路复用格式
    select {
    case <-ch1:
        // ...
    case x := <-ch2:
        // ...use x...
    case ch3 <- y:
        // ...
    default:
        // ...
    }
//模拟火箭发射情景
    func main() {
        abort := make(chan struct{})

        go func() {
            os.Stdin.Read(make([]byte, 1))
            abort <- struct{}{}
        }()

        fmt.Println("Commencing countdown, press enter to abort.")
        select {
        case <-time.After(10 * time.Second):
            // Do nothing
        case <-abort:
            fmt.Println("launch abort!")
            return
        }

        launch()
    }
  1. 一个没有任何 case 的 select 语句,会永远等待下去
    select {
    }
  1. 对 nil 的 channel 进行发送或接收操作会永远阻塞。
  2. 在 select 语句中,操作 nil 的 channel 永远都不会被 select 到。

九、并发的退出

通过关闭channel来对所有goroutine进行广播

    //!+1
    var done = make(chan struct{})

    func cancelled() bool {
        select {
        case <-done:
            return true
        default:
            return false
        }
    }

    //!-1

    func main() {
        // Determine the initial directories.
        roots := os.Args[1:]
        if len(roots) == 0 {
            roots = []string{"."}
        }

        //!+2
        // Cancel traversal when input is detected.
        go func() {
            os.Stdin.Read(make([]byte, 1)) // read a single byte
            close(done)
        }()
        //!-2

        // Traverse each root of the file tree in parallel.
        fileSizes := make(chan int64)
        var n sync.WaitGroup
        for _, root := range roots {
            n.Add(1)
            go walkDir(root, &n, fileSizes)
        }
        go func() {
            n.Wait()
            close(fileSizes)
        }()

        // Print the results periodically.
        tick := time.Tick(500 * time.Millisecond)
        var nfiles, nbytes int64
    loop:
        //!+3
        for {
            select {
            case <-done:
                // Drain fileSizes to allow existing goroutines to finish.
                for range fileSizes {
                    // Do nothing.
                }
                return
            case size, ok := <-fileSizes:
                // ...
                //!-3
                if !ok {
                    break loop // fileSizes was closed
                }
                nfiles++
                nbytes += size
            case <-tick:
                printDiskUsage(nfiles, nbytes)
            }
        }
        printDiskUsage(nfiles, nbytes) // final totals
    }

    func printDiskUsage(nfiles, nbytes int64) {
        fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
    }

    // walkDir recursively walks the file tree rooted at dir
    // and sends the size of each found file on fileSizes.
    //!+4
    func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
        defer n.Done()
        if cancelled() {
            return
        }
        for _, entry := range dirents(dir) {
            // ...
            //!-4
            if entry.IsDir() {
                n.Add(1)
                subdir := filepath.Join(dir, entry.Name())
                go walkDir(subdir, n, fileSizes)
            } else {
                fileSizes <- entry.Size()
            }
            //!+4
        }
    }

    //!-4

    var sema = make(chan struct{}, 20) // concurrency-limiting counting semaphore

    // dirents returns the entries of directory dir.
    //!+5
    func dirents(dir string) []os.FileInfo {
        select {
        case sema <- struct{}{}: // acquire token
        case <-done:
            return nil // cancelled
        }
        defer func() { <-sema }() // release token

        // ...read directory...
        //!-5

        f, err := os.Open(dir)
        if err != nil {
            fmt.Fprintf(os.Stderr, "du: %v\n", err)
            return nil
        }
        defer f.Close()

        entries, err := f.Readdir(0) // 0 => no limit; read all entries
        if err != nil {
            fmt.Fprintf(os.Stderr, "du: %v\n", err)
            // Don't return: Readdir may return partial results.
        }
        return entries
    }