操作行为
channel 的操作行为结果总结如下:
| 操作 | nil channel | closed channel | not-closed non-nil channel |
|---|---|---|---|
| close | panic | panic | 成功close |
| 写 ch<- | 一直阻塞 | panic | 阻塞或成功写入数据 |
| 读<- | 一直阻塞 | 读取对应类型零值 | 阻塞或成功写入数据 |
读取一个已关闭的 channel 时,总是能读取到对应类型的零值,为了和读取非空未关闭 channel 的行为区别,可以使用两个接收值:
// ok is false when ch is closed
v, ok := <-ch
golang 中大部分类型都是值类型(只有 slice / channel / map 是引用类型),读/写类型是值类型的 channel 时,如果元素 size 比较大时,应该使用指针代替,避免频繁的内存拷贝开销。
多路复用(select)
Select 语句是阻塞的,除非它有默认情况。一旦其中一个条件满足,它就会解除阻塞。
如果所有的 case 语句 ( channel 操作 ) 都阻塞了,那么 select 语句将等待其中一个 case 语句 ( 其 channel 操作 ) 解除阻塞,然后执行该 case。如果一些或所有的 channel 操作是非阻塞的,那么将随机选择一个非阻塞 case 并立即执行。
default case 是非阻塞的。但这还不是全部,default case 使得默认情况下 select 语句总是非阻塞的。这意味着,在任何 channel (有缓冲或无缓冲) 上的发送和接收操作总是非阻塞的。
使用场景
futures / promises
golang 虽然没有直接提供 futrue / promise 模型的操作原语,但通过 goroutine 和 channel 可以实现类似的功能:
package main
import (
"io/ioutil"
"log"
"net/http"
)
// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {
c := make(chan []byte, 1)
go func() {
var body []byte
defer func() {
c <- body
}()
res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()
body, _ = ioutil.ReadAll(res.Body)
}()
return c
}
func main() {
future := RequestFuture("https://api.github.com/users/octocat/orgs")
body := <-future
log.Printf("reponse length: %d", len(body))
}
条件变量 (condition variable)
类型于 POSIX 接口中线程通知其他线程某个事件发生的条件变量,channel 的特性也可以用来当成协程之间同步的条件变量。因为 channel 只是用来通知,所以 channel 中具体的数据类型和值并不重要,这种场景一般用 strct {} 作为 channel 的类型。
一对一通知
类似 pthread_cond_signal() 的功能,用来在一个协程中通知另个某一个协程事件发生:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan struct{})
nums := make([]int, 100)
go func() {
time.Sleep(time.Second)
for i := 0; i < len(nums); i++ {
nums[i] = i
}
// send a finish signal
ch <- struct{}{}
}()
// wait for finish signal
<-ch
fmt.Println(nums)
}
广播通知
类似 pthread_cond_broadcast() 的功能。利用从已关闭的 channel 读取数据时总是非阻塞的特性,可以实现在一个协程中向其他多个协程广播某个事件发生的通知:
package main
import (
"fmt"
"time"
)
func main() {
N := 10
exit := make(chan struct{})
done := make(chan struct{}, N)
// start N worker goroutines
for i := 0; i < N; i++ {
go func(n int) {
for {
select {
// wait for exit signal
case <-exit:
fmt.Printf("worker goroutine #%d exit\n", n)
done <- struct{}{}
return
case <-time.After(time.Second):
fmt.Printf("worker goroutine #%d is working...\n", n)
}
}
}(i)
}
time.Sleep(3 * time.Second)
// broadcast exit signal
close(exit)
// wait for all worker goroutines exit
for i := 0; i < N; i++ {
<-done
}
fmt.Println("main goroutine exit")
}
信号量
channel 的读/写相当于信号量的 P / V 操作,下面的示例程序中 channel 相当于信号量:
package main
import (
"log"
"math/rand"
"time"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeConsumer(customerId int) {
log.Print("-> consumer#", customerId, " enters the bar")
seat := <-bar // need a seat to drink
log.Print("consumer#", customerId, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
log.Print("<- consumer#", customerId, " frees seat#", seat)
bar <- seat // free the seat and leave the bar
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10) // the bar has 10 seats
// Place seats in an bar.
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId) // none of the sends will block
}
// a new consumer try to enter the bar for each second
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeConsumer(customerId)
}
}
互斥量
互斥量相当于二元信号里,所以 cap 为 1 的 channel 可以当成互斥量使用:
package main
import "fmt"
func main() {
mutex := make(chan struct{}, 1) // the capacity must be one
counter := 0
increase := func() {
mutex <- struct{}{} // lock
counter++
<-mutex // unlock
}
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}
控制并发数
以爬虫为例,比如需要爬取1w条数据,需要并发爬取以提高效率,但并发量又不能过大,可以通过channel来控制并发规模,比如同时支持5个并发任务:
ch := make(chan int, 5)
for _, url := range urls {
go func() {
ch <- 1
worker(url)
<- ch
}
}
退出goroutine
func exit01() {
done := make(chan bool)
go func() {
for {
select {
case <-done:
fmt.Println("退出携程")
return
default:
fmt.Println("监控中...")
time.Sleep(1 * time.Second)
}
}
}()
time.Sleep(3 * time.Second)
done <- true
time.Sleep(5 * time.Second)
fmt.Println("程序退出")
}
func exit02() {
done :=make(chan bool)
go func() {
for{
select {
case <-done:
fmt.Println("退出携程01")
return
default:
fmt.Println("监控01...")
time.Sleep(1*time.Second)
}
}
}()
go func() {
for res :=range done{ //没有消息阻塞状态,chan关闭 for 循环结束
fmt.Println(res)
}
fmt.Println("退出监控03")
}()
go func() {
for{
select {
case <-done:
fmt.Println("退出携程02")
return
default:
fmt.Println("监控02...")
time.Sleep(1*time.Second)
}
}
}()
time.Sleep(3*time.Second)
close(done)
time.Sleep(5*time.Second)
fmt.Println("退出程序")
}
关闭 channel
关闭不再需要使用的 channel 并不是必须的。跟其他资源比如打开的文件、socket 连接不一样,这类资源使用完后不关闭后会造成句柄泄露,channel 使用完后不关闭也没有关系,channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。golang 也没有直接提供判断 channel 是否已经关闭的接口,虽然可以用其他不太优雅的方式自己实现一个:
func isClosed(ch chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}
不过实现一个这样的接口也没什么必要。因为就算通过 isClosed() 得到当前 channel 当前还未关闭,如果试图往 channel 里写数据,仍然可能会发生 panic ,因为在调用 isClosed() 后,其他协程可能已经把 channel 关闭了。关闭 channel 时应该注意以下准则:
- 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
- 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
- 如果只有一个写入端,可以在这个写入端放心关闭 channel 。
关闭 channel 粗暴一点的做法是随意关闭,如果产生了 panic 就用 recover 避免进程挂掉。稍好一点的方案是使用标准库的 sync 包来做关闭 channel 时的协程同步,不过使用起来也稍微复杂些。下面介绍一种优雅些的做法。
为了避免手动检查 channel 关闭情况带来的痛苦,Go 为我们提供了 for range 循环 ,当 channel 关闭时 for range 将自动关闭。
package main
import "fmt"
func squares(c chan int){
for i := 0; i<=9; i++ {
c <- i* i
}
close(c)
}
func main(){
fmt.Println("main() started")
c := make(chan int)
go squares(c)
for val := range c {
fmt.Println(val)
}
fmt.Println("main() stopped")
}
关闭程序
你可以创建一个chan信道,然后将关闭信号发送到它。在程序的其他部分中,你可以监听这个信道,并在收到关闭信号后结束程序。例如:
shutdownChan := make(chan os.Signal, 1)
signal.Notify(shutdownChan, os.Interrupt, syscall.SIGTERM)
<-shutdownChan
在这个例子中,我们创建了一个chan信道,然后使用**signal.Notify()**函数将操作系统的关闭信号发送到它。在程序的结尾处,我们等待一个信号,并在收到它时退出程序。
一写多读
这种场景下这个唯一的写入端可以关闭 channel 用来通知读取端所有数据都已经写入完成了。读取端只需要用 for range 把 channel 中数据遍历完就可以了,当 channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
send := func() {
for i := 0; i < 100; i++ {
ch <- i
}
// signal sending finish
close(ch)
}
recv := func(id int) {
defer wg.Done()
for i := range ch {
fmt.Printf("receiver #%d get %d\n", id, i)
}
fmt.Printf("receiver #%d exit\n", id)
}
wg.Add(3)
go recv(0)
go recv(1)
go recv(2)
send()
wg.Wait()
}
多写一读
这种场景下虽然可以用 sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func() {
count := 0
for i := range ch {
fmt.Printf("receiver get %d\n", i)
count++
if count >= 1000 {
// signal recving finish
close(done)
return
}
}
}
wg.Add(3)
go send(0)
go send(1)
go send(2)
recv()
wg.Wait()
}
多写多读
这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func(id int) {
defer wg.Done()
for {
select {
case <-done:
// get exit signal
fmt.Printf("receiver #%d exit\n", id)
return
case i := <-ch:
fmt.Printf("receiver #%d get %d\n", id, i)
time.Sleep(time.Millisecond)
}
}
}
wg.Add(6)
go send(0)
go send(1)
go send(2)
go recv(0)
go recv(1)
go recv(2)
time.Sleep(time.Second)
// signal finish
close(done)
// wait all sender and receiver exit
wg.Wait()
}
利用channel实现线程同步互斥信号量
利用channel实现线程互斥
package main
import (
"fmt"
"sync"
)
var value int = 0
func add(ch chan struct{}, group *sync.WaitGroup) {
ch <- struct{}{}
value++
<-ch
group.Done()
}
func main() {
var wg sync.WaitGroup
var channel = make(chan struct{}, 1)
wg.Add(10000)
for i := 0; i < 10000; i++ {
go add(channel, &wg)
}
wg.Wait()
fmt.Println(value)
}
利用channel实现线程同步
package main
import (
"fmt"
"sync"
)
func funcA(ch chan struct{},group *sync.WaitGroup) {
fmt.Println("A函数执行完毕")
ch<- struct{}{}
group.Done()
}
func funcB(ch chan struct{},group *sync.WaitGroup) {
<-ch
fmt.Println("B函数执行完毕")
group.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
ch:=make(chan struct{})
go funcA(ch,&wg)
go funcB(ch,&wg)
wg.Wait()
}
利用channel实现信号量
package main
import (
"fmt"
"sync"
)
func funcA(ch chan struct{},group *sync.WaitGroup) {
<-ch
fmt.Println("A函数执行完毕")
ch<- struct{}{}
group.Done()
}
func funcB(ch chan struct{},group *sync.WaitGroup) {
<-ch
fmt.Println("B函数执行完毕")
ch<- struct{}{}
group.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
ch:=make(chan struct{},1)
ch<- struct{}{}
go funcA(ch,&wg)
go funcB(ch,&wg)
wg.Wait()
}
总结
channle 作为 golang 最重要的特性,用起来还是比较爽的。传统的 C 里要实现类型的功能的话,一般需要用到 socket 或者 FIFO 来实现,另外还要考虑数据包的完整性与并发冲突的问题,channel 则屏蔽了这些底层细节,使用者只需要考虑读写就可以了。channel 是引用类型,了解一下 channel 底层的机制对更好的使用 channel 还是很用必要的。虽然操作原语简单,但涉及到阻塞的问题,使用不当可能会造成死锁或者无限制的协程创建最终导致进程挂掉。channel 除在可以用来在协程之间通信外,其阻塞和唤醒协程的特性也可以用作协程之间的同步机制,文中也用示例简单介绍了这种场景下的用法。关闭 channel 并不是必须的,只要没有协程没用引用 channel ,最终会被 GC 清理。所以使用的时候要特别注意,不要让协程阻塞在 channel 上,这种情况很难检测到,而且会造成 channel 和阻塞在 channel 的协程占有的资源无法被 GC 清理最终导致内存泄露。channle 方便 golang 程序使用 CSP 的编程范形,但是 golang 是一种多范形的编程语言,golang 也支持传统的通过共享内存来通信的编程方式。终极的原则是根据场景选择合适的编程范型,不要因为 channel 好用而滥用 CSP 。