如何優雅的使用Goroutine下-如何管控Goroutine的生命週期

透過之前的這篇Goroutine怎麼洩漏文章,我們知道Goroutine是怎麼洩漏的,接下來我們一起看下怎麼管控Goroutine的生命週期。

只有我們把Groutine的生命週期管理起來,才能避免Goroutine洩漏。那怎麼管控Goroutine的生命週期,我們做到以下幾點就可以了。

在啟動goroutine的時候你要問題自己3個問題

儘量把併發扔給呼叫者

(因為只有呼叫者才知道一個Goroutine什麼開始什麼時候結束)

Goroutine什麼時候結束

(讓呼叫者知道Goroutine什麼時候結束)

怎麼控制Goroutine結束

(讓呼叫者知道怎麼控制Goroutine結束)

示例1:

開啟了2個Goroutine,判斷了錯誤還記錄了日誌,看起來習慣挺好的。但是有點問題,如果http。ListenAndServe報錯,主Goroutine一直堵塞導致無法退出

package mainimport ( “fmt” “log” “net/http” _ “net/http/pprof”)func main() { serverMux := http。NewServeMux() serverMux。HandleFunc(“/hello”, func(rw http。ResponseWriter, r *http。Request) { fmt。Fprintln(rw, “hello word”) }) // 訪問地址:http://localhost/hello go func() { err := http。ListenAndServe(“:80”, serverMux) if err != nil { log。Println(err) } }() // 訪問地址:http://localhost:8001/debug/pprof/ go func() { err := http。ListenAndServe(“:8001”, http。DefaultServeMux) if err != nil { log。Println(err) } }() // 阻塞 select {}}

修改版本1:有些小夥伴很聰明,不就是無法退出嘛,把log。Println()改成log。Fatal()。如果還有Goroutine執行其他邏輯,你無法保證程式的完整性。不僅導致defer無法執行,還會導致程式無法平滑退出。

package mainimport ( “fmt” “log” “net/http” _ “net/http/pprof”)func main() { serverMux := http。NewServeMux() serverMux。HandleFunc(“/hello”, func(rw http。ResponseWriter, r *http。Request) { fmt。Fprintln(rw, “hello word”) }) // 訪問地址:http://localhost/hello go func() { defer func() { log。Println(“http app”) }() err := http。ListenAndServe(“:80”, serverMux) if err != nil { log。Fatal(err) } }() // 訪問地址:http://localhost:8001/debug/pprof/ go func() { defer func() { log。Println(“http debug”) }() err := http。ListenAndServe(“:8001”, http。DefaultServeMux) if err != nil { log。Fatal(err) } }() // 阻塞 select {}}

修改版本2:

第1步:如果一個方法幹了2多件事,我們一般會拆分成多個方法。這裡我們拆分成 httpApp()和httpDebug()

第2步:我們要控制Goroutine的生命週期,把併發扔給呼叫者,Goroutine什麼時候結束、怎麼結束

package mainimport ( “context” “fmt” “log” “net/http” _ “net/http/pprof” “sync”)var wg = sync。WaitGroup{}var stop chan errorvar closeChanShutdownServer chan intfunc main() { stop = make(chan error, 2) closeChanShutdownServer = make(chan int, 2) // 併發扔給呼叫者 wg。Add(2) go func() { wg。Done() stop <- httpApp(closeChanShutdownServer) }() go func() { wg。Done() stop <- httpDebug(closeChanShutdownServer) }() shutdownClose := false for i := 0; i < cap(stop); i++ { err := <-stop log。Println(“main: ”, err) if !shutdownClose { shutdownClose = true close(closeChanShutdownServer) } } wg。Wait() log。Println(“main。goroutine done”)}// 訪問地址:http://localhost/hellofunc httpApp(closeChanShutdown <-chan int) error { defer func() { log。Println(“http app”) }() serverMux := http。NewServeMux() serverMux。HandleFunc(“/hello”, func(rw http。ResponseWriter, r *http。Request) { fmt。Fprintln(rw, “hello word”) }) return httpAppRun(“:80”, serverMux, closeChanShutdown)}// 訪問地址:http://localhost:8001/debug/pprof/func httpDebug(closeChanShutdown <-chan int) error { defer func() { log。Println(“http debug”) }() return httpAppRun(“:8001”, http。DefaultServeMux, closeChanShutdown)}// 執行http// 什麼時候goroutine結束:關閉通道 closeChanShutdown 後結束// 怎麼控制goroutine結束:關閉透過 closeChanShutdown 控制goroutine結束func httpAppRun(addr string, handler http。Handler, closeChanShutdown <-chan int) error { wg。Add(1) s := http。Server{ Addr: addr, Handler: handler, } go func() { log。Println(addr, “goroutine shutdown waiting”) <-closeChanShutdown log。Println(addr, “goroutine shutdown start”) s。Shutdown(context。Background()) log。Println(addr, “goroutine shutdown done”) wg。Done() }() err := s。ListenAndServe() if err != nil { log。Println(err) } return err}

示例2:

定義了一個 search 函式,使用 200 毫秒超時模擬 http、rpc請求

package mainimport ( “fmt” “time”)// 定義了一個 search 函式,使用 200 毫秒超時模擬 http、rpc請求func search(term string) (string, error) { time。Sleep(200 * time。Millisecond) return “some value”, nil}// process 方法 呼叫 search 方法找到記錄並列印// 它有什麼問題呢,它會一直堵塞在這裡,對於一些應用延遲是不可接受的並且可能是呼叫多個rpcfunc process(term string) error { record, err := search(term) if err != nil { return err } fmt。Println(“Received:”, record) return nil}func main() { process(“test”)}

修改版本1:process 方法 呼叫 search 方法找到記錄並列印,如果超過 100 毫秒還沒拿到結果就返回一個錯誤,看起來習慣挺好的, 看起來也沒什麼問題

search 函式 200毫秒超時,process 函式 100 秒超時,ch 通道是無緩衝的導致,必須等待通道接受者接收到了值,這裡沒有接收者,導致傳送方無限堵塞等待。因為接收者 100ms 超時以及關閉到,導致 goroutine 洩漏。

有的人又說 這裡可以設定有緩衝通道,我設定一個 1 不就好了。其實你是不知道search函式什麼時候退,所以我們要做好超時控制。

package mainimport ( “context” “errors” “fmt” “runtime” “time”)// 定義了一個 search 函式,使用 200 毫秒超時模擬 http、rpc請求func search(term string) (string, error) { time。Sleep(200 * time。Millisecond) return “some value”, nil}type result struct { record string err error}// process 方法 呼叫 search 方法找到記錄並列印,如果超過 100 毫秒還麼拿到結果就返回一個錯誤// 看起來習慣挺好的, 它有什麼問題呢// search 函式 200毫秒超時,process 函式 100 秒超時,ch 通道是無緩衝的導致,必須等待通道接受者接收到了值,// 這裡沒有接收者,因為接收者 100ms 超時以及關閉到,導致 goroutine 洩漏// 有的人又說 這裡可以設定有緩衝通道,我設定一個 1 不就好了。// 其實你是不知道search函式什麼時候退,所以我們要做好超時控制func process(term string) error { // 建立一個100毫秒取消的 context ctx, cancel := context。WithTimeout(context。Background(), 100*time。Millisecond) defer cancel() // 建立一個通道接受 search 的返回值 ch := make(chan result) //ch := make(chan result, 1) // 開啟一個 goroutine 搜尋記錄 // 用 result 把結果包起來,傳送到 通道中 // 這裡有個好的地方,把併發扔給呼叫者 go func() { fmt。Println(“call search before”) record, err := search(term) fmt。Println(“call search after”) ch <- result{record, err} fmt。Println(“call search before sen chan”) }() // 這裡會堵塞等待 // 要嘛 100毫秒超時,返回一個錯誤 // 要嘛 從通道接受到值 select { case <-ctx。Done(): return errors。New(“search canceled”) case result := <-ch: if result。err != nil { return result。err } fmt。Println(“Received:”, result。record) return nil }}func main() { process(“test”) // 這裡測試為了簡單,請不要模仿 for { runtime。Gosched() }}

也就是說當我們不知道一個Goroutine什麼時候結束時,我們不應該啟動它。

只要做好這3點(

把併發扔給呼叫者、Goroutine什麼時候結束

怎麼控制Goroutine結束

),無論超時退出或者用channel告訴Goroutine什麼時候退出,go關鍵字你就用好。