Goの非同期処理で気をつけると楽になるかもしれないこと
この記事は Gopher道場 Advent Calendar 2018 17日の記事です。
はじめに
第4回Gopher道場の ゴルーチンとチャネル のテーマの際にメンターを務めました。
いくつか質問を受けたり、課題をレビューする中で channel
goroutine
の扱いが慣れないために、動作が不安定だったり、
エラーの処理が難しくなってしまっているコードが散見されました。
Goに初めて触る人にとって channel
goroutine
はやはり難しいということだと感じましたので、
この記事では、Goに慣れていない人向けに、Goで非同期処理を書くときに気をつけることを紹介します。
少しでもGoで非同期処理を書く時に参考になればと思います。
非同期処理を記述するときに気をつけると嬉しいかもしれないこと
気をつけたほうが良い点は、シンプルさを保つために意識していることですが、もちろん常にそうすべきというわけではなく、 必要のない非同期で苦しまないために助けになる場合がある要素です。
- 非同期にしない
channel
をパッケージの外に渡さないcontext.Context
でちゃんとブロックしている箇所をキャンセル可能にする
非同期にしない
まず本当に非同期として記述する必要があるのか考え直してみましょう。
ライブラリとして提供する場合には、非同期になっていることで使いにくいことがあります。
ライブラリ利用者が使いやすい単位でメソッドを切り出して、利用者が同期 or 非同期を選択できるようにしておくと嬉しいです。
goroutineの数の調整なども利用者側ができたほうがよいでしょう。
// 処理対象の作成と処理を同じメソッドで実行しているので、利用者は処理を分けにくい func Run(n int) []int { target := make([]int, n) for i := 0; i < n; i++ { target[i] = i } var wg sync.WaitGroup wg.Add(len(target)) res := make([]int, len(target)) for i, v := range target { go func(i, v int){ defer wg.Done() time.Sleep(time.Second) res[i] = v * 3 }(i, v) } wg.Wait() return res }
非同期については利用者側に任せてしまうと下記のような実装になります。
type Data struct { Value int } // Eval が分けられる // これを `go d.Eval()` で呼び出して利用する func (d Data) Eval() int { time.Sleep(time.Second) return d.Value * 3 } // GenTargets は軽い処理 func GenTargets(n int) []*Data { targets := make([]*Data, n) for i := 0; i < n; i++ { targets[i] = &Data{Value:i} } return targets }
channel
をパッケージの外に渡さない
channel
はcloseであったり、バッファーの大きさであったり、扱いが難しい部分があります。
パッケージ内で作成した channel
をパッケージの外から触れるようにしてしまうと、
考えないといけない状況が増えて辛いときもあるため、 channel
はパッケージの中に閉じ込めると楽になるときがあります。
func Run() chan<- int { ch := make(chan int, 10) go func() { for i := range ch { fmt.Println("num:", i) } }() return ch } // mainがchanにデータ入れたりして頑張る func main() { ch := Run() ch <- 1 ch <- 2 time.Sleep(time.Second) }
下記は channel
を閉じ込めた実装例です。
type Runner struct { ch chan int } func NewRunner(buf int) *Runner { return &Runner{ ch: make(chan int, buf), } } func (r *Runner) Add(i int) { r.ch <- i } func (r *Runner) Run() { for i := range r.ch { fmt.Println("num:", i) } } // mainはchanを意識しない操作しかできない func main() { runner := NewRunner(10) go runner.Run() runner.Add(1) runner.Add(2) time.Sleep(time.Second) }
readのパターンに対してはcallbackとして渡してあげる実装も選択肢としてあります。
func Run() <-chan int{ ch := make(chan int, 10) go func() { var count int for { time.Sleep(100 * time.Millisecond) count++ ch <- count } }() return ch } // mainがchanを扱っている func main() { ch := Run() for i := range ch { fmt.Println("num:", i) } }
callbackで記述すると下記のようになります。
func Run(cb func(i int)) { ch := make(chan int, 10) go func() { var count int for { time.Sleep(100 * time.Millisecond) count++ ch <- count } }() for i := range ch { cb(i) } } // mainはchanを意識しない func main() { go Run(func(i int) { fmt.Println("num:", i) }) time.Sleep(time.Second) }
context.Context
でちゃんとブロックしている箇所をキャンセル可能にする
context.Context
は実行している処理をキャンセルすることができますが、それは適切に Context.Done()
をハンドルした場合です。
これもやはり channel
に慣れていないと、 Context.Done()
を待つべき処理が良くない箇所に書いてしまうことがあるようなので、
気をつけたほうが良い点です。
例えば、 channelをパッケージの外に渡さない
であげた例で context.Context
を使用した良くない例を上げてみます。
type Runner struct { ch chan int } func NewRunner(buf int) *Runner { return &Runner{ ch: make(chan int, buf), } } func (r *Runner) Add(i int) { // バッファーが満杯であり、ブロックしますが、途中でやめられない r.ch <- i } func (r *Runner) Run(ctx context.Context) { for i := range r.ch { select { case <-ctx.Done(): // r.chからの取り出しの単位でしかキャンセルができない fmt.Println("end") return default: } fmt.Println("num:", i) } } func main() { // このmainの処理において、Runでtimeoutは拾われることはほぼありません("end"という文字列は出力されない) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() runner := NewRunner(10) go runner.Run(ctx) runner.Add(ctx, 1) runner.Add(ctx, 2) time.Sleep(2 * time.Second) }
channel
は書き込みと読み込みでブロックする可能性があります。
そのブロックする箇所で select
でちゃんと受け取れるようにしましょう。
type Runner struct { ch chan int } func NewRunner(buf int) *Runner { return &Runner{ ch: make(chan int, buf), } } func (r *Runner) Add(ctx context.Context, i int) error { select { case r.ch <- i: case <-ctx.Done(): return ctx.Err() } } func (r *Runner) Run(ctx context.Context) { for { select { case i := <-r.ch: fmt.Println("num:", i) case <-ctx.Done(): // r.chに何も書き込まれることがなくても、キャンセルで即座に終了する fmt.Println("end") return } } } func main() { // ちゃんとこのtimeoutで終わるようになる("end"が出力される) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() runner := NewRunner(10) go runner.Run(ctx) _ = runner.Add(ctx, 1) _ = runner.Add(ctx, 2) time.Sleep(2 * time.Second) }
これは元々 channel
を処理している例なので、わざわざ最初のような書き方をすることはないのかもしれないですが、
まだ context
パッケージがなかった頃のメソッドや、インターフェースを満たすために context.Context
を受け取らないが、
ブロックする処理などを for n.Next() {...}
などでブロックしつつloopするときには、それを待つための channel
を作成して、
キャンセルがキャンセルしたいと思った時にできるようになったほうがよいでしょう。
まとめ
Goの channel
goroutine
を利用して、非同期処理を書いていくための気をつけることを書いてみました。
すべてのパターンに対してよい実装というわけではありませんが、Goを書くときの参考にしていただければと思います。