Goの非同期処理で気をつけると楽になるかもしれないこと

この記事は Gopher道場 Advent Calendar 2018 17日の記事です。

はじめに

第4回Gopher道場の ゴルーチンとチャネル のテーマの際にメンターを務めました。
いくつか質問を受けたり、課題をレビューする中で channel goroutine の扱いが慣れないために、動作が不安定だったり、 エラーの処理が難しくなってしまっているコードが散見されました。

Goに初めて触る人にとって channel goroutine はやはり難しいということだと感じましたので、 この記事では、Goに慣れていない人向けに、Goで非同期処理を書くときに気をつけることを紹介します。
少しでもGoで非同期処理を書く時に参考になればと思います。

非同期処理を記述するときに気をつけると嬉しいかもしれないこと

気をつけたほうが良い点は、シンプルさを保つために意識していることですが、もちろん常にそうすべきというわけではなく、 必要のない非同期で苦しまないために助けになる場合がある要素です。

  • 非同期にしない
  • channel をパッケージの外に渡さない
  • context.Context でちゃんとブロックしている箇所をキャンセル可能にする

非同期にしない

まず本当に非同期として記述する必要があるのか考え直してみましょう。

ライブラリとして提供する場合には、非同期になっていることで使いにくいことがあります。
ライブラリ利用者が使いやすい単位でメソッドを切り出して、利用者が同期 or 非同期を選択できるようにしておくと嬉しいです。
goroutineの数の調整なども利用者側ができたほうがよいでしょう。

// 処理対象の作成と処理を同じメソッドで実行しているので、利用者は処理を分けにくい
func Run(n int) []int {
    target := make([]int, n)
    for i := 0; i < n; i++ {
        target[i] = i
    }

    var wg sync.WaitGroup
    wg.Add(len(target))
    res := make([]int, len(target))
    for i, v := range target {
        go func(i, v int){
            defer wg.Done()
            time.Sleep(time.Second)
            res[i] = v * 3
        }(i, v)
    }
    wg.Wait()

    return res
}

非同期については利用者側に任せてしまうと下記のような実装になります。

type Data struct {
    Value int
}

// Eval が分けられる
// これを `go d.Eval()` で呼び出して利用する
func (d Data) Eval() int {
    time.Sleep(time.Second)
    return d.Value * 3
}

// GenTargets は軽い処理
func GenTargets(n int) []*Data {
    targets := make([]*Data, n)
    for i := 0; i < n; i++ {
        targets[i] = &Data{Value:i}
    }

    return targets
}

channel をパッケージの外に渡さない

channel はcloseであったり、バッファーの大きさであったり、扱いが難しい部分があります。
パッケージ内で作成した channel をパッケージの外から触れるようにしてしまうと、 考えないといけない状況が増えて辛いときもあるため、 channel はパッケージの中に閉じ込めると楽になるときがあります。

func Run() chan<- int {
    ch := make(chan int, 10)
    go func() {
        for i := range ch {
            fmt.Println("num:", i)
        }
    }()

    return ch
}

// mainがchanにデータ入れたりして頑張る
func main() {
    ch := Run()
    ch <- 1
    ch <- 2
    time.Sleep(time.Second)
}

下記は channel を閉じ込めた実装例です。

type Runner struct {
    ch chan int
}

func NewRunner(buf int) *Runner {
    return &Runner{
        ch: make(chan int, buf),
    }
}

func (r *Runner) Add(i int) {
    r.ch <- i
}

func (r *Runner) Run() {
    for i := range r.ch {
        fmt.Println("num:", i)
    }
}

// mainはchanを意識しない操作しかできない
func main() {
    runner := NewRunner(10)
    go runner.Run()
    runner.Add(1)
    runner.Add(2)

    time.Sleep(time.Second)
}

readのパターンに対してはcallbackとして渡してあげる実装も選択肢としてあります。

func Run() <-chan int{
    ch := make(chan int, 10)

    go func() {
        var count int
        for {
            time.Sleep(100 * time.Millisecond)
            count++
            ch <- count
        }
    }()

    return ch
}

// mainがchanを扱っている
func main() {
    ch := Run()
    for i := range ch {
        fmt.Println("num:", i)
    }
}

callbackで記述すると下記のようになります。

func Run(cb func(i int)) {
    ch := make(chan int, 10)

    go func() {
        var count int
        for {
            time.Sleep(100 * time.Millisecond)
            count++
            ch <- count
        }
    }()

    for i := range ch {
        cb(i)
    }
}

// mainはchanを意識しない
func main() {
    go Run(func(i int) {
        fmt.Println("num:", i)
    })

    time.Sleep(time.Second)
}

context.Context でちゃんとブロックしている箇所をキャンセル可能にする

context.Context は実行している処理をキャンセルすることができますが、それは適切に Context.Done() をハンドルした場合です。
これもやはり channel に慣れていないと、 Context.Done() を待つべき処理が良くない箇所に書いてしまうことがあるようなので、 気をつけたほうが良い点です。

例えば、 channelをパッケージの外に渡さない であげた例で context.Context を使用した良くない例を上げてみます。

type Runner struct {
    ch chan int
}

func NewRunner(buf int) *Runner {
    return &Runner{
        ch: make(chan int, buf),
    }
}

func (r *Runner) Add(i int) {
    // バッファーが満杯であり、ブロックしますが、途中でやめられない
    r.ch <- i
}

func (r *Runner) Run(ctx context.Context) {
    for i := range r.ch {
        select {
        case <-ctx.Done():  // r.chからの取り出しの単位でしかキャンセルができない
            fmt.Println("end")
            return
        default:
        }
     
        fmt.Println("num:", i)
    }
}

func main() {
    // このmainの処理において、Runでtimeoutは拾われることはほぼありません("end"という文字列は出力されない)
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    runner := NewRunner(10)
    go runner.Run(ctx)
    runner.Add(ctx, 1)
    runner.Add(ctx, 2)

    time.Sleep(2 * time.Second)
}

channel は書き込みと読み込みでブロックする可能性があります。
そのブロックする箇所で select でちゃんと受け取れるようにしましょう。

type Runner struct {
    ch chan int
}

func NewRunner(buf int) *Runner {
    return &Runner{
        ch: make(chan int, buf),
    }
}

func (r *Runner) Add(ctx context.Context, i int) error {
    select {
    case r.ch <- i:
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (r *Runner) Run(ctx context.Context) {
    for  {
        select {
        case i := <-r.ch:
            fmt.Println("num:", i)
        case <-ctx.Done():  // r.chに何も書き込まれることがなくても、キャンセルで即座に終了する
            fmt.Println("end")
            return
        }
    }
}

func main() {
    // ちゃんとこのtimeoutで終わるようになる("end"が出力される)
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    runner := NewRunner(10)
    go runner.Run(ctx)
    _ = runner.Add(ctx, 1)
    _ = runner.Add(ctx, 2)

    time.Sleep(2 * time.Second)
}

これは元々 channel を処理している例なので、わざわざ最初のような書き方をすることはないのかもしれないですが、 まだ context パッケージがなかった頃のメソッドや、インターフェースを満たすために context.Context を受け取らないが、 ブロックする処理などを for n.Next() {...} などでブロックしつつloopするときには、それを待つための channel を作成して、 キャンセルがキャンセルしたいと思った時にできるようになったほうがよいでしょう。

まとめ

Goの channel goroutine を利用して、非同期処理を書いていくための気をつけることを書いてみました。 すべてのパターンに対してよい実装というわけではありませんが、Goを書くときの参考にしていただければと思います。